此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.4spring-doc.cn

此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.4spring-doc.cn

自 2.9 以来 3.0 中的新增功能

Kafka 客户端版本

此版本需要 3.3.1 .kafka-clientsspring-doc.cn

Exactly Once 语义

EOSMode.V1(aka) 不再受支持。ALPHAspring-doc.cn

使用事务时,最低代理版本为 2.5。

有关更多信息,请参阅 Exactly Once 语义KIP-447spring-doc.cn

观察

现在支持使用 Micrometer 对计时器启用观察和跟踪。 有关更多信息,请参阅 Observation (观察)。spring-doc.cn

本机映像

支持创建本机映像。 有关更多信息,请参阅本机映像spring-doc.cn

全局单嵌入式 Kafka

嵌入式 Kafka () 现在可以作为整个测试计划的单个全局实例启动。 有关更多信息,请参阅对多个测试类使用相同的 Broker(s)。EmbeddedKafkaBrokerspring-doc.cn

可重试主题更改

此功能不再被视为实验性功能(就其 API 而言),该功能本身自 2.7 以来一直受支持,但破坏 API 更改的可能性比正常情况更大。spring-doc.cn

在此版本中,非阻塞重试基础结构 bean 的引导已更改,以避免在某些应用程序中发生的有关应用程序初始化的一些计时问题。spring-doc.cn

您现在可以为重试容器设置不同的容器;默认情况下,并发性与主容器相同。concurrencyspring-doc.cn

@RetryableTopic现在可以用作自定义注释的元注释,包括对属性的支持。@AliasForspring-doc.cn

有关更多信息,请参阅配置spring-doc.cn

重试主题的默认复制因子为 now (use broker default)。 如果您的代理版本低于 2.4,则现在需要显式设置该属性。-1spring-doc.cn

现在,您可以在同一应用程序上下文中为同一主题配置多个侦听器。 以前,这是不可能的。 有关更多信息,请参阅多个侦听器,相同主题@RetryableTopicspring-doc.cn

中存在 API 的重大更改;具体来说,如果你覆盖了 、 和/或 ; 这些方法现在需要一个参数。RetryTopicConfigurationSupportdestinationTopicResolverkafkaConsumerBackoffManagerretryTopicConfigurerObjectProvider<RetryTopicComponentFactory>spring-doc.cn

侦听器容器更改

与使用者身份验证和授权失败相关的事件现在由容器发布。 有关更多信息,请参阅应用程序事件spring-doc.cn

您现在可以自定义使用者线程使用的线程名称。 有关更多信息,请参阅容器线程命名spring-doc.cn

已添加 container 属性。 有关更多信息,请参阅 Listener Container Properties (侦听器容器属性)。restartAfterAuthExceptionspring-doc.cn

KafkaTemplate变化

这个类返回的 future 现在是 s 而不是 s。 请参阅使用 KafkaTemplateCompletableFutureListenableFuturespring-doc.cn

ReplyingKafkaTemplate变化

这个类返回的 future 现在是 s 而不是 s。 请参阅使用 ReplyingKafkaTemplate使用 Message<?> 请求/回复CompletableFutureListenableFuturespring-doc.cn

@KafkaListener变化

您现在可以使用自定义关联标头,该标头将在任何回复消息中回显。 有关更多信息,请参见 Using ReplyingKafkaTemplate 末尾的注释。spring-doc.cn

现在,您可以在处理整个批次之前手动提交批次的各个部分。 有关更多信息,请参阅提交偏移量spring-doc.cn

KafkaHeaders变化

在 2.9.x 中弃用的四个常量现已删除。KafkaHeadersspring-doc.cn

同样, 替换为 和 替换为 。RECEIVED_MESSAGE_KEYRECEIVED_KEYRECEIVED_PARTITION_IDRECEIVED_PARTITIONspring-doc.cn

测试更改

版本 3.0.7 引入了 a 和 . 有关更多信息,请参阅 Mock Consumer 和 ProducerMockConsumerFactoryMockProducerFactoryspring-doc.cn

从版本 3.0.10 开始,默认情况下,嵌入式 Kafka 代理将 Spring Boot 属性设置为嵌入式代理的地址。spring.kafka.bootstrap-serversspring-doc.cn

使用事务时,最低代理版本为 2.5。

自 2.8 以来 2.9 中的新增功能

Kafka 客户端版本

此版本需要 3.2.0 .kafka-clientsspring-doc.cn

错误处理程序更改

现在可以将 配置为暂停容器进行一次轮询,并使用上一次轮询的剩余结果,而不是寻找剩余记录的偏移量。 有关更多信息,请参阅 DefaultErrorHandlerDefaultErrorHandlerspring-doc.cn

现在有一个属性。 有关更多信息,请参见 Back Off HandlersDefaultErrorHandlerBackOffHandlerspring-doc.cn

侦听器容器更改

interceptBeforeTx现在适用于所有事务管理器(以前仅在使用 a 时应用)。 参见 [interceptBeforeTx]。KafkaAwareTransactionManagerspring-doc.cn

提供了一个新的 container 属性,该属性允许容器在处理当前记录后暂停使用者,而不是在处理了上一次轮询中的所有记录之后。 请参阅 [pauseImmediate]。pauseImmediatespring-doc.cn

与消费者身份验证和授权相关的事件spring-doc.cn

Header Mapper 更改

您现在可以配置应映射的入站标头。 在版本 2.8.8 或更高版本中也可用。 有关更多信息,请参阅 Message Headersspring-doc.cn

KafkaTemplate变化

在 3.0 中,该类返回的 future 将是 s 而不是 s。 请参阅使用 KafkaTemplate 以获取有关使用此版本时进行过渡的帮助。CompletableFutureListenableFuturespring-doc.cn

ReplyingKafkaTemplate变化

该模板现在提供了一种在回复容器上等待分配的方法,以避免在初始化回复容器之前发送请求时发生争用。 在版本 2.8.8 或更高版本中也可用。 请参见使用 ReplyingKafkaTemplatespring-doc.cn

在 3.0 中,该类返回的 future 将是 s 而不是 s。 有关使用此版本时过渡的帮助,请参阅使用 ReplyingKafkaTemplate使用 Message<?> 请求/回复CompletableFutureListenableFuturespring-doc.cn

自 2.7 以来 2.8 中的新增功能

本节介绍从版本 2.7 到版本 2.8 所做的更改。 有关早期版本的更改,请参阅更改历史记录spring-doc.cn

Kafka 客户端版本

此版本需要 3.0.0kafka-clientsspring-doc.cn

套件更改

与类型映射相关的类和接口已从 移动到 。…​support.converter…​support.mappingspring-doc.cn

无序手动提交

现在可以将侦听器容器配置为接受不按顺序(通常是异步)的手动偏移提交。 容器将延迟提交,直到确认缺少的偏移量。 有关更多信息,请参阅手动提交偏移量。spring-doc.cn

@KafkaListener变化

现在可以指定侦听器方法是否是方法本身的批处理侦听器。 这允许将同一容器工厂用于记录侦听器和批处理侦听器。spring-doc.cn

有关更多信息,请参阅 [batch-listeners]。spring-doc.cn

批处理侦听器现在可以处理转换异常。spring-doc.cn

RecordFilterStrategy与批处理侦听器一起使用时,现在可以在一次调用中筛选整个批处理。 有关更多信息,请参阅 [batch-listeners] 末尾的注释。spring-doc.cn

注解现在具有属性,用于仅覆盖此侦听器的容器工厂。@KafkaListenerfilterRecordFilterStrategyspring-doc.cn

注释现在具有 attribute;这用于填充 New Listener Container 属性 。 然后,这用于填充每条记录中的标题,该标题可以在 、 或侦听器本身中使用。 有关更多信息,请参阅 Listener Info HeaderAbstract Listener Container Properties@KafkaListenerinfolistenerInfoKafkaHeaders.LISTENER_INFORecordInterceptorRecordFilterStrategyspring-doc.cn

KafkaTemplate变化

您现在可以接收给定主题、分区和偏移量的单个记录。 有关更多信息,请参阅使用 KafkaTemplate 接收spring-doc.cn

CommonErrorHandler添加

用于记录批处理侦听器的 legacy 及其子接口层次结构已替换为新的单个接口,其实现对应于 的大多数 legacy 实现。 有关更多信息,请参阅容器错误处理程序和将自定义旧版错误处理程序实现迁移到 CommonErrorHandlerGenericErrorHandlerCommonErrorHandlerGenericErrorHandlerspring-doc.cn

侦听器容器更改

container 属性现在是默认的。interceptBeforeTxtruespring-doc.cn

该属性已重命名为 s,现在除了以前的 s 之外,还应用于 s。 这两个异常都被视为致命的,除非设置了此属性,否则容器将默认停止。authorizationExceptionRetryIntervalauthExceptionRetryIntervalAuthenticationExceptionAuthorizationExceptionspring-doc.cn

Serializer/Deserializer 更改

现在提供了 和。 有关更多信息,请参见Delegating Serializer 和 DeserializerDelegatingByTopicSerializerDelegatingByTopicDeserializerspring-doc.cn

DeadLetterPublishingRecover变化

该属性现在是默认的。stripPreviousExceptionHeaderstruespring-doc.cn

现在有几种技术可以自定义将哪些标头添加到输出记录中。spring-doc.cn

有关更多信息,请参阅 管理死信记录标头spring-doc.cn

可重试主题更改

现在,您可以对可重试和不可重试的主题使用相同的工厂。 有关更多信息,请参阅指定 ListenerContainerFactoryspring-doc.cn

现在有一个可管理的致命异常全局列表,这些异常将使失败的记录直接进入 DLT。 请参阅 Exception Classifier 以了解如何管理它。spring-doc.cn

现在,您可以结合使用阻塞和非阻塞重试。 有关更多信息,请参阅组合阻塞和非阻塞重试spring-doc.cn

使用可重试主题功能时引发的 KafkaBackOffException 现在记录在 DEBUG 级别。 如果您需要将日志记录级别更改回 WARN 或将其设置为任何其他级别,请参阅更改 KafkaBackOffException 日志记录级别spring-doc.cn

2.6 和 2.7 之间的更改

Kafka 客户端版本

此版本需要 2.7.0 . 它还与 2.8.0 客户端兼容,从 2.7.1 版本开始;请参见覆盖 Spring Boot 依赖项kafka-clientsspring-doc.cn

使用主题的非阻塞延迟重试

此版本中添加了这一重要的新功能。 当严格排序不重要时,失败的投放可以发送到另一个主题以供以后使用。 可以配置一系列此类重试主题,但延迟会增加。 有关更多信息,请参阅Non-Blocking Retriesspring-doc.cn

侦听器容器更改

container 属性现在是默认的。onlyLogRecordMetadatatruespring-doc.cn

新的容器属性现已推出。stopImmediatespring-doc.cn

在两次投放尝试之间使用 的错误处理程序(例如 和 ) 现在将在容器停止后不久退出 Back off 间隔,而不是延迟停止。BackOffSeekToCurrentErrorHandlerDefaultAfterRollbackProcessorspring-doc.cn

现在可以为扩展的错误处理程序和回滚后处理器配置一个或多个 s,以接收有关重试和恢复进度的信息。FailedRecordProcessorRetryListenerspring-doc.cn

现在,在侦听器返回后调用了其他方法(通常,或通过抛出异常)。 它还有一个 子接口 . 此外,现在还有一个 for batch listeners。 有关更多信息,请参阅 Message Listener ContainersRecordInterceptorConsumerAwareRecordInterceptorBatchInterceptorspring-doc.cn

@KafkaListener变化

您现在可以验证方法(类级侦听器)的 payload 参数。 有关更多信息请参阅 @KafkaListener @Payload 验证@KafkaHandlerspring-doc.cn

现在,您可以在 和 上设置属性,这会导致将 raw 添加到转换后的 . 这很有用,例如,如果您希望在 listener 错误处理程序中使用 。 有关更多信息,请参阅 Listener Error HandlersrawRecordHeaderMessagingMessageConverterBatchMessagingMessageConverterConsumerRecordMessage<?>DeadLetterPublishingRecovererspring-doc.cn

现在,您可以在应用程序初始化期间修改注释。 有关更多信息,请参见 @KafkaListener Attribute Modification@KafkaListenerspring-doc.cn

DeadLetterPublishingRecover变化

现在,如果 key 和 value 都失败了,则原始值将发布到 DLT。 以前,该值已填充,但键仍保留在标头中。 如果您将 recoverer 子类化并覆盖该方法,则会出现中断性 API 更改。DeserializationExceptioncreateProducerRecordspring-doc.cn

此外,recoverer 会在发布到目标 resolver 选择的分区之前验证该分区是否确实存在。spring-doc.cn

有关更多信息,请参阅发布死信记录spring-doc.cn

ChainedKafkaTransactionManager已弃用

有关更多信息,请参阅 事务spring-doc.cn

ReplyingKafkaTemplate变化

现在有一种机制可以检查回复,如果存在某些条件,则异常地使 future 失败。spring-doc.cn

添加了对发送和接收的支持。spring-messagingMessage<?>spring-doc.cn

有关更多信息,请参阅使用 ReplyingKafkaTemplatespring-doc.cn

Kafka Streams 更改

默认情况下,现在配置为不清理本地状态。 有关更多信息,请参阅配置StreamsBuilderFactoryBeanspring-doc.cn

KafkaAdmin变化

新方法和 已添加。 已添加以方便在单个 bean 中配置多个主题。 有关更多信息,请参见 [configuring-topics]。createOrModifyTopicsdescribeTopicsKafkaAdmin.NewTopicsspring-doc.cn

MessageConverter变化

现在可以向 中添加 ,从而允许基于标头进行内容协商。 有关更多信息,请参见Spring Messaging Message Conversionspring-messagingSmartMessageConverterMessagingMessageConvertercontentTypespring-doc.cn

测序 s@KafkaListener

有关更多信息,请参阅按顺序@KafkaListener 开始spring-doc.cn

ExponentialBackOffWithMaxRetries

提供了新的实现,可以更方便地配置最大重试次数。 有关更多信息,请参阅 ExponentialBackOffWithMaxRetries 实现BackOffspring-doc.cn

条件委托错误处理程序

这些新的错误处理程序可以配置为委托给不同的错误处理程序,具体取决于异常类型。 有关更多信息,请参阅委派错误处理程序spring-doc.cn

2.5 和 2.6 之间的变化

Kafka 客户端版本

此版本需要 2.6.0 .kafka-clientsspring-doc.cn

侦听器容器更改

默认值现在为 。 有关更多信息,请参阅 Exactly Once 语义EOSModeBETAspring-doc.cn

各种错误处理程序(扩展)和现在重置 if 恢复失败。 此外,您现在可以根据失败的记录和/或异常选择要使用的 。FailedRecordProcessorDefaultAfterRollbackProcessorBackOffBackOffspring-doc.cn

您现在可以在容器属性中配置 。 有关更多信息,请参阅 Listener Container Properties (侦听器容器属性)。adviceChainspring-doc.cn

当容器配置为发布时,它现在会在发布空闲事件后收到记录时发布一个。 有关更多信息,请参阅应用程序事件检测空闲和无响应的使用者ListenerContainerIdleEventListenerContainerNoLongerIdleEventspring-doc.cn

@KafkaListener 更改

使用手动分区分配时,您现在可以指定通配符来确定哪些分区应重置为初始偏移量。 此外,如果侦听器实现 ,则会在手动分配后调用 。 (也在版本 2.5.5 中添加)。 有关更多信息,请参阅 Explicit Partition AssignmentConsumerSeekAwareonPartitionsAssigned()spring-doc.cn

添加了便捷方法,使查找更加容易。 有关更多信息,请参见 [seek]。AbstractConsumerSeekAwarespring-doc.cn

ErrorHandler 更改

现在可以将 (例如 , , ) 的子类配置为在异常类型与之前此记录发生的类型不同时重置重试状态。FailedRecordProcessorSeekToCurrentErrorHandlerDefaultAfterRollbackProcessorRecoveringBatchErrorHandlerspring-doc.cn

Producer Factory 更改

现在,您可以为创建者设置一个最长期限,超过该期限后,它们将被关闭并重新创建。 有关更多信息,请参阅 事务spring-doc.cn

现在,您可以在创建配置映射后更新配置映射。 这可能很有用,例如,如果您必须在凭证更改后更新 SSL 密钥/信任存储位置。 有关更多信息,请参见使用 DefaultKafkaProducerFactoryDefaultKafkaProducerFactoryspring-doc.cn

2.4 和 2.5 之间的变化

本节介绍从版本 2.4 到版本 2.5 所做的更改。 有关早期版本的更改,请参阅更改历史记录spring-doc.cn

消费者/生产者出厂设置更改

现在,每当创建或关闭 Consumer 或 Producer 时,默认的 consumer 和 producer 工厂都可以调用回调。 提供了本机 Micrometer 度量的实现。 有关更多信息,请参阅 Factory Listenersspring-doc.cn

现在,您可以在运行时更改引导服务器属性,从而能够故障转移到另一个 Kafka 集群。 有关更多信息,请参阅连接到 Kafkaspring-doc.cn

StreamsBuilderFactoryBean变化

工厂 Bean 现在可以在创建或销毁时调用回调。 提供了本机 Micrometer 度量的实现。 有关更多信息,请参阅 KafkaStreams Micrometer SupportKafkaStreamsspring-doc.cn

Kafka 客户端版本

此版本需要 2.5.0 。kafka-clientsspring-doc.cn

类/包更改

SeekUtils已从包移动到 。o.s.k.supporto.s.k.listenerspring-doc.cn

Delivery Attempts 标头

现在有一个选项可以添加一个 headers,该 headers 在使用某些错误处理程序时和回滚处理器之后跟踪传递尝试。 有关更多信息,请参阅 Delivery Attempts Headerspring-doc.cn

@KafkaListener 更改

现在,当返回类型为 . 有关更多信息,请参阅 Reply Type Message<?>@KafkaListenerMessage<?>spring-doc.cn

当传入记录具有键时,不再填充值;标题被完全省略。KafkaHeaders.RECEIVED_MESSAGE_KEYnullnullspring-doc.cn

@KafkaListener方法现在可以指定一个参数,而不是对元数据(如 Topic、Partition 等)使用离散的标头。 有关更多信息,请参阅 Consumer Record MetadataConsumerRecordMetadataspring-doc.cn

侦听器容器更改

container 属性现在是默认的。 有关更多信息,请参阅 Listener Container Properties (侦听器容器属性)。assignmentCommitOptionLATEST_ONLY_NO_TXspring-doc.cn

现在,在使用事务时,容器属性是默认的。 有关更多信息,请参阅 事务subBatchPerPartitiontruespring-doc.cn

现在提供了一个 new。RecoveringBatchErrorHandlerspring-doc.cn

现在支持静态组成员资格。 有关更多信息,请参阅 Message Listener Containersspring-doc.cn

配置增量/协作再平衡时,如果偏移量无法以非致命方式提交,则容器将尝试在再平衡完成后重新提交仍分配给此实例的分区的偏移量。RebalanceInProgressExceptionspring-doc.cn

默认错误处理程序现在是 for record listeners 和 for batch listeners。 有关更多信息,请参阅容器错误处理程序SeekToCurrentErrorHandlerRecoveringBatchErrorHandlerspring-doc.cn

现在,您可以控制记录标准错误处理程序有意引发的异常的级别。 有关更多信息,请参阅容器错误处理程序spring-doc.cn

添加了该方法,可以更轻松地确定并发容器中的哪些使用者被分配了哪些分区。 有关更多信息,请参阅 Listener Container Properties (侦听器容器属性)。getAssignmentsByClientId()spring-doc.cn

您现在可以禁止记录整个错误、调试日志等。 请参见侦听器容器属性ConsumerRecordonlyLogRecordMetadataspring-doc.cn

KafkaTemplate 更改

现在可以维护千分尺计时器。 有关更多信息,请参阅监控KafkaTemplatespring-doc.cn

现在可以使用 属性进行配置,以覆盖 producer Factory 中的属性。 有关更多信息,请参阅使用 KafkaTemplateKafkaTemplateProducerConfigspring-doc.cn

现在已经提供了 A。 有关更多信息,请参阅使用 RoutingKafkaTemplateRoutingKafkaTemplatespring-doc.cn

现在,您可以使用 instead to 获取更窄的异常,从而更轻松地提取失败的 . 有关更多信息,请参阅使用 KafkaTemplateKafkaSendCallbackListenerFutureCallbackProducerRecordspring-doc.cn

Kafka 字符串序列化器/反序列化器

现在提供了新的 / s 以及关联的 s。 有关更多信息,请参阅字符串序列化ToStringSerializerStringDeserializerSerDespring-doc.cn

JsonDeserializer

现在可以更灵活地确定反序列化类型。 有关更多信息,请参阅使用方法确定类型JsonDeserializerspring-doc.cn

委托序列化器/反序列化器

现在可以处理 “standard” 类型,当出站记录没有标头时。 有关更多信息,请参见Delegating Serializer 和 DeserializerDelegatingSerializerspring-doc.cn

测试更改

帮助程序记录现在设置为 by default。 有关更多信息,请参阅 JUnitKafkaTestUtils.consumerProps()ConsumerConfig.AUTO_OFFSET_RESET_CONFIGearliestspring-doc.cn

2.3 和 2.4 之间的更改

Kafka 客户端版本

此版本需要 2.4.0 或更高版本,并支持新的增量再平衡功能。kafka-clientsspring-doc.cn

ConsumerAwareRebalanceListener 的

与 一样,此接口现在具有 additional method 。 有关更多信息,请参阅 Apache Kafka 文档。ConsumerRebalanceListeneronPartitionsLostspring-doc.cn

与 不同,默认实现调用 . 相反,侦听器容器将在调用 ;因此,在实现 .ConsumerRebalanceListeneronPartitionsRevokedonPartitionsLostConsumerAwareRebalanceListenerspring-doc.cn

有关更多信息,请参阅 Rebalancing Listeners 末尾的重要说明。spring-doc.cn

GenericErrorHandler

默认实现现在默认返回 true。isAckAfterHandle()spring-doc.cn

Kafka模板

现在支持非事务性发布和事务性发布。 有关更多信息,请参阅 KafkaTemplate 事务性和非事务性发布KafkaTemplatespring-doc.cn

AggregatingReplyingKafka模板

现在是 . 现在,在超时后(以及记录到达时)调用它;第二个参数是在 timeout 后调用的情况下。releaseStrategyBiConsumertruespring-doc.cn

有关更多信息,请参阅 聚合多个回复spring-doc.cn

侦听器容器

这提供了一个选项,允许侦听器容器在 . 有关更多信息,请参阅其 JavaDocs 和使用 KafkaMessageListenerContainerContainerPropertiesauthorizationExceptionRetryIntervalAuthorizationExceptionKafkaConsumerspring-doc.cn

@KafkaListener

注释具有 new property ;默认为 true。 当回复侦听器返回时,this 属性控制返回结果是否作为单个记录发送,还是发送每个元素的一条记录。 有关更多信息,请参阅使用 @SendTo 转发侦听器结果@KafkaListenersplitIterablesIterablespring-doc.cn

批处理侦听器现在可以使用 ;例如,这允许在事务中处理 BATCH,而侦听器一次获取一条记录。 使用默认实现,a 可用于处理批处理中的错误,而无需停止整个批处理的处理 - 这在使用 transactions 时可能很有用。 有关更多信息,请参阅使用 Batch Listeners 的事务BatchToRecordAdapterConsumerRecordRecovererspring-doc.cn

Kafka 流

接受 新属性 。 这允许在创建流之前配置生成器和/或拓扑。 有关更多信息,请参见 Spring ManagementStreamsBuilderFactoryBeanKafkaStreamsInfrastructureCustomizerspring-doc.cn

2.2 和 2.3 之间的变化

本节介绍从版本 2.2 到版本 2.3 所做的更改。spring-doc.cn

提示、技巧和示例

添加了新章节 提示、技巧和示例。 请提交 GitHub 问题和/或拉取请求以获取该章节中的其他条目。spring-doc.cn

Kafka 客户端版本

此版本需要 2.3.0 或更高版本。kafka-clientsspring-doc.cn

类/包更改

TopicPartitionInitialOffset已弃用,取而代之的是 。TopicPartitionOffsetspring-doc.cn

配置更改

从版本 2.3.4 开始,container 属性默认为 false。 如果为 true,则如果代理已关闭,则应用程序将无法启动;许多用户受到此更改的影响;鉴于 Kafka 是一个高可用性平台,我们没有预料到在没有活动代理的情况下启动应用程序会是一个常见的用例。missingTopicsFatalspring-doc.cn

生产者和消费者工厂更改

现在可以将其配置为为每个线程创建一个 producer。 您还可以在构造函数中提供实例作为已配置类(需要 no-arg 构造函数)或使用实例进行构造的替代,然后在所有 Producer 之间共享。 有关更多信息,请参见使用 DefaultKafkaProducerFactoryDefaultKafkaProducerFactorySupplier<Serializer>Serializerspring-doc.cn

相同的选项也可用于 中的实例。 有关更多信息,请参阅使用 KafkaMessageListenerContainerSupplier<Deserializer>DefaultKafkaConsumerFactoryspring-doc.cn

侦听器容器更改

以前,当使用侦听器适配器(例如 s)调用侦听器时,会收到错误处理程序(实际的侦听器异常为 )。 本机 s 引发的异常原封不动地传递给错误处理程序。 现在 a 始终是参数(实际的侦听器异常为 the ),它提供对容器属性的访问。ListenerExecutionFailedExceptioncause@KafkaListenerGenericMessageListenerListenerExecutionFailedExceptioncausegroup.idspring-doc.cn

因为侦听器容器有自己的提交偏移量的机制,所以它更喜欢 Kafka 是。 它现在会自动将其设置为 false,除非在 Consumer Factory 中专门设置或容器的 consumer 属性覆盖。ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIGfalsespring-doc.cn

该属性现在是默认的。ackOnErrorfalsespring-doc.cn

现在可以在 listener 方法中获取 consumer 的属性。 有关更多信息,请参阅获取使用者 group.idgroup.idspring-doc.cn

该容器具有一个新属性,允许在调用侦听器之前检查或修改记录。 如果需要调用多个拦截器,还提供了 A。 有关更多信息,请参阅 Message Listener ContainersrecordInterceptorCompositeRecordInterceptorspring-doc.cn

具有新方法,允许您相对于开始、结束或当前位置执行查找,并查找大于或等于时间戳的第一个偏移量。 有关更多信息,请参见 [seek]。ConsumerSeekAwarespring-doc.cn

现在提供了一个便利类来简化查找。 有关更多信息,请参见 [seek]。AbstractConsumerSeekAwarespring-doc.cn

它提供了一个选项,让侦听器容器中的 main 循环在调用之间休眠。 有关更多信息,请参阅其 JavaDocs 和使用 KafkaMessageListenerContainerContainerPropertiesidleBetweenPollsKafkaConsumer.poll()spring-doc.cn

使用 (或 ) 时,您现在可以通过调用 . 有关更多信息,请参阅提交偏移量AckMode.MANUALMANUAL_IMMEDIATEnackAcknowledgmentspring-doc.cn

现在可以使用 Micrometer s 监控侦听器性能。 有关更多信息,请参阅监控Timerspring-doc.cn

容器现在发布与启动相关的其他使用者生命周期事件。 有关更多信息,请参阅应用程序事件spring-doc.cn

事务性批处理侦听器现在可以支持僵尸屏蔽。 有关更多信息,请参阅 事务spring-doc.cn

现在可以使用 配置侦听器容器工厂,以便在创建和配置每个容器后进一步配置每个容器。 有关更多信息,请参阅 Container factoryContainerCustomizerspring-doc.cn

ErrorHandler 更改

现在,将某些异常视为致命异常,并禁用这些异常的重试,并在第一次失败时调用 recoverer。SeekToCurrentErrorHandlerspring-doc.cn

现在可以将 and 配置为在两次传递尝试之间应用 (线程休眠)。SeekToCurrentErrorHandlerSeekToCurrentBatchErrorHandlerBackOffspring-doc.cn

从版本 2.3.2 开始,当错误处理程序在恢复失败的记录后返回时,将提交已恢复记录的偏移量。spring-doc.cn

现在,当与 结合使用时,将发送到死信主题的消息的有效负载设置为无法反序列化的原始值。 以前,用户代码需要从消息标头中提取 。 有关更多信息,请参阅发布死信记录DeadLetterPublishingRecovererErrorHandlingDeserializernullDeserializationExceptionspring-doc.cn

主题生成器

提供了一个新类,以便更方便地创建用于自动 topic 配置的 s。 有关更多信息,请参见 [configuring-topics]。TopicBuilderNewTopic@Beanspring-doc.cn

Kafka Streams 更改

现在,您可以对 创建的 执行其他配置。 有关更多信息,请参阅 Streams ConfigurationStreamsBuilderFactoryBean@EnableKafkaStreamsspring-doc.cn

现在提供了 A,它允许恢复具有反序列化错误的记录。 它可以与 a 结合使用,以将这些记录发送到死信主题。 有关更多信息,请参见Recovery from Deserialization ExceptionsRecoveringDeserializationExceptionHandlerDeadLetterPublishingRecovererspring-doc.cn

已经提供了转换器,使用 SPEL 生成 Headers 值。 有关更多信息,请参阅 Header EnricherHeaderEnricherspring-doc.cn

已提供。 这允许 Kafka 流拓扑与 spring-messaging 组件(例如 Spring Integration 流)进行交互。 有关更多信息,请参见MessagingProcessor[从 KStream 调用 Spring 集成流]。MessagingTransformerspring-doc.cn

JSON 组件更改

现在,默认情况下,所有 JSON 感知组件都使用 . 现在提供基于 -的构造函数,以便更好地处理目标通用容器类型。 此外,还引入了 a 用于序列化为纯字符串。 有关更多信息,请参阅其 JavaDocs 和 Serialization, Deserialization, and Message ConversionObjectMapperJacksonUtils.enhancedObjectMapper()JsonDeserializerTypeReferenceJacksonMimeTypeModuleorg.springframework.util.MimeTypespring-doc.cn

A 以及所有 Json 转换器的新超类 . 此外,a 现在可用;它可以序列化 , 和 s 中的值。 有关更多信息,请参见Spring Messaging Message ConversionByteArrayJsonMessageConverterJsonMessageConverterStringOrBytesSerializerbyte[]BytesStringProducerRecordspring-doc.cn

和 现在具有 Fluent API,使编程配置更简单。 请参阅 javadocs、 序列化、反序列化和消息转换 以及 Streams JSON 序列化和反序列化 以获取更多信息。JsonSerializerJsonDeserializerJsonSerdespring-doc.cn

回复KafkaTemplate

当回复超时时,future 以 a 而不是 例外方式完成。KafkaReplyTimeoutExceptionKafkaExceptionspring-doc.cn

此外,现在还提供了一个重载方法,允许基于每条消息指定回复超时。sendAndReceivespring-doc.cn

AggregatingReplyingKafka模板

通过聚合来自多个接收者的回复来扩展 。 有关更多信息,请参阅 聚合多个回复ReplyingKafkaTemplatespring-doc.cn

交易变更

现在,您可以覆盖 和 上的生产者工厂。 有关更多信息,请参阅 transactionIdPrefixtransactionIdPrefixKafkaTemplateKafkaTransactionManagerspring-doc.cn

新的 Delegating Serializer/Deserializer

该框架现在提供了一个委托 和 ,利用标头来生成和使用具有多个键/值类型的记录。 有关更多信息,请参见Delegating Serializer 和 DeserializerSerializerDeserializerspring-doc.cn

新的 Retrying Deserializer

框架现在提供了一个 delegating ,以便在可能发生暂时性错误(如网络问题)时重试序列化。 有关更多信息,请参阅 Retrying DeserializerRetryingDeserializerspring-doc.cn

2.1 和 2.2 之间的更改

Kafka 客户端版本

此版本需要 2.0.0 或更高版本。kafka-clientsspring-doc.cn

类和软件包更改

类已从 移动到 。ContainerPropertiesorg.springframework.kafka.listener.configorg.springframework.kafka.listenerspring-doc.cn

枚举已从 移动到 。AckModeAbstractMessageListenerContainerContainerPropertiesspring-doc.cn

的 and 方法已从 和 移动到 和 。setBatchErrorHandler()setErrorHandler()ContainerPropertiesAbstractMessageListenerContainerAbstractKafkaListenerContainerFactoryspring-doc.cn

回滚处理后

提供了新策略。 有关更多信息,请参阅 After-rollback ProcessorAfterRollbackProcessorspring-doc.cn

ConcurrentKafkaListenerContainerFactory变化

现在,您可以使用 创建和配置 any ,而不仅仅是 Comments 的 Comments。 有关更多信息,请参阅 Container factoryConcurrentKafkaListenerContainerFactoryConcurrentMessageListenerContainer@KafkaListenerspring-doc.cn

侦听器容器更改

添加了新的容器属性 ()。 有关更多信息,请参阅使用 KafkaMessageListenerContainermissingTopicsFatalspring-doc.cn

现在,当使用者停止时,会发出 A。 有关更多信息,请参阅 线程安全ConsumerStoppedEventspring-doc.cn

Batch 侦听器可以选择接收 complete 对象,而不是 . 有关更多信息,请参阅 [batch-listeners]。ConsumerRecords<?, ?>List<ConsumerRecord<?, ?>spring-doc.cn

和 现在可以恢复(跳过)不断失败的记录,默认情况下,在 10 次失败后恢复。 它们可以配置为将失败的记录发布到死信主题。DefaultAfterRollbackProcessorSeekToCurrentErrorHandlerspring-doc.cn

从版本 2.2.4 开始,可以在选择死信主题名称时使用使用者的组 ID。spring-doc.cn

已添加。 有关更多信息,请参阅应用程序事件ConsumerStoppingEventspring-doc.cn

现在可以配置为在容器配置时提交已恢复记录的偏移量(自 2.2.4 起)。SeekToCurrentErrorHandlerAckMode.MANUAL_IMMEDIATEspring-doc.cn

@KafkaListener 更改

现在,您可以通过在注解上设置属性来覆盖侦听器容器工厂的 and 属性。 现在,您可以添加配置以确定将哪些标头(如果有)复制到回复消息中。 有关更多信息,请参阅 @KafkaListener AnnotationconcurrencyautoStartupspring-doc.cn

现在,您可以将其用作自己的注释上的元注释。 有关更多信息,请参阅@KafkaListener作为元注释@KafkaListenerspring-doc.cn

现在,可以更轻松地配置 a 进行验证。 有关更多信息请参阅 @KafkaListener @Payload 验证Validator@Payloadspring-doc.cn

现在,您可以直接在 Comments 上指定 kafka 使用者属性;这些将覆盖 Consumer Factory 中定义的任何具有相同名称的属性(自版本 2.2.4 起)。 有关更多信息,请参阅 Annotation 属性spring-doc.cn

标头映射更改

类型的标头现在映射为值中的简单字符串。 以前,它们被映射为 JSON,并且仅被解码。 无法解码。 它们现在是用于互操作性的简单字符串。MimeTypeMediaTypeRecordHeaderMimeTypeMediaTypespring-doc.cn

此外,它还具有一种新方法,允许指定应使用 JSON 而不是 JSON 进行映射的类型。 有关更多信息,请参阅 Message HeadersDefaultKafkaHeaderMapperaddToStringClassestoString()spring-doc.cn

嵌入式 Kafka 更改

该类及其接口已被弃用,取而代之的是 及其 JUnit 4 包装器。 注释现在填充一个 bean,而不是已弃用的 . 此更改允许在 JUnit 5 测试中使用 。 现在,注释具有用于指定填充 . 有关更多信息,请参阅测试应用程序KafkaEmbeddedKafkaRuleEmbeddedKafkaBrokerEmbeddedKafkaRule@EmbeddedKafkaEmbeddedKafkaBrokerKafkaEmbedded@EmbeddedKafka@EmbeddedKafkaportsEmbeddedKafkaBrokerspring-doc.cn

JsonSerializer/Deserializer 增强功能

现在,您可以使用 producer 和 consumer 属性提供类型映射信息。spring-doc.cn

反序列化器上提供了新的构造函数,以允许使用提供的目标类型覆盖类型 Headers 信息。spring-doc.cn

现在,默认情况下会删除任何类型的信息标头。JsonDeserializerspring-doc.cn

您现在可以使用 Kafka 属性(自 2.2.3 起)将 配置为忽略类型信息标头。JsonDeserializerspring-doc.cn

有关更多信息,请参见 序列化、反序列化和消息转换spring-doc.cn

Kafka Streams 更改

流配置 Bean 现在必须是对象而不是对象。KafkaStreamsConfigurationStreamsConfigspring-doc.cn

已从 package 移动到 。StreamsBuilderFactoryBean…​core…​configspring-doc.cn

引入 是为了在实例上构建条件分支时获得更好的最终用户体验。KafkaStreamBrancherKStreamspring-doc.cn

有关更多信息,请参阅 Apache Kafka Streams 支持配置spring-doc.cn

事务 ID

当侦听器容器启动事务时,现在 会附加 . 此更改允许对僵尸进行适当的围栏,如此处所述transactional.idtransactionIdPrefix<group.id>.<topic>.<partition>spring-doc.cn

2.0 和 2.1 之间的变化

Kafka 客户端版本

此版本需要 1.0.0 或更高版本。kafka-clientsspring-doc.cn

版本 2.2 原生支持 1.1.x 客户端。spring-doc.cn

JSON 改进

现在在 中添加类型信息,让转换器在接收时根据消息本身而不是固定的配置类型创建特定类型。 有关更多信息,请参见 序列化、反序列化和消息转换StringJsonMessageConverterJsonSerializerHeadersJsonDeserializerspring-doc.cn

容器停止错误处理程序

现在为记录和批处理侦听器提供了容器错误处理程序,这些侦听器将侦听器抛出的任何异常视为 fatal/ 他们停止了容器。 有关更多信息,请参阅处理异常spring-doc.cn

暂停和恢复容器

侦听器容器现在具有 and 方法(自版本 2.1.3 起)。 有关更多信息,请参阅暂停和恢复侦听器容器pause()resume()spring-doc.cn

状态重试

从版本 2.1.3 开始,您可以配置有状态重试。 有关更多信息,请参阅有状态重试spring-doc.cn

客户端 ID

从版本 2.1.1 开始,您现在可以在 上设置前缀。 以前,要自定义客户端 ID,您需要为每个侦听器提供单独的使用者工厂(和容器工厂)。 前缀以 为后缀,以便在使用并发时提供唯一的客户端 ID。client.id@KafkaListener-nspring-doc.cn

日志记录偏移提交

默认情况下,主题偏移提交的日志记录使用日志记录级别执行。 从版本 2.1.2 开始,called 中的新属性允许您指定这些消息的日志级别。 有关更多信息,请参阅使用 KafkaMessageListenerContainerDEBUGContainerPropertiescommitLogLevelspring-doc.cn

默认@KafkaHandler

从版本 2.1.3 开始,您可以将类级别的注释之一指定为默认注释。 有关更多信息,请参阅 @KafkaListener on a Class@KafkaHandler@KafkaListenerspring-doc.cn

回复KafkaTemplate

从版本 2.1.3 开始,提供了子类 of 以支持请求/回复语义。 有关更多信息,请参阅使用 ReplyingKafkaTemplateKafkaTemplatespring-doc.cn

ChainedKafkaTransactionManager

版本 2.1.3 引入了 . (现已弃用)。ChainedKafkaTransactionManagerspring-doc.cn

2.0 版的迁移指南

请参阅 2.0 到 2.1 迁移指南。spring-doc.cn

1.3 和 2.0 之间的变化

Spring Framework 和 Java 版本

Spring for Apache Kafka 项目现在需要 Spring Framework 5.0 和 Java 8。spring-doc.cn

@KafkaListener变化

现在,您可以使用 . 如果该方法返回结果,则会将其转发到指定的 Topic。 有关更多信息,请参阅使用 @SendTo 转发侦听器结果@KafkaListener@KafkaHandler@SendTospring-doc.cn

消息侦听器

消息侦听器现在可以知道该对象。 有关更多信息,请参阅 [message-listeners]。Consumerspring-doc.cn

ConsumerAwareRebalanceListener

再平衡侦听器现在可以在再平衡通知期间访问对象。 有关更多信息,请参阅重新平衡侦听器Consumerspring-doc.cn

1.2 和 1.3 之间的变化

对事务的支持

0.11.0.0 客户端库添加了对事务的支持。 添加了对事务的 和其他支持。 有关更多信息,请参阅 事务KafkaTransactionManagerspring-doc.cn

对标头的支持

0.11.0.0 客户端库添加了对消息标头的支持。 现在可以将这些 映射到 . 或从 映射。 有关更多信息,请参阅 Message Headersspring-messagingMessageHeadersspring-doc.cn

创建主题

0.11.0.0 客户端库提供了一个 ,您可以使用它来创建主题。 使用此客户端自动添加定义为实例的主题。AdminClientKafkaAdmin@Beanspring-doc.cn

支持 Kafka 时间戳

KafkaTemplate现在支持 API 添加带有时间戳的记录。 引入了有关支持的新内容。 此外,还添加了新的和测试实用程序。 有关更多详细信息,请参阅使用 KafkaTemplate@KafkaListener Annotation 和测试应用程序KafkaHeaderstimestampKafkaConditions.timestamp()KafkaMatchers.hasTimestamp()spring-doc.cn

@KafkaListener变化

您现在可以配置 来处理异常。 有关更多信息,请参阅处理异常KafkaListenerErrorHandlerspring-doc.cn

默认情况下,该属性现在用作属性,覆盖在 Consumer Factory 中配置的属性(如果存在)。 此外,您可以显式配置 on 注解。 以前,您需要一个单独的容器工厂(和 Consumer Factory)来为侦听器使用不同的值。 要恢复以前使用出厂配置的行为,请将注释上的属性设置为 。@KafkaListeneridgroup.idgroupIdgroup.idgroup.ididIsGroupfalsespring-doc.cn

@EmbeddedKafka注解

为方便起见,提供了一个测试类级 Comments,用于注册为 Bean。 有关更多信息,请参阅测试应用程序@EmbeddedKafkaKafkaEmbeddedspring-doc.cn

Kerberos 配置

现在提供对配置 Kerberos 的支持。 有关更多信息,请参阅 JAAS 和 Kerberosspring-doc.cn

1.1 和 1.2 之间的更改

此版本使用 0.10.2.x 客户端。spring-doc.cn

1.0 和 1.1 之间的更改

Kafka 客户端

该版本使用 Apache Kafka 0.10.x.x 客户端。spring-doc.cn

Batch 侦听器

侦听器可以配置为接收操作返回的整批消息,而不是一次接收一条消息。consumer.poll()spring-doc.cn

Null 负载

Null 负载用于在使用日志压缩时“删除”键。spring-doc.cn

初始偏移

在显式分配分区时,您现在可以配置相对于使用者组当前位置的初始偏移量,而不是绝对偏移量或相对于当前端的偏移量。spring-doc.cn

寻求

您现在可以查找每个主题或分区的位置。 当使用组管理并且 Kafka 分配分区时,您可以使用此选项在初始化期间设置初始位置。 您还可以在检测到空闲容器时或在应用程序执行中的任何任意点进行查找。 有关更多信息,请参见 [seek]。spring-doc.cn