此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.4

此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.4

该框架提供了一些使用 DLT 的策略。 您可以提供 DLT 处理方法、使用默认日志记录方法或根本不使用 DLT。 此外,您还可以选择在 DLT 处理失败时会发生什么情况。

DLT 处理方法

您可以指定用于处理主题的 DLT 的方法,以及处理失败时的行为。

为此,您可以在带有 annotation 的类的方法中使用 annotation。 请注意,相同的方法将用于该类中的所有带 Comments 的方法。@DltHandler@RetryableTopic@RetryableTopic

@RetryableTopic
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}

@DltHandler
public void processMessage(MyPojo message) {
    // ... message processing, persistence, etc
}

DLT 处理程序方法也可以通过该方法提供,将应该处理 DLT 消息的 bean 名称和方法名称作为参数传递。RetryTopicConfigurationBuilder.dltHandlerMethod(String, String)

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
            .create(template);
}

@Component
public class MyCustomDltProcessor {

    private final MyDependency myDependency;

    public MyCustomDltProcessor(MyDependency myDependency) {
        this.myDependency = myDependency;
    }

    public void processDltMessage(MyPojo message) {
        // ... message processing, persistence, etc
    }
}
如果未提供 DLT 处理程序,则使用默认值。RetryTopicConfigurer.LoggingDltListenerHandlerMethod

从版本 2.8 开始,如果你根本不想在这个应用程序中使用 DLT,包括通过默认处理程序(或者你希望推迟消耗),你可以控制 DLT 容器是否启动,独立于容器工厂的属性。autoStartup

使用批注时,将属性设置为 ;使用 Configuration Builder 时,请使用 。@RetryableTopicautoStartDltHandlerfalseautoStartDltHandler(false)

您可以稍后通过 .KafkaListenerEndpointRegistry

如果未提供 DLT 处理程序,则使用默认值。RetryTopicConfigurer.LoggingDltListenerHandlerMethod

DLT 失败行为

如果 DLT 处理失败,则有两种可能的行为可用:和 .ALWAYS_RETRY_ON_ERRORFAIL_ON_ERROR

在前者中,记录被转发回 DLT 主题,因此它不会阻止其他 DLT 记录的处理。 在后者中,使用者结束执行而不转发消息。

@RetryableTopic(dltProcessingFailureStrategy =
            DltStrategy.FAIL_ON_ERROR)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
            .doNotRetryOnDltFailure()
            .create(template);
}
默认行为是 。ALWAYS_RETRY_ON_ERROR
从版本 2.8.3 开始,如果记录导致引发致命异常,则不会将记录路由回 DLT。 例如 a ,因为通常总是会引发此类异常。ALWAYS_RETRY_ON_ERRORDeserializationException

被视为致命的异常包括:

您可以使用 Bean 上的方法向此列表添加异常和从中删除异常。DestinationTopicResolver

有关更多信息,请参阅 Exception Classifier

默认行为是 。ALWAYS_RETRY_ON_ERROR
从版本 2.8.3 开始,如果记录导致引发致命异常,则不会将记录路由回 DLT。 例如 a ,因为通常总是会引发此类异常。ALWAYS_RETRY_ON_ERRORDeserializationException

配置无 DLT

该框架还提供了不为主题配置 DLT 的可能性。 在这种情况下,在用尽重试后,处理将结束。

@RetryableTopic(dltProcessingFailureStrategy =
            DltStrategy.NO_DLT)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .doNotConfigureDlt()
            .create(template);
}

APP信息