此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1! |
此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1! |
自 3.0 以来 3.1 中的新增功能
本节介绍从 3.0 版到 3.1 版所做的更改。 有关早期版本中的更改,请参阅更改历史记录。
EmbeddedKafkaBroker
现在提供了一个额外的实现来代替 Zookeeper。
有关更多信息,请参见 Embedded Kafka Broker。Kraft
Json解串器
发生反序列化异常时,消息不再包含格式为 ;每个数据字节的数值数组没有用,对于大数据可能很详细。
当与 一起使用时,发送到错误处理程序包含包含无法反序列化的原始数据的属性。
当不与 一起使用时,将不断发出同一记录的异常,显示主题/分区/偏移量以及 Jackson 抛出的原因。SerializationException
Can’t deserialize data [[123, 34, 98, 97, 122, …
ErrorHandlingDeserializer
DeserializationException
data
ErrorHandlingDeserializer
KafkaConsumer
容器后处理器
通过在注释上指定 a 的 Bean 名称,可以将后处理应用于侦听器容器。
这将在创建容器并在容器工厂上配置任何配置之后发生。
有关详细信息,请参阅容器工厂。ContainerPostProcessor
@KafkaListener
ContainerCustomizer
ErrorHandling解串器
您现在可以向此反序列化程序添加 a;如果委托成功反序列化对象,但该对象未通过验证,则会引发类似于发生的反序列化异常的异常。
这允许将原始原始数据传递到错误处理程序。
有关更多信息,请参见使用 ErrorHandlingDeserializer
。Validator
Deserializer
可重试主题
将后缀更改为 when 。
如果要保留后缀,请使用 .
有关详细信息,请参阅主题命名。-retry-5000
-retry
@RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
-retry-5000
@RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2")
侦听器容器更改
当使用者手动分配分区时,现在会自动强制使用 。
有关详细信息,请参阅手动分配所有分区。null
group.id
AckMode
MANUAL
自 2.9 以来 3.0 中的新增功能
观察
现在支持使用 Micrometer 启用计时器观察和跟踪。 有关详细信息,请参阅观察。
原生图像
提供了对创建本机映像的支持。 有关详细信息,请参阅本机映像。
全局单一嵌入式 Kafka
嵌入的 Kafka () 现在可以作为整个测试计划的单个全局实例启动。
有关更多信息,请参见对多个测试类使用相同的代理。EmbeddedKafkaBroker
可重试主题更改
此功能不再被视为实验性功能(就其 API 而言),该功能本身自 2.7 以来一直受支持,但破坏 API 更改的可能性比正常情况更大。
在此发行版中,非阻塞重试基础结构 Bean 的引导已更改,以避免某些应用程序中发生的有关应用程序初始化的一些计时问题。
现在可以为重试容器设置不同的容器;默认情况下,并发与主容器相同。concurrency
@RetryableTopic
现在可以用作自定义注释的元注释,包括对属性的支持。@AliasFor
有关详细信息,请参阅配置。
重试主题的缺省复制因子现在是(使用 broker default)。
如果您的代理版本低于 2.4 版,那么您现在需要显式设置该属性。-1
现在,您可以在同一应用程序上下文中针对同一主题配置多个侦听器。
以前,这是不可能的。
有关详细信息,请参阅多个侦听器,同一主题。@RetryableTopic
中有重大 API 更改;具体来说,如果覆盖 和 /或 的 Bean 定义方法;
这些方法现在需要一个参数。RetryTopicConfigurationSupport
destinationTopicResolver
kafkaConsumerBackoffManager
retryTopicConfigurer
ObjectProvider<RetryTopicComponentFactory>
侦听器容器更改
与使用者身份验证和授权失败相关的事件现在由容器发布。 有关详细信息,请参阅应用程序事件。
现在,您可以自定义使用者线程使用的线程名称。 有关详细信息,请参阅容器线程命名。
添加了容器属性。
有关详细信息,请参阅侦听器容器属性。restartAfterAuthException
KafkaTemplate
变化
该类返回的期货现在是 s 而不是 s。
请参阅使用 KafkaTemplate
。CompletableFuture
ListenableFuture
ReplyingKafkaTemplate
变化
该类返回的期货现在是 s 而不是 s。
请参阅使用 ReplyingKafkaTemplate
和 Request/Reply with Message<?>
s。CompletableFuture
ListenableFuture
@KafkaListener
变化
现在,您可以使用自定义关联标头,该标头将在任何回复消息中回显。
有关详细信息,请参阅使用 ReplyingKafkaTemplate
末尾的注释。
现在,您可以在处理整个批处理之前手动提交批处理的各个部分。 有关详细信息,请参阅提交偏移量。
KafkaHeaders
变化
在 2.9.x 中弃用的四个常量现在已被删除。KafkaHeaders
-
而不是 ,使用 。
MESSAGE_KEY
KEY
-
代替 ,使用
PARTITION_ID
PARTITION
类似地,被替换为 和 被替换为 。RECEIVED_MESSAGE_KEY
RECEIVED_KEY
RECEIVED_PARTITION_ID
RECEIVED_PARTITION
测试更改
版本 3.0.7 引入了 和 .
有关详细信息,请参阅模拟使用者和生产者。MockConsumerFactory
MockProducerFactory
从版本 3.0.10 开始,默认情况下,嵌入式 Kafka 代理将 Spring Boot 属性设置为嵌入式代理的地址。spring.kafka.bootstrap-servers
使用交易时,最低代理版本为 2.5。 |
自 2.8 以来 2.9 中的新增功能
错误处理程序更改
现在可以配置为暂停一个轮询的容器,并使用上一次轮询的剩余结果,而不是寻求剩余记录的偏移量。
有关更多信息,请参见 DefaultErrorHandler。DefaultErrorHandler
现在有一个属性。
有关详细信息,请参阅回退处理程序。DefaultErrorHandler
BackOffHandler
侦听器容器更改
interceptBeforeTx
现在适用于所有事务管理器(以前仅在使用 A 时应用)。
请参阅 [interceptBeforeTx]。KafkaAwareTransactionManager
提供了一个新的容器属性,该属性允许容器在处理当前记录后暂停使用者,而不是在处理上一个轮询中的所有记录后暂停使用者。
请参阅 [pauseImmediate]。pauseImmediate
与使用者身份验证和授权相关的事件
标头映射器更改
现在,您可以配置应映射哪些入站标头。 也可在版本 2.8.8 或更高版本中使用。 有关详细信息,请参阅邮件头。
KafkaTemplate
变化
在 3.0 中,该类返回的期货将是 s 而不是 s。
请参阅使用 KafkaTemplate
,以获取使用此版本时过渡的帮助。CompletableFuture
ListenableFuture
ReplyingKafkaTemplate
变化
该模板现在提供了一种等待在回复容器上分配的方法,以避免在初始化回复容器之前发送请求时出现争用。
也可在版本 2.8.8 或更高版本中使用。
请参阅使用 ReplyingKafkaTemplate
。
在 3.0 中,该类返回的期货将是 s 而不是 s。
请参阅使用 ReplyingKafkaTemplate
和 Request/Reply with Message<?>
s,以获取使用此版本时过渡的帮助。CompletableFuture
ListenableFuture
自 2.7 以来 2.8 中的新增功能
本节介绍从版本 2.7 到版本 2.8 所做的更改。 有关早期版本中的更改,请参阅更改历史记录。
套餐变更
与类型映射相关的类和接口已从 移至 。…support.converter
…support.mapping
-
AbstractJavaTypeMapper
-
ClassMapper
-
DefaultJackson2JavaTypeMapper
-
Jackson2JavaTypeMapper
乱序手动提交
侦听器容器现在可以配置为接受无序手动偏移提交(通常是异步)。 容器将推迟提交,直到确认缺少的偏移量。 有关详细信息,请参阅手动提交偏移量。
@KafkaListener
变化
现在可以在方法本身上指定侦听器方法是否为批处理侦听器。 这允许将同一容器工厂用于记录侦听器和批处理侦听器。
有关详细信息,请参阅 [batch-listeners]。
批处理侦听器现在可以处理转换异常。
有关详细信息,请参阅使用批处理错误处理程序的转换错误。
RecordFilterStrategy
,当与批处理侦听器一起使用时,现在可以在一次调用中筛选整个批处理。
有关详细信息,请参阅 [batch-listeners] 末尾的注释。
注解现在具有属性,以覆盖仅此侦听器的容器工厂。@KafkaListener
filter
RecordFilterStrategy
KafkaTemplate
变化
现在,您可以接收一条记录,给定主题、分区和偏移量。
有关详细信息,请参阅使用 KafkaTemplate
接收。
CommonErrorHandler
添加
用于记录批处理侦听器的旧版及其子接口层次结构已被新的单个接口所取代,其实现与大多数旧版实现相对应。
有关详细信息,请参阅容器错误处理程序和将自定义旧错误处理程序实现迁移到 CommonErrorHandler
。GenericErrorHandler
CommonErrorHandler
GenericErrorHandler
侦听器容器更改
容器属性现在是默认的。interceptBeforeTx
true
除了之前的 s 之外,该属性已重命名为 s,现在还应用于 s。
这两个异常都被视为致命异常,除非设置了此属性,否则容器将默认停止。authorizationExceptionRetryInterval
authExceptionRetryInterval
AuthenticationException
AuthorizationException
有关更多信息,请参见使用 KafkaMessageListenerContainer
和 Listener Container 属性。
串行程序/解串程序更改
现在提供 和 。
有关详细信息,请参阅委派序列化程序和反序列化程序。DelegatingByTopicSerializer
DelegatingByTopicDeserializer
DeadLetterPublishingRecover
变化
该属性现在是默认的。stripPreviousExceptionHeaders
true
现在有几种技术可以自定义将哪些标头添加到输出记录中。
有关详细信息,请参阅管理死信记录标头。
可重试主题更改
现在,您可以对可重试和不可重试的主题使用相同的工厂。 有关详细信息,请参阅指定 ListenerContainerFactory。
现在有一个可管理的全局致命异常列表,这些异常将使失败的记录直接进入 DLT。 请参阅异常分类器,了解如何管理它。
现在,您可以结合使用阻塞和非阻塞重试。 有关详细信息,请参阅合并阻塞和非阻塞重试。
使用可重试主题功能时引发的 KafkaBackOffException 现在记录在 DEBUG 级别。 如果需要将日志记录级别更改回 WARN 或将其设置为任何其他级别,请参阅更改 KafkaBackOffException 日志记录级别。
2.6 和 2.7 之间的变化
Kafka 客户端版本
此版本需要 2.7.0 .
从2.7.1版本开始,它还与2.8.0客户端兼容;请参阅覆盖 Spring Boot 依赖项。kafka-clients
使用主题进行非阻塞延迟重试
此版本中添加了此重要新功能。 当严格排序不重要时,可以将失败的投放发送到另一个主题以供以后使用。 可以配置一系列这样的重试主题,但延迟会增加。 有关详细信息,请参阅非阻塞重试。
侦听器容器更改
容器属性现在是默认的。onlyLogRecordMetadata
true
新的容器属性现在可用。stopImmediate
有关详细信息,请参阅侦听器容器属性。
在两次传递尝试之间使用 的错误处理程序(例如 和 ) 现在将在容器停止后不久退出回退间隔,而不是延迟停止。BackOff
SeekToCurrentErrorHandler
DefaultAfterRollbackProcessor
现在可以为扩展的错误处理程序和回滚后处理器配置一个或多个 s,以接收有关重试和恢复进度的信息。FailedRecordProcessor
RetryListener
现在有其他方法在侦听器返回后调用(通常,或通过抛出异常)。
它还有一个子界面。
此外,现在还有一个用于批处理侦听器。
有关更多信息,请参阅消息侦听器容器。RecordInterceptor
ConsumerAwareRecordInterceptor
BatchInterceptor
@KafkaListener
变化
现在,您可以验证方法(类级侦听器)的有效负载参数。
有关详细信息@KafkaListener请参阅@Payload
验证。@KafkaHandler
您现在可以在 和 上设置属性,这会导致原始添加到转换后的 .
例如,如果您希望在侦听器错误处理程序中使用 ,这很有用。
有关详细信息,请参阅侦听器错误处理程序。rawRecordHeader
MessagingMessageConverter
BatchMessagingMessageConverter
ConsumerRecord
Message<?>
DeadLetterPublishingRecoverer
现在,您可以在应用程序初始化期间修改批注。
有关详细信息@KafkaListener
请参阅属性修改。@KafkaListener
DeadLetterPublishingRecover
变化
现在,如果键和值都未通过反序列化,则原始值将发布到 DLT。
以前,该值已填充,但键仍保留在标头中。
如果对恢复器进行子类化并覆盖该方法,则会发生中断性 API 更改。DeserializationException
createProducerRecord
此外,恢复程序在发布到目标解析程序之前会验证目标解析程序选择的分区是否确实存在。
有关详细信息,请参阅发布死信记录。
ChainedKafkaTransactionManager
已弃用
有关详细信息,请参阅交易。
ReplyingKafkaTemplate
变化
现在有一种机制可以检查回复,如果存在某些条件,则将来会异常失败。
添加了对发送和接收的支持。spring-messaging
Message<?>
有关详细信息,请参阅使用 ReplyingKafkaTemplate
。
Kafka Streams 更改
默认情况下,现在配置为不清理本地状态。
有关详细信息,请参阅配置。StreamsBuilderFactoryBean
KafkaAdmin
变化
添加了新方法。 已添加,以便于在单个 Bean 中配置多个主题。
有关详细信息,请参阅 [configuring-topics]。createOrModifyTopics
describeTopics
KafkaAdmin.NewTopics
MessageConverter
变化
现在可以将 a 添加到 ,允许基于标头进行内容协商。
有关更多信息,请参阅 Spring Messaging Message Conversion。spring-messaging
SmartMessageConverter
MessagingMessageConverter
contentType
ExponentialBackOffWithMaxRetries
提供了新的实现,使配置最大重试次数更加方便。
有关更多信息,请参见 ExponentialBackOffWithMaxRetries
实现。BackOff
条件委派错误处理程序
这些新的错误处理程序可以配置为委派给不同的错误处理程序,具体取决于异常类型。 有关详细信息,请参阅委派错误处理程序。
2.5 和 2.6 之间的变化
侦听器容器更改
默认值现在为 。
有关详细信息,请参阅 Exactly Once 语义。EOSMode
BETA
各种错误处理程序(扩展)和现在重置如果恢复失败。
此外,您现在可以根据失败的记录和/或异常选择要使用。FailedRecordProcessor
DefaultAfterRollbackProcessor
BackOff
BackOff
现在可以在容器属性中配置。
有关详细信息,请参阅侦听器容器属性。adviceChain
当容器配置为发布 s 时,它现在会在发布空闲事件后收到记录时发布 。
有关详细信息,请参阅应用程序事件和检测空闲和无响应使用者。ListenerContainerIdleEvent
ListenerContainerNoLongerIdleEvent
@KafkaListener变更
使用手动分区分配时,您现在可以指定通配符,以确定应将哪些分区重置为初始偏移量。
此外,如果侦听器实现,则在手动赋值后调用。
(也添加到版本 2.5.5 中)。
有关详细信息,请参阅显式分区分配。ConsumerSeekAware
onPartitionsAssigned()
添加了方便的方法,使查找更容易。
有关详细信息,请参见 [seek]。AbstractConsumerSeekAware
ErrorHandler 更改
(例如,,)的子类现在可以配置为重置重试状态,如果异常与此记录之前发生的异常类型不同。FailedRecordProcessor
SeekToCurrentErrorHandler
DefaultAfterRollbackProcessor
RecoveringBatchErrorHandler
生产者工厂变更
现在,您可以为生产者设置最长期限,在此期限之后,生产者将被关闭并重新创建。 有关详细信息,请参阅交易。
现在,您可以在创建配置映射后更新配置映射。
例如,如果您必须在凭据更改后更新 SSL 密钥/信任存储位置,这可能很有用。
有关更多信息,请参见使用 DefaultKafkaProducerFactory
。DefaultKafkaProducerFactory
2.4 和 2.5 之间的变化
本节介绍从 2.4 版到 2.5 版所做的更改。 有关早期版本中的更改,请参阅更改历史记录。
消费者/生产者工厂变更
现在,每当创建或关闭使用者或生产者时,默认使用者和生产者工厂都可以调用回调。 提供了原生千分尺指标的实现。 有关详细信息,请参阅工厂侦听器。
现在,您可以在运行时更改引导服务器属性,从而启用故障转移到另一个 Kafka 群集。 有关详细信息,请参阅连接到 Kafka。
StreamsBuilderFactoryBean
变化
工厂 Bean 现在可以在创建或销毁时调用回调。
提供了原生千分尺度量的实现。
有关详细信息,请参阅 KafkaStreams 千分尺支持。KafkaStreams
传递尝试标头
现在有一个选项可以添加一个标头,用于在使用某些错误处理程序和回滚处理器后跟踪传递尝试。 有关详细信息,请参阅传递尝试标头。
@KafkaListener变更
现在,当返回类型为 时,如果需要,将自动填充默认回复标头。
有关详细信息,请参阅回复类型 Message<?>。@KafkaListener
Message<?>
当传入记录具有键时,不再填充值;标头被完全省略。KafkaHeaders.RECEIVED_MESSAGE_KEY
null
null
@KafkaListener
方法现在可以指定参数,而不是对元数据(如主题、分区等)使用离散标头。
有关详细信息,请参阅使用者记录元数据。ConsumerRecordMetadata
侦听器容器更改
容器属性现在是默认的。
有关详细信息,请参阅侦听器容器属性。assignmentCommitOption
LATEST_ONLY_NO_TX
现在,在使用事务时,容器属性是默认的。
有关详细信息,请参阅交易。subBatchPerPartition
true
现在提供了新的。RecoveringBatchErrorHandler
现在支持静态组成员身份。 有关更多信息,请参阅消息侦听器容器。
配置增量/协作重新平衡时,如果偏移量无法提交非致命 ,容器将尝试重新提交在重新平衡完成后仍分配给此实例的分区的偏移量。RebalanceInProgressException
默认错误处理程序现在是用于记录侦听器和批处理侦听器的处理程序。
有关详细信息,请参阅容器错误处理程序。SeekToCurrentErrorHandler
RecoveringBatchErrorHandler
现在,您可以控制记录标准错误处理程序有意引发的异常的级别。 有关详细信息,请参阅容器错误处理程序。
添加了该方法,可以更轻松地确定为并发容器中的哪些使用者分配了哪些分区。
有关详细信息,请参阅侦听器容器属性。getAssignmentsByClientId()
您现在可以禁止在错误中记录整个 s、调试日志等。
请参阅侦听器容器属性。ConsumerRecord
onlyLogRecordMetadata
KafkaTemplate 更改
现在可以维护千分尺计时器。
有关详细信息,请参阅监视。KafkaTemplate
现在可以使用属性进行配置,以覆盖生产者工厂中的属性。
有关详细信息,请参阅使用 KafkaTemplate
。KafkaTemplate
ProducerConfig
现在已经提供了 A。
有关详细信息,请参阅使用 RoutingKafkaTemplate
。RoutingKafkaTemplate
现在,您可以使用 instead 而不是来获取更窄的异常,从而更轻松地提取失败的 .
有关详细信息,请参阅使用 KafkaTemplate
。KafkaSendCallback
ListenerFutureCallback
ProducerRecord
Kafka 字符串序列化器/反序列化器
现在提供了新的 / s 以及相关的。
有关详细信息,请参阅字符串序列化。ToStringSerializer
StringDeserializer
SerDe
Json解串器
现在可以更灵活地确定反序列化类型。
有关更多信息,请参见使用方法确定类型。JsonDeserializer
委派序列化程序/反序列化程序
现在,当出站记录没有标头时,可以处理“标准”类型。
有关详细信息,请参阅委派序列化程序和反序列化程序。DelegatingSerializer
测试更改
帮助程序记录现在默认设置为。
有关详细信息,请参阅 JUnit。KafkaTestUtils.consumerProps()
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
earliest
2.3 和 2.4 之间的变化
ConsumerAwareRebalanceListener
像 ,这个接口现在有一个额外的方法。
有关详细信息,请参阅 Apache Kafka 文档。ConsumerRebalanceListener
onPartitionsLost
与 不同的是,默认实现不调用 。
相反,侦听器容器将在调用 ;因此,在实现 .ConsumerRebalanceListener
onPartitionsRevoked
onPartitionsLost
ConsumerAwareRebalanceListener
有关详细信息,请参阅“重新平衡侦听器”末尾的重要说明。
Kafka模板
现在支持非事务性发布和事务性发布。
有关更多信息,请参见 KafkaTemplate
事务性和非事务性发布。KafkaTemplate
聚合回复KafkaTemplate
现在是 .
现在,在超时(以及记录到达时)调用它;第二个参数是在超时后调用的情况下。releaseStrategy
BiConsumer
true
有关详细信息,请参阅聚合多个回复。
侦听器容器
提供了一个选项,允许侦听器容器在 抛出任何 后重试。
有关更多信息,请参阅其 JavaDocs 和 Using KafkaMessageListenerContainer
。ContainerProperties
authorizationExceptionRetryInterval
AuthorizationException
KafkaConsumer
@KafkaListener
注解具有新属性;默认值为 true。
当应答侦听器返回时,此属性控制是作为单个记录发送返回结果,还是发送每个元素的记录。
有关详细信息,请参阅使用 @SendTo
转发侦听器结果@KafkaListener
splitIterables
Iterable
批处理侦听器现在可以配置为 ;例如,这允许在事务中处理批处理,而侦听器一次获取一条记录。
在默认实现中,a 可用于处理批处理中的错误,而无需停止整个批处理 - 这在使用事务时可能很有用。
有关更多信息,请参见 Transactions with Batch Listeners。BatchToRecordAdapter
ConsumerRecordRecoverer
Kafka 流
接受新属性。
这允许在创建流之前配置构建器和/或拓扑。
有关更多信息,请参见 Spring Management。StreamsBuilderFactoryBean
KafkaStreamsInfrastructureCustomizer
2.2 和 2.3 之间的变化
本节介绍从版本 2.2 到版本 2.3 所做的更改。
提示、技巧和示例
添加了新的章节提示、技巧和示例。 请提交 GitHub 问题和/或拉取请求以获取该章中的其他条目。
配置更改
从版本 2.3.4 开始,容器属性默认为 false。
如果为真,则如果代理关闭,应用程序将无法启动;许多用户都受到此更改的影响;鉴于 Kafka 是一个高可用性平台,我们没有预料到在没有活动代理的情况下启动应用程序会是一个常见的用例。missingTopicsFatal
生产商和消费品工厂的变化
现在可以配置为为每个线程创建一个生产者。
您还可以在构造函数中提供实例,作为配置类(需要 no-arg 构造函数)或使用实例进行构造的替代方法,然后在所有生产者之间共享这些实例。
有关更多信息,请参见使用 DefaultKafkaProducerFactory
。DefaultKafkaProducerFactory
Supplier<Serializer>
Serializer
相同的选项也适用于 中的实例。
有关更多信息,请参见使用 KafkaMessageListenerContainer
。Supplier<Deserializer>
DefaultKafkaConsumerFactory
侦听器容器更改
以前,当使用侦听器适配器(如 s)调用侦听器时,错误处理程序会收到(实际侦听器异常为 )。
本机 s 引发的异常将原封不动地传递给错误处理程序。
现在,a 始终是参数(实际侦听器异常为 ),它提供对容器属性的访问。ListenerExecutionFailedException
cause
@KafkaListener
GenericMessageListener
ListenerExecutionFailedException
cause
group.id
由于侦听器容器具有自己的提交偏移量机制,因此它更喜欢 Kafka 为 。
它现在会自动将其设置为 false,除非在使用者工厂或容器的使用者属性覆盖中专门设置。ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
false
该属性现在是默认的。ackOnError
false
现在可以在侦听器方法中获取使用者的属性。
有关详细信息,请参阅获取使用者 group.id
。group.id
容器具有一个新属性,允许在调用侦听器之前检查或修改记录。
如果需要调用多个拦截器,则还会提供 A。
有关更多信息,请参阅消息侦听器容器。recordInterceptor
CompositeRecordInterceptor
具有新方法,允许您执行相对于开始、结束或当前位置的查找,并查找大于或等于时间戳的第一个偏移量。
有关详细信息,请参见 [seek]。ConsumerSeekAware
现在提供了一个便利类来简化搜索。
有关详细信息,请参见 [seek]。AbstractConsumerSeekAware
它提供了一个选项,用于让侦听器容器中的主循环在调用之间休眠。
有关更多信息,请参阅其 JavaDocs 和 Using KafkaMessageListenerContainer
。ContainerProperties
idleBetweenPolls
KafkaConsumer.poll()
使用 (或 ) 时,您现在可以通过调用 .
有关详细信息,请参阅提交偏移量。AckMode.MANUAL
MANUAL_IMMEDIATE
nack
Acknowledgment
现在可以使用千分尺监控侦听器性能。
有关详细信息,请参阅监视。Timer
容器现在发布与启动相关的其他使用者生命周期事件。 有关详细信息,请参阅应用程序事件。
事务批处理侦听器现在可以支持僵尸隔离。 有关详细信息,请参阅交易。
侦听器容器工厂现在可以配置 ,以便在创建和配置每个容器后进一步配置每个容器。
有关详细信息,请参阅容器工厂。ContainerCustomizer
ErrorHandler 更改
现在,将某些异常视为致命异常,并禁用这些异常的重试,并在第一次失败时调用恢复程序。SeekToCurrentErrorHandler
现在可以将 and 配置为在交付尝试之间应用(线程休眠)。SeekToCurrentErrorHandler
SeekToCurrentBatchErrorHandler
BackOff
从版本 2.3.2 开始,当错误处理程序在恢复失败的记录后返回时,将提交恢复记录的偏移量。
现在,当与 结合使用时,将发送到死信主题的消息的有效负载设置为无法反序列化的原始值。
以前,它和用户代码需要从消息标头中提取。
有关详细信息,请参阅发布死信记录。DeadLetterPublishingRecoverer
ErrorHandlingDeserializer
null
DeserializationException
主题生成器
新增了 s,以便更方便地创建 s 以进行自动主题配置。
有关详细信息,请参阅 [configuring-topics]。TopicBuilder
NewTopic
@Bean
Kafka Streams 更改
现在,您可以对 创建的 执行其他配置。
有关更多信息,请参阅流配置。StreamsBuilderFactoryBean
@EnableKafkaStreams
现在提供了 A,它允许恢复具有反序列化错误的记录。
它可以与 结合使用,将这些记录发送到死信主题。
有关详细信息,请参阅从反序列化异常中恢复。RecoveringDeserializationExceptionHandler
DeadLetterPublishingRecoverer
已提供转换器,使用 SpEL 生成标头值。
有关详细信息,请参阅标头 Enricher。HeaderEnricher
已提供。
这允许 Kafka 流拓扑与 spring-messaging 组件(如 Spring 集成流)进行交互。
有关更多信息,请参阅 MessagingProcessor
和 [从 KStream
调用 Spring 集成流]。MessagingTransformer
JSON 组件更改
现在,默认情况下,所有 JSON 感知组件都配置了由 .
现在提供了基于 的构造函数,以便更好地处理目标泛型容器类型。
此外,还引入了一个用于序列化为普通字符串。
有关更多信息,请参阅其 JavaDocs 和序列化、反序列化和消息转换。ObjectMapper
JacksonUtils.enhancedObjectMapper()
JsonDeserializer
TypeReference
JacksonMimeTypeModule
org.springframework.util.MimeType
为所有 Json 转换器提供了 A 以及一个新的超类。
此外,a 现在可用;它可以序列化 和 s 中的值。
有关更多信息,请参阅 Spring Messaging Message Conversion。ByteArrayJsonMessageConverter
JsonMessageConverter
StringOrBytesSerializer
byte[]
Bytes
String
ProducerRecord
和 现在具有流畅的 API,使编程配置更简单。
有关更多信息,请参阅 javadocs、序列化、反序列化和消息转换以及流 JSON 序列化和反序列化。JsonSerializer
JsonDeserializer
JsonSerde
回复KafkaTemplate
当回复超时时,将来会以 a 而不是 .KafkaReplyTimeoutException
KafkaException
此外,现在还提供了一个重载方法,该方法允许基于每条消息指定回复超时。sendAndReceive
聚合回复KafkaTemplate
通过聚合来自多个接收方的回复来扩展。
有关详细信息,请参阅聚合多个回复。ReplyingKafkaTemplate
事务变更
您现在可以覆盖 和 上的生产者工厂。
有关详细信息,请参阅 transactionIdPrefix
。transactionIdPrefix
KafkaTemplate
KafkaTransactionManager
新的委派序列化程序/反串化程序
该框架现在提供委托 和 ,利用标头来生成和使用具有多个键/值类型的记录。
有关详细信息,请参阅委派序列化程序和反序列化程序。Serializer
Deserializer
新重试反串化程序
该框架现在提供了一个委托,以便在可能发生暂时性错误(如网络问题)时重试序列化。
有关详细信息,请参阅重试反串化程序。RetryingDeserializer
2.1 和 2.2 之间的更改
类和包更改
该类已从 移动到 。ContainerProperties
org.springframework.kafka.listener.config
org.springframework.kafka.listener
枚举已从 移动到 。AckMode
AbstractMessageListenerContainer
ContainerProperties
和 方法已从 和 移动到 。setBatchErrorHandler()
setErrorHandler()
ContainerProperties
AbstractMessageListenerContainer
AbstractKafkaListenerContainerFactory
回滚处理后
提供了一种新的策略。
有关详细信息,请参阅回滚后处理器。AfterRollbackProcessor
ConcurrentKafkaListenerContainerFactory
变化
您现在可以使用 来创建和配置任何 ,而不仅仅是用于注释的那些。
有关详细信息,请参阅容器工厂。ConcurrentKafkaListenerContainerFactory
ConcurrentMessageListenerContainer
@KafkaListener
侦听器容器更改
添加了新的容器属性 ()。
有关更多信息,请参见使用 KafkaMessageListenerContainer
。missingTopicsFatal
现在,当使用者停止时,会发出 A。
有关详细信息,请参阅线程安全。ConsumerStoppedEvent
批处理侦听器可以选择接收完整的对象,而不是 .
有关详细信息,请参阅 [batch-listeners]。ConsumerRecords<?, ?>
List<ConsumerRecord<?, ?>
和 现在可以恢复(跳过)不断失败的记录,默认情况下,在 10 次失败后执行此操作。
它们可以配置为将失败的记录发布到死信主题。DefaultAfterRollbackProcessor
SeekToCurrentErrorHandler
从版本 2.2.4 开始,在选择死信主题名称时可以使用使用者的组 ID。
已添加。
有关详细信息,请参阅应用程序事件。ConsumerStoppingEvent
现在可以配置为在配置容器时提交已恢复记录的偏移量(从 2.2.4 开始)。SeekToCurrentErrorHandler
AckMode.MANUAL_IMMEDIATE
@KafkaListener变更
现在,您可以通过在批注上设置属性来覆盖侦听器容器工厂的 and 属性。
现在,您可以添加配置来确定将哪些标头(如果有)复制到回复消息中。
有关详细信息@KafkaListener
请参阅注释。concurrency
autoStartup
现在,您可以将自己的注释用作元注释。
有关详细信息,请参阅@KafkaListener
作为元注释。@KafkaListener
现在,配置验证更容易。
有关详细信息@KafkaListener请参阅@Payload
验证。Validator
@Payload
现在,您可以直接在注解上指定 kafka 使用者属性;这些属性将覆盖使用者工厂中定义的具有相同名称的任何属性(从版本 2.2.4 开始)。 有关详细信息,请参阅注释属性。
标头映射更改
和 类型的标头现在映射为值中的简单字符串。
以前,它们被映射为 JSON,并且仅被解码。 无法解码。
它们现在是用于互操作性的简单字符串。MimeType
MediaType
RecordHeader
MimeType
MediaType
此外,它还具有一种新方法,允许使用而不是 JSON 来指定应映射的类型。
有关详细信息,请参阅邮件头。DefaultKafkaHeaderMapper
addToStringClasses
toString()
嵌入式 Kafka 更改
该类及其接口已被弃用,取而代之的是 及其 JUnit 4 包装器。
注解现在填充 bean 而不是已弃用的 .
此更改允许在 JUnit 5 测试中使用。
注释现在具有用于指定填充 .
有关详细信息,请参阅测试应用程序。KafkaEmbedded
KafkaRule
EmbeddedKafkaBroker
EmbeddedKafkaRule
@EmbeddedKafka
EmbeddedKafkaBroker
KafkaEmbedded
@EmbeddedKafka
@EmbeddedKafka
ports
EmbeddedKafkaBroker
JsonSerializer/Deserializer 增强功能
现在,可以使用生产者和使用者属性提供类型映射信息。
反序列化程序上提供了新的构造函数,以允许使用提供的目标类型覆盖类型标头信息。
默认情况下,现在会删除任何类型信息标头。JsonDeserializer
现在,您可以使用 Kafka 属性(从 2.2.3 开始)配置为忽略类型信息标头。JsonDeserializer
有关详细信息,请参阅序列化、反序列化和消息转换。
Kafka Streams 更改
流配置 Bean 现在必须是对象而不是对象。KafkaStreamsConfiguration
StreamsConfig
已从包移动到 .StreamsBuilderFactoryBean
…core
…config
引入 是为了在实例之上构建条件分支时提供更好的最终用户体验。KafkaStreamBrancher
KStream
有关更多信息,请参阅 Apache Kafka Streams 支持和配置。
事务 ID
当侦听器容器启动事务时,现在附加了 .
此更改允许对僵尸进行适当的围栏,如此处所述。transactional.id
transactionIdPrefix
<group.id>.<topic>.<partition>
2.0 和 2.1 之间的更改
JSON 改进
现在在 中添加类型信息,让转换器在接收时创建特定的类型,基于消息本身而不是固定的配置类型。
有关详细信息,请参阅序列化、反序列化和消息转换。StringJsonMessageConverter
JsonSerializer
Headers
JsonDeserializer
容器停止错误处理程序
现在为记录和批处理侦听器提供了容器错误处理程序,这些侦听器将侦听器引发的任何异常视为致命异常。 他们停止容器。 有关详细信息,请参阅处理异常。
暂停和恢复容器
侦听器容器现在具有 和 方法(从版本 2.1.3 开始)。
有关详细信息,请参阅暂停和恢复侦听器容器。pause()
resume()
有状态重试
从版本 2.1.3 开始,您可以配置有状态重试。 有关详细信息,请参阅有状态重试。
客户端 ID
从版本 2.1.1 开始,您现在可以将前缀设置为 。
以前,若要自定义客户端 ID,需要每个侦听器有一个单独的使用者工厂(和容器工厂)。
前缀以后缀为后缀,以便在使用并发时提供唯一的客户端 ID。client.id
@KafkaListener
-n
日志记录偏移提交
默认情况下,主题偏移提交的日志记录是使用日志记录级别执行的。
从版本 2.1.2 开始,调用中的新属性允许您指定这些消息的日志级别。
有关更多信息,请参见使用 KafkaMessageListenerContainer
。DEBUG
ContainerProperties
commitLogLevel
默认@KafkaHandler
从版本 2.1.3 开始,您可以将类级别的注释之一指定为默认注释。
有关详细信息,请参阅类上的@KafkaListener
。@KafkaHandler
@KafkaListener
回复KafkaTemplate
从版本 2.1.3 开始,提供了一个子类来支持请求/回复语义。
有关详细信息,请参阅使用 ReplyingKafkaTemplate
。KafkaTemplate
从 2.0 开始的迁移指南
请参阅 2.0 到 2.1 迁移指南。
1.3 和 2.0 之间的更改
@KafkaListener
变化
现在,您可以使用 来批注方法(以及类和方法)。
如果该方法返回结果,则将其转发到指定的主题。
有关详细信息,请参阅使用 @SendTo
转发侦听器结果。@KafkaListener
@KafkaHandler
@SendTo
消息侦听器
消息侦听器现在可以知道该对象。
有关详细信息,请参阅 [message-listeners]。Consumer
用ConsumerAwareRebalanceListener
重新平衡侦听器现在可以在重新平衡通知期间访问对象。
有关详细信息,请参阅重新平衡侦听器。Consumer
1.2 和 1.3 之间的变化
支持事务
0.11.0.0 客户端库增加了对事务的支持。
添加了对事务的其他支持。
有关详细信息,请参阅交易。KafkaTransactionManager
支持标头
0.11.0.0 客户端库添加了对消息标头的支持。
现在可以将这些映射到 和 映射到 。
有关详细信息,请参阅邮件头。spring-messaging
MessageHeaders
支持 Kafka 时间戳
KafkaTemplate
现在支持添加带有时间戳的记录的 API。
引入了有关支持的新功能。
此外,还添加了新的和测试实用程序。
有关更多详细信息,请参阅使用 KafkaTemplate
、@KafkaListener
Annotation 和测试应用程序。KafkaHeaders
timestamp
KafkaConditions.timestamp()
KafkaMatchers.hasTimestamp()
@KafkaListener
变化
现在,您可以配置 a 来处理异常。
有关详细信息,请参阅处理异常。KafkaListenerErrorHandler
默认情况下,该属性现在用作属性,覆盖在使用者工厂中配置的属性(如果存在)。
此外,您可以显式配置注释。
以前,您需要一个单独的容器工厂(和使用者工厂)来对侦听器使用不同的值。
要恢复以前使用出厂配置的行为,请将注解上的属性设置为 。@KafkaListener
id
group.id
groupId
group.id
group.id
idIsGroup
false
@EmbeddedKafka
注解
为方便起见,提供了测试类级注释,以注册为 bean。
有关详细信息,请参阅测试应用程序。@EmbeddedKafka
KafkaEmbedded
Kerberos 配置
现在提供了对配置 Kerberos 的支持。 有关更多信息,请参阅 JAAS 和 Kerberos。
1.0 和 1.1 之间的更改
寻求
现在,您可以查找每个主题或分区的位置。 您可以使用它来设置初始化期间的初始位置,当使用组管理并且 Kafka 分配分区时。 您还可以在检测到空闲容器时或应用程序执行中的任何任意点进行查找。 有关详细信息,请参见 [seek]。