此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0! |
新增功能
自 3.1 以来 3.2 中的新增功能
本节介绍从版本 3.1 到版本 3.2 所做的更改。 有关早期版本的更改,请参阅更改历史记录。
Kafka 客户端版本
此版本需要 3.7.0 。
3.7.0 版本的 Kafka 客户端引入了新的消费组协议。
有关更多详细信息及其限制,请参阅 KIP-848。
新的使用者组协议是早期访问版本,不打算在生产中使用。
在此版本中,仅建议用于测试目的。
因此, Spring for Apache Kafka 仅在其本身可用的此类测试级别支持的范围内支持这种新的消费者组协议。
默认情况下, Spring for Apache Kafka 使用经典的消费者组协议,并且在测试新的消费者组协议时,需要通过消费者上的属性选择加入。kafka-clients
kafka-client
group.protocol
测试支持更改
默认情况下,该模式处于禁用状态,想要使用该模式的用户必须启用它。
这是由于在使用 in 模式时观察到某些不稳定性,尤其是在测试新的使用者组协议时。
新的消费者组协议仅在 模式下受支持,因此,在测试新协议时,需要针对真实的 Kafka 集群进行,而不是基于的 Kafka 集群。
此外,在使用 in 模式运行多个方法时,还观察到一些其他争用条件。
在这些问题得到解决之前,默认值 on 将保持为 。kraft
EmbeddedKafka
kraft
EmbeddedKafka
kraft
kraft
KafkaClusterTestKit
EmbeddedKafka
KafkaListener
EmbeddedKafka
kraft
kraft
EmbeddedKafka
false
Kafka Streams 交互式查询支持
用于访问 Kafka Streams 交互式查询中使用的可查询存储的新 API。
有关更多详细信息,请参阅 Kafka Streams Interactive Support 。KafkaStreamsInteractiveQuerySupport
TransactionIdSuffixStrategy
引入了一个新接口来管理 suffix。
默认实现是当设置大于零可以在特定范围内重复使用时,否则将通过递增计数器来动态生成后缀。
有关更多信息,请参阅Fixed TransactionIdSuffix。TransactionIdSuffixStrategy
transactional.id
DefaultTransactionIdSuffixStrategy
maxCache
transactional.id
异步 @KafkaListener 返回
@KafkaListener
(和 ) 方法现在可以返回异步返回类型 include 和 Kotlin 函数。
有关更多信息,请参阅 Async Returns 。@KafkaHandler
CompletableFuture<?>
Mono<?>
suspend
根据引发的异常将消息路由到自定义 DLT
现在可以根据异常的类型将消息重定向到自定义 DLT,该异常在消息处理过程中引发。
重定向规则通过 或 设置。
自定义 DLT 以及其他重试和死信主题会自动创建。
有关更多信息,请参阅根据引发的异常将消息路由到自定义 DLT。RetryableTopic.exceptionBasedDltRouting
RetryTopicConfigurationBuilder.dltRoutingRules
弃用 ContainerProperties transactionManager 属性
弃用属性以支持 ,与一般类型相比,这是一种更窄的类型。请参阅 ContainerProperties 和事务同步。transactionManager
ContainerProperties
KafkaAwareTransactionManager
PlatformTransactionManager
回滚处理后
提供了新的 API。
有关更多信息,请参阅 After-rollback Processor 。AfterRollbackProcessor
processBatch
更改 SameIntervalTopicReuseStrategy 默认值@RetryableTopic
将属性默认值更改为 。
请参阅 maxInterval Exponential Delay 的单个主题。@RetryableTopic
SameIntervalTopicReuseStrategy
SINGLE_TOPIC
非阻塞重试支持类级别@KafkaListener
非阻塞重试支持对 Class 进行@KafkaListener。 请参阅 Non-blocking Retries。
支持 RetryTopicConfigurationProvider 中类的进程@RetryableTopic。
提供新的公共 API 以查找 。
请参阅 查找 RetryTopicConfigurationRetryTopicConfiguration
RetryTopicConfigurer 支持进程 MultiMethodKafkaListenerEndpoint。
支持流程和注册 .
为 properties 和 .
严格修改 that for types。
add new 构造函数为提供的 bean 构造一个实例。
提供用于重试端点的 handler 多方法的新类。RetryTopicConfigurer
MultiMethodKafkaListenerEndpoint
MultiMethodKafkaListenerEndpoint
getter/setter
defaultMethod
methods
EndpointCustomizer
MethodKafkaListenerEndpoint
EndpointHandlerMethod
EndpointHandlerMultiMethod
新的 API 方法,用于根据用户提供的函数寻找偏移量
ConsumerCallback
提供了一个新的 API,用于根据用户定义的函数查找偏移量,该函数将使用者中的当前偏移量作为参数。
有关更多详细信息,请参阅 Seek API Docs 。
@PartitionOffset SeekPosition 支持
添加属性以支持 .
有关更多详细信息,请参阅 manual-assignment 。seekPosition
@PartitionOffset
TopicPartitionOffset.SeekPosition
TopicPartitionOffset 中的新构造函数,它接受一个函数来计算要查找的偏移量
TopicPartitionOffset
具有一个新的构造函数,该构造函数采用用户提供的函数来计算要查找的偏移量。
使用此构造函数时,框架使用当前使用者偏移位置的 input 参数调用函数。
有关更多详细信息,请参阅 Seek API Docs 。
Spring Boot 应用程序名称作为默认客户端 ID 前缀
对于定义应用程序名称的 Spring Boot 应用程序,现在使用此名称 作为某些客户端类型的自动生成的客户端 ID 的默认前缀。 有关更多详细信息,请参阅默认客户端 ID 前缀。
增强了 MessageListenerContainers 的检索
ListenerContainerRegistry
提供了两个新的 API 动态查找和筛选实例。 按 ID 进行筛选,另一种是按 ID 和容器属性进行筛选。MessageListenerContainer
getListenerContainersMatching(Predicate<String> idMatcher)
getListenerContainersMatching(BiPredicate<String, MessageListenerContainer> matcher)
有关更多信息,请参阅 @KafkaListener
Lifecycle Management 的 API 文档。
通过提供更多跟踪标签来增强观察
KafkaTemplateObservation
提供更多的跟踪标签(低基数)。 提供了新的 API 来查找高基数 Key 名称和更多跟踪标签(高基数或低基数)。
参见千分尺观察KafkaListenerObservation