此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1Spring中文文档

此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1Spring中文文档

该框架提供了一些使用 DLT 的策略。 您可以提供 DLT 处理方法,使用默认日志记录方法,或者根本没有 DLT。 此外,您还可以选择如果 DLT 处理失败时会发生什么。Spring中文文档

DLT处理方法

您可以指定用于处理主题的 DLT 的方法,以及处理失败时的行为。Spring中文文档

为此,您可以在具有注释的类的方法中使用注释。 请注意,该类中的所有带批注的方法都将使用相同的方法。@DltHandler@RetryableTopic@RetryableTopicSpring中文文档

@RetryableTopic
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}

@DltHandler
public void processMessage(MyPojo message) {
    // ... message processing, persistence, etc
}

DLT 处理程序方法也可以通过该方法提供,将应处理 DLT 消息的 Bean 名称和方法名称作为参数传递。RetryTopicConfigurationBuilder.dltHandlerMethod(String, String)Spring中文文档

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
            .create(template);
}

@Component
public class MyCustomDltProcessor {

    private final MyDependency myDependency;

    public MyCustomDltProcessor(MyDependency myDependency) {
        this.myDependency = myDependency;
    }

    public void processDltMessage(MyPojo message) {
        // ... message processing, persistence, etc
    }
}
如果未提供 DLT 处理程序,则使用默认值。RetryTopicConfigurer.LoggingDltListenerHandlerMethod

从版本 2.8 开始,如果根本不想从此应用程序中的 DLT 使用,包括通过默认处理程序(或者你希望延迟使用),则可以控制 DLT 容器是否启动,而与容器工厂的属性无关。autoStartupSpring中文文档

使用批注时,将属性设置为 ;使用配置生成器时,请使用 .@RetryableTopicautoStartDltHandlerfalseautoStartDltHandler(false)Spring中文文档

稍后可以通过 .KafkaListenerEndpointRegistrySpring中文文档

如果未提供 DLT 处理程序,则使用默认值。RetryTopicConfigurer.LoggingDltListenerHandlerMethod

DLT 失败行为

如果 DLT 处理失败,则有两种可能的行为可用:和 。ALWAYS_RETRY_ON_ERRORFAIL_ON_ERRORSpring中文文档

在前者中,记录被转发回 DLT 主题,因此它不会阻止其他 DLT 记录的处理。 在后者中,使用者在不转发消息的情况下结束执行。Spring中文文档

@RetryableTopic(dltProcessingFailureStrategy =
            DltStrategy.FAIL_ON_ERROR)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
            .doNotRetryOnDltFailure()
            .create(template);
}
默认行为为 。ALWAYS_RETRY_ON_ERROR
从版本 2.8.3 开始,如果记录导致引发致命异常,则不会将记录路由回 DLT, 例如 ,因为通常总是会抛出此类异常。ALWAYS_RETRY_ON_ERRORDeserializationException

被视为致命的例外情况包括:Spring中文文档

您可以使用 Bean 上的方法向此列表添加异常和删除异常。DestinationTopicResolverSpring中文文档

有关详细信息,请参阅异常分类器Spring中文文档

默认行为为 。ALWAYS_RETRY_ON_ERROR
从版本 2.8.3 开始,如果记录导致引发致命异常,则不会将记录路由回 DLT, 例如 ,因为通常总是会抛出此类异常。ALWAYS_RETRY_ON_ERRORDeserializationException

配置无 DLT

该框架还提供了不为主题配置 DLT 的可能性。 在这种情况下,在用尽重审后,处理就结束了。Spring中文文档

@RetryableTopic(dltProcessingFailureStrategy =
            DltStrategy.NO_DLT)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .doNotConfigureDlt()
            .create(template);
}