从版本 2.9 开始,对于默认配置,注释应在带注释的类中使用。 这使该功能能够正确引导,并允许访问注入要在运行时查找的某些功能组件。@EnableKafkaRetryTopic@ConfigurationSpring中文文档

如果添加此注解,则无需添加 ,因为 是用 元注释的。@EnableKafka@EnableKafkaRetryTopic@EnableKafka

此外,从该版本开始,对于功能组件和全局功能的更高级配置,应在类中扩展该类,并重写相应的方法。 有关详细信息,请参阅配置全局设置和功能RetryTopicConfigurationSupport@ConfigurationSpring中文文档

默认情况下,重试主题的容器将具有与主容器相同的并发性。 从版本 3.0 开始,您可以为重试容器设置不同的容器(在注释上或在 中)。concurrencyRetryConfigurationBuilderSpring中文文档

只能使用上述技术中的一种,并且只能使用一个类。@ConfigurationRetryTopicConfigurationSupport
如果添加此注解,则无需添加 ,因为 是用 元注释的。@EnableKafka@EnableKafkaRetryTopic@EnableKafka
只能使用上述技术中的一种,并且只能使用一个类。@ConfigurationRetryTopicConfigurationSupport

使用注释@RetryableTopic

要为带注释的方法配置重试主题和 DLT,您只需向其添加注释,Spring for Apache Kafka 将使用默认配置引导所有必要的主题和使用者。@KafkaListener@RetryableTopicSpring中文文档

@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
    // ... message processing
}

从 3.2 开始,对类@KafkaListener的支持将是:@RetryableTopicSpring中文文档

@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public class ClassLevelRetryListener {

    @KafkaHandler
    public void processMessage(MyPojo message) {
        // ... message processing
    }

}

您可以在同一类中指定一个方法,通过使用注释对 dlt 消息进行注释来处理该方法。 如果未提供 DltHandler 方法,则会创建一个默认使用者,该使用者仅记录使用量。@DltHandlerSpring中文文档

@DltHandler
public void processMessage(MyPojo message) {
    // ... message processing, persistence, etc
}
如果未指定 kafkaTemplate 名称,那么将查找具有 name 的 bean。 如果未找到 Bean,则会引发异常。defaultRetryTopicKafkaTemplate

从 3.0 版开始,注释可以用作自定义注释的元注释;例如:@RetryableTopicSpring中文文档

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@RetryableTopic
static @interface MetaAnnotatedRetryableTopic {

    @AliasFor(attribute = "concurrency", annotation = RetryableTopic.class)
    String parallelism() default "3";

}
如果未指定 kafkaTemplate 名称,那么将查找具有 name 的 bean。 如果未找到 Bean,则会引发异常。defaultRetryTopicKafkaTemplate

使用 beanRetryTopicConfiguration

您还可以通过在带注释的类中创建 Bean 来配置非阻塞重试支持。RetryTopicConfiguration@ConfigurationSpring中文文档

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .create(template);
}

这将为使用默认配置批注的方法中的所有主题创建重试主题和 dlt 以及相应的使用者。消息转发需要实例。@KafkaListenerKafkaTemplateSpring中文文档

为了更精细地控制如何处理每个主题的非阻塞重审,可以提供多个 Bean。RetryTopicConfigurationSpring中文文档

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(3000)
            .maxAttempts(5)
            .concurrency(1)
            .includeTopics("my-topic", "my-other-topic")
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .exponentialBackoff(1000, 2, 5000)
            .maxAttempts(4)
            .excludeTopics("my-topic", "my-other-topic")
            .retryOn(MyException.class)
            .create(template);
}
重试主题的使用者和 dlt 的使用者将被分配到一个使用者组,其组 ID 是您在批注参数中提供的组 ID 与主题后缀的组合。 如果不提供任何内容,它们将都属于同一组,并且重试主题的重新平衡将导致主主题上不必要的重新平衡。groupId@KafkaListener
如果使用者配置了 ErrorHandlingDeserializer,则要处理反序列化异常,请务必使用序列化程序配置 and its producer,该序列化程序可以处理由反序列化异常导致的普通对象和原始值。 模板的泛型值类型应为 。 一种技术是使用 ;示例如下:KafkaTemplatebyte[]ObjectDelegatingByTypeSerializer
@Bean
public ProducerFactory<String, Object> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
        new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
               MyNormalObject.class, new JsonSerializer<Object>())));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
可以对同一主题使用多个注释,无论是否进行手动分区分配以及非阻塞重试,但给定主题只能使用一种配置。 最好使用单个 Bean 来配置此类主题;如果对同一主题使用多个批注,则所有批注都应具有相同的值,否则其中一个批注将应用于该主题的所有侦听器,而其他批注的值将被忽略。@KafkaListenerRetryTopicConfiguration@RetryableTopic
重试主题的使用者和 dlt 的使用者将被分配到一个使用者组,其组 ID 是您在批注参数中提供的组 ID 与主题后缀的组合。 如果不提供任何内容,它们将都属于同一组,并且重试主题的重新平衡将导致主主题上不必要的重新平衡。groupId@KafkaListener
如果使用者配置了 ErrorHandlingDeserializer,则要处理反序列化异常,请务必使用序列化程序配置 and its producer,该序列化程序可以处理由反序列化异常导致的普通对象和原始值。 模板的泛型值类型应为 。 一种技术是使用 ;示例如下:KafkaTemplatebyte[]ObjectDelegatingByTypeSerializer
可以对同一主题使用多个注释,无论是否进行手动分区分配以及非阻塞重试,但给定主题只能使用一种配置。 最好使用单个 Bean 来配置此类主题;如果对同一主题使用多个批注,则所有批注都应具有相同的值,否则其中一个批注将应用于该主题的所有侦听器,而其他批注的值将被忽略。@KafkaListenerRetryTopicConfiguration@RetryableTopic

配置全局设置和功能

从 2.9 开始,删除了以前用于配置组件的 Bean 覆盖方法(由于 API 的上述实验性质,没有弃用)。 这不会改变 Bean 方法,只会改变基础架构组件的配置。 现在,该类应该在(单个)类中扩展,并覆盖正确的方法。 示例如下:RetryTopicConfigurationRetryTopicConfigurationSupport@ConfigurationSpring中文文档

@EnableKafka
@Configuration
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {

    @Override
    protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
        blockingRetries
                .retryOn(MyBlockingRetriesException.class, MyOtherBlockingRetriesException.class)
                .backOff(new FixedBackOff(3000, 3));
    }

    @Override
    protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
        nonBlockingFatalExceptions.add(MyNonBlockingException.class);
    }

    @Override
    protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
        // Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
        customizersConfigurer.customizeErrorHandler(eh -> {
            eh.setSeekAfterError(false);
        });
    }

}
使用此配置方法时,不应使用注释来防止上下文因重复的 Bean 而无法启动。 请改用简单批注。@EnableKafkaRetryTopic@EnableKafka

如果为 true,则将使用指定的分区数和复制因子创建主主题和重试主题。 从版本 3.0 开始,缺省复制因子是 ,这意味着使用代理缺省值。 如果您的代理版本低于 2.4,则需要设置显式值。 要覆盖特定主题(例如主主题或 DLT)的这些值,只需添加具有所需属性的 a;这将覆盖自动创建属性。autoCreateTopics-1NewTopic@BeanSpring中文文档

默认情况下,使用接收记录的原始分区将记录发布到重试主题。 如果重试主题的分区少于主主题,则应适当配置框架;下面是一个示例。
@EnableKafka
@Configuration
public class Config extends RetryTopicConfigurationSupport {

    @Override
    protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
        return dlprf -> dlprf.setPartitionResolver((cr, nextTopic) -> null);
    }

    ...

}

该函数的参数是使用者记录和下一个主题的名称。 可以返回特定的分区号,也可以指示应确定分区。nullKafkaProducerSpring中文文档

默认情况下,当记录在重试主题之间转换时,将保留重试标头的所有值(尝试次数、时间戳)。 从版本 2.9.6 开始,如果只想保留这些标头的最后一个值,请使用上面显示的方法将工厂的属性设置为 。configureDeadLetterPublishingContainerFactory()retainAllRetryHeaderValuesfalseSpring中文文档

使用此配置方法时,不应使用注释来防止上下文因重复的 Bean 而无法启动。 请改用简单批注。@EnableKafkaRetryTopic@EnableKafka
默认情况下,使用接收记录的原始分区将记录发布到重试主题。 如果重试主题的分区少于主主题,则应适当配置框架;下面是一个示例。

查找 RetryTopicConfiguration

尝试通过从注释或 Bean 容器(如果没有可用的注释)创建一个实例来提供实例。RetryTopicConfiguration@RetryableTopicSpring中文文档

如果在容器中找到 bean,则会进行检查,以确定是否应由任何此类实例处理提供的主题。Spring中文文档

如果提供了注释,则查找注释方法。@RetryableTopicDltHandlerSpring中文文档

从 3.2 开始,在类上注释时提供新的 API 来创建:RetryTopicConfiguration@RetryableTopicSpring中文文档

@Bean
public RetryTopicConfiguration myRetryTopic() {
    RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
    return provider.findRetryConfigurationFor(topics, null, AnnotatedClass.class, bean);
}

@RetryableTopic
public static class AnnotatedClass {
    // NoOps
}