此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0! |
配置
从版本 2.9 开始,对于默认配置,应在带 Comments 的类中使用 Comments。
这使该功能能够正确引导,并允许注入一些功能的组件,以便在运行时进行查找。@EnableKafkaRetryTopic
@Configuration
如果添加此注释,则无需同时添加 ,因为 是元注释的 。@EnableKafka @EnableKafkaRetryTopic @EnableKafka |
此外,从该版本开始,对于功能组件和全局功能的更高级配置,应在类中扩展该类,并覆盖适当的方法。
有关更多详细信息,请参阅配置全局设置和功能。RetryTopicConfigurationSupport
@Configuration
默认情况下,重试主题的容器将具有与主容器相同的并发性。
从版本 3.0 开始,您可以为重试容器设置不同的容器(在注释上或在 中)。concurrency
RetryConfigurationBuilder
只能使用上述技术中的一种,并且只有一个类可以扩展 。@Configuration RetryTopicConfigurationSupport |
使用注释@RetryableTopic
要为带注释的方法配置重试主题和 dlt,您只需向其添加注释,Spring for Apache Kafka 将使用默认配置引导所有必要的主题和使用者。@KafkaListener
@RetryableTopic
@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
// ... message processing
}
从 3.2 开始,对类@KafkaListener的支持将是:@RetryableTopic
@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public class ClassLevelRetryListener {
@KafkaHandler
public void processMessage(MyPojo message) {
// ... message processing
}
}
您可以在同一类中指定一个方法来处理 dlt 消息,方法是使用注释对其进行注释。
如果未提供 DltHandler 方法,则会创建一个默认使用者,该使用者仅记录消耗。@DltHandler
@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}
如果未指定kafkaTemplate名称,则将查找具有name的 bean。
如果未找到 bean,则会引发异常。defaultRetryTopicKafkaTemplate |
从版本 3.0 开始,注释可以用作自定义注释的元注释;例如:@RetryableTopic
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@RetryableTopic
static @interface MetaAnnotatedRetryableTopic {
@AliasFor(attribute = "concurrency", annotation = RetryableTopic.class)
String parallelism() default "3";
}
使用 beanRetryTopicConfiguration
您还可以通过在带注释的类中创建 bean 来配置非阻塞重试支持。RetryTopicConfiguration
@Configuration
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.create(template);
}
这将为使用默认配置批注的方法中的所有主题创建重试主题和 dlt,以及相应的使用者。该实例是消息转发所必需的。@KafkaListener
KafkaTemplate
为了对如何处理每个主题的非阻塞重试进行更精细的控制,可以提供多个 bean。RetryTopicConfiguration
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(3000)
.maxAttempts(5)
.concurrency(1)
.includeTopics("my-topic", "my-other-topic")
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1000, 2, 5000)
.maxAttempts(4)
.excludeTopics("my-topic", "my-other-topic")
.retryOn(MyException.class)
.create(template);
}
重试主题和 dlt 的使用者将被分配到一个使用者组,该使用者组的组 ID 是您在注释参数中提供的 ID 与主题后缀的组合。
如果您不提供任何 URL,它们都将属于同一个组,并且对重试主题进行再平衡将导致对主主题进行不必要的再平衡。groupId @KafkaListener |
如果消费者配置了 ErrorHandlingDeserializer ,为了处理反序列化异常,重要的是使用 序列化程序 和 producer 配置一个序列化程序,该序列化程序可以处理由反序列化异常导致的普通对象和原始值。
模板的 generic value type 应为 。
一种方法是使用 ;示例如下:KafkaTemplate byte[] Object DelegatingByTypeSerializer |
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
多个 annotation 可以用于同一主题,无论是否手动分配分区以及非阻塞重试,但只有一个配置将用于给定主题。
最好使用单个 bean 来配置此类主题;如果多个 Comments 用于同一个主题,则所有 Comments 都应该具有相同的值,否则其中一个 Comments 将应用于该主题的所有侦听器,而其他 Annotation 的值将被忽略。@KafkaListener RetryTopicConfiguration @RetryableTopic |
配置全局设置和功能
从 2.9 开始,之前用于配置组件的 bean 覆盖方法已被删除(由于上述 API 的实验性质,没有弃用)。
这不会改变 bean 的方法 - 只改变基础设施组件的配置。
现在,该类应该在一个(单个)类中扩展,并覆盖适当的方法。
示例如下:RetryTopicConfiguration
RetryTopicConfigurationSupport
@Configuration
@EnableKafka
@Configuration
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {
@Override
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
blockingRetries
.retryOn(MyBlockingRetriesException.class, MyOtherBlockingRetriesException.class)
.backOff(new FixedBackOff(3000, 3));
}
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
// Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
customizersConfigurer.customizeErrorHandler(eh -> {
eh.setSeekAfterError(false);
});
}
}
使用此配置方法时,不应使用 Comments 来防止上下文由于重复的 bean 而无法启动。
请改用 simple 注释。@EnableKafkaRetryTopic @EnableKafka |
当为 true 时,将使用指定数量的分区和复制因子创建 main 和 retry 主题。
从版本 3.0 开始,默认复制因子为 ,表示使用代理默认值。
如果您的代理版本低于 2.4,则需要设置一个显式值。
要覆盖特定主题(例如主主题或 DLT)的这些值,只需添加具有所需属性的 a;这将覆盖 Auto Creation 属性。autoCreateTopics
-1
NewTopic
@Bean
默认情况下,使用接收记录的原始分区将记录发布到重试主题。 如果重试主题的分区数少于主主题的分区数,则应适当配置框架;下面是一个示例。 |
@EnableKafka
@Configuration
public class Config extends RetryTopicConfigurationSupport {
@Override
protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
return dlprf -> dlprf.setPartitionResolver((cr, nextTopic) -> null);
}
...
}
该函数的参数是使用者记录和下一个主题的名称。
您可以返回特定的分区编号,或指示 应确定分区。null
KafkaProducer
默认情况下,当记录通过重试主题转换时,将保留重试标头的所有值(尝试次数、时间戳)。
从版本 2.9.6 开始,如果您只想保留这些 Headers 的最后一个值,请使用上面显示的方法将工厂的属性设置为。configureDeadLetterPublishingContainerFactory()
retainAllRetryHeaderValues
false
查找 RetryTopicConfiguration
尝试通过从 Comments 或 Bean 容器(如果没有可用的 Comments )创建一个实例来提供 的实例。RetryTopicConfiguration
@RetryableTopic
如果在容器中找到 bean,则会进行检查以确定所提供的主题是否应由任何此类实例处理。
如果提供了 annotation ,则查找带 Comments 的方法。@RetryableTopic
DltHandler
从 3.2 开始,在类上注释时为 Create 提供新的 API:RetryTopicConfiguration
@RetryableTopic
@Bean
public RetryTopicConfiguration myRetryTopic() {
RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
return provider.findRetryConfigurationFor(topics, null, AnnotatedClass.class, bean);
}
@RetryableTopic
public static class AnnotatedClass {
// NoOps
}