对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.4! |
对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.4! |
该框架提供了一些使用 DLT 的策略。 您可以提供 DLT 处理方法、使用默认日志记录方法或根本不使用 DLT。 此外,您还可以选择在 DLT 处理失败时会发生什么情况。
DLT 处理方法
您可以指定用于处理主题的 DLT 的方法,以及处理失败时的行为。
为此,您可以在带有 annotation 的类的方法中使用 annotation。
请注意,相同的方法将用于该类中的所有带 Comments 的方法。@DltHandler
@RetryableTopic
@RetryableTopic
@RetryableTopic
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}
DLT 处理程序方法也可以通过该方法提供,将应该处理 DLT 消息的 bean 名称和方法名称作为参数传递。RetryTopicConfigurationBuilder.dltHandlerMethod(String, String)
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
.create(template);
}
@Component
public class MyCustomDltProcessor {
private final MyDependency myDependency;
public MyCustomDltProcessor(MyDependency myDependency) {
this.myDependency = myDependency;
}
public void processDltMessage(MyPojo message) {
// ... message processing, persistence, etc
}
}
如果未提供 DLT 处理程序,则使用默认值。RetryTopicConfigurer.LoggingDltListenerHandlerMethod |
从版本 2.8 开始,如果你根本不想在这个应用程序中使用 DLT,包括通过默认处理程序(或者你希望推迟消耗),你可以控制 DLT 容器是否启动,独立于容器工厂的属性。autoStartup
使用批注时,将属性设置为 ;使用 Configuration Builder 时,请使用 。@RetryableTopic
autoStartDltHandler
false
autoStartDltHandler(false)
您可以稍后通过 .KafkaListenerEndpointRegistry
如果未提供 DLT 处理程序,则使用默认值。RetryTopicConfigurer.LoggingDltListenerHandlerMethod |
DLT 失败行为
如果 DLT 处理失败,则有两种可能的行为可用:和 .ALWAYS_RETRY_ON_ERROR
FAIL_ON_ERROR
在前者中,记录被转发回 DLT 主题,因此它不会阻止其他 DLT 记录的处理。 在后者中,使用者结束执行而不转发消息。
@RetryableTopic(dltProcessingFailureStrategy =
DltStrategy.FAIL_ON_ERROR)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
.doNotRetryOnDltFailure()
.create(template);
}
默认行为是 。ALWAYS_RETRY_ON_ERROR |
从版本 2.8.3 开始,如果记录导致引发致命异常,则不会将记录路由回 DLT。
例如 a ,因为通常总是会引发此类异常。ALWAYS_RETRY_ON_ERROR DeserializationException |
被视为致命的异常包括:
-
DeserializationException
-
MessageConversionException
-
ConversionException
-
MethodArgumentResolutionException
-
NoSuchMethodException
-
ClassCastException
您可以使用 Bean 上的方法向此列表添加异常和从中删除异常。DestinationTopicResolver
有关更多信息,请参阅 Exception Classifier。
默认行为是 。ALWAYS_RETRY_ON_ERROR |
从版本 2.8.3 开始,如果记录导致引发致命异常,则不会将记录路由回 DLT。
例如 a ,因为通常总是会引发此类异常。ALWAYS_RETRY_ON_ERROR DeserializationException |
配置无 DLT
该框架还提供了不为主题配置 DLT 的可能性。 在这种情况下,在用尽重试后,处理将结束。
@RetryableTopic(dltProcessingFailureStrategy =
DltStrategy.NO_DLT)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.doNotConfigureDlt()
.create(template);
}