此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1Spring中文文档

此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1Spring中文文档

大多数功能都可用于注释和 bean。@RetryableTopicRetryTopicConfigurationSpring中文文档

回退配置

BackOff 配置依赖于项目中的接口。BackOffPolicySpring RetrySpring中文文档

它包括:Spring中文文档

@RetryableTopic(attempts = 5,
    backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(3_000)
            .maxAttempts(4)
            .create(template);
}

您还可以提供 Spring Retry 接口的自定义实现:SleepingBackOffPolicySpring中文文档

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .customBackoff(new MyCustomBackOffPolicy())
            .maxAttempts(5)
            .create(template);
}
默认回退策略为最多 3 次尝试和 1000 毫秒间隔。FixedBackOffPolicy
默认最大延迟为 30 秒。 如果您的退避策略要求延迟的值大于此值,请相应地调整属性。ExponentialBackOffPolicymaxDelay
第一次尝试计入 ,因此,如果提供值 4,则原始尝试加上 3 次重试。maxAttemptsmaxAttempts
默认回退策略为最多 3 次尝试和 1000 毫秒间隔。FixedBackOffPolicy
默认最大延迟为 30 秒。 如果您的退避策略要求延迟的值大于此值,请相应地调整属性。ExponentialBackOffPolicymaxDelay
第一次尝试计入 ,因此,如果提供值 4,则原始尝试加上 3 次重试。maxAttemptsmaxAttempts

全局超时

您可以设置重试过程的全局超时。 如果达到该时间,则下次使用者抛出异常时,消息将直接发送到 DLT,或者在没有可用的 DLT 时结束处理。Spring中文文档

@RetryableTopic(backoff = @Backoff(2_000), timeout = 5_000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(2_000)
            .timeoutAfter(5_000)
            .create(template);
}
默认设置为未设置超时,也可以通过提供 -1 作为超时值来实现。
默认设置为未设置超时,也可以通过提供 -1 作为超时值来实现。

异常分类器

您可以指定要重试哪些例外,哪些不重试。 您还可以将其设置为遍历原因以查找嵌套异常。Spring中文文档

@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = true)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    throw new RuntimeException(new MyRetryException()); // will retry
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .notRetryOn(MyDontRetryException.class)
            .create(template);
}
默认行为是重试所有异常,而不是遍历原因。

从 2.8.3 开始,有一个致命异常的全局列表,这将导致记录被发送到 DLT,而无需任何重试。 有关致命异常的默认列表,请参阅 DefaultErrorHandler。 可以通过重写扩展 的类中的方法,在此列表中添加或删除异常。 有关详细信息,请参阅配置全局设置和功能configureNonBlockingRetries@ConfigurationRetryTopicConfigurationSupportSpring中文文档

@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
    nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
要禁用致命异常的分类,只需清除提供的列表即可。
默认行为是重试所有异常,而不是遍历原因。
要禁用致命异常的分类,只需清除提供的列表即可。

包含和排除主题

您可以通过 .includeTopic(String topic)、.includeTopics(Collection<String> topics)、.excludeTopic(String topic) 和 .excludeTopics(Collection<String> topics) 方法决定哪些主题将由 Bean 处理,哪些主题不会由 Bean 处理。RetryTopicConfigurationSpring中文文档

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .includeTopics(List.of("my-included-topic", "my-other-included-topic"))
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .excludeTopic("my-excluded-topic")
            .create(template);
}
默认行为是包含所有主题。
默认行为是包含所有主题。

主题 自动创建

除非另有指定,否则框架将使用 Bean 使用的 Bean 自动创建所需的主题。 您可以指定用于创建主题的分区数和复制因子,并且可以关闭此功能。 从版本 3.0 开始,缺省复制因子是 ,这意味着使用代理缺省值。 如果您的代理版本低于 2.4,则需要设置显式值。NewTopicKafkaAdmin-1Spring中文文档

请注意,如果您不使用 Spring Boot,则必须提供 KafkaAdmin bean 才能使用此功能。
@RetryableTopic(numPartitions = 2, replicationFactor = 3)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}

@RetryableTopic(autoCreateTopics = false)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .autoCreateTopicsWith(2, 3)
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .doNotAutoCreateRetryTopics()
            .create(template);
}
缺省情况下,主题是自动创建的,具有一个分区和复制因子 -1(表示使用代理默认值)。 如果您的代理版本低于 2.4,则需要设置显式值。
请注意,如果您不使用 Spring Boot,则必须提供 KafkaAdmin bean 才能使用此功能。
缺省情况下,主题是自动创建的,具有一个分区和复制因子 -1(表示使用代理默认值)。 如果您的代理版本低于 2.4,则需要设置显式值。

故障标头管理

在考虑如何管理故障标头(原始标头和异常标头)时,框架委托 来决定是追加还是替换标头。DeadLetterPublishingRecovererSpring中文文档

默认情况下,它显式设置为 并保留为 .appendOriginalHeadersfalsestripPreviousExceptionHeadersDeadLetterPublishingRecoverSpring中文文档

这意味着只有第一个“原始”和最后一个异常标头使用默认配置保留。 这是为了避免在涉及许多重试步骤时创建过大的消息(例如,由于堆栈跟踪标头)。Spring中文文档

有关详细信息,请参阅管理死信记录标头Spring中文文档

若要重新配置框架以对这些属性使用不同的设置,请通过重写扩展 . 有关详细信息,请参阅配置全局设置和功能DeadLetterPublishingRecovererconfigureCustomizers@ConfigurationRetryTopicConfigurationSupportSpring中文文档

@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
    customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
        dlpr.setAppendOriginalHeaders(true);
        dlpr.setStripPreviousExceptionHeaders(false);
    });
}

从版本 2.8.4 开始,如果要添加自定义标头(除了工厂添加的重试信息标头外,还可以在工厂添加 - .headersFunctionfactory.setHeadersFunction((rec, ex) -> { ... })Spring中文文档

默认情况下,添加的任何标头都是累积的 - Kafka 标头可以包含多个值。 从版本 2.9.5 开始,如果函数返回的标头包含类型为 的标头,则该标头的任何现有值都将被删除,并且仅保留新的单个值。HeadersDeadLetterPublishingRecoverer.SingleRecordHeaderSpring中文文档

自定义 DeadLetterPublishingRecoverer

故障标头管理中可以看出,可以自定义框架创建的默认实例。 但是,对于某些用例,需要对 进行子类化,例如覆盖以修改发送到重试(或死信)主题的内容。 从版本 3.0.9 开始,您可以重写该方法以提供实例,例如:DeadLetterPublishingRecovererDeadLetterPublishingRecoverercreateProducerRecord()RetryConfigurationSupport.configureDeadLetterPublishingContainerFactory()DeadLetterPublisherCreatorSpring中文文档

@Override
protected Consumer<DeadLetterPublishingRecovererFactory>
        configureDeadLetterPublishingContainerFactory() {

    return (factory) -> factory.setDeadLetterPublisherCreator(
            (templateResolver, destinationResolver) ->
                    new CustomDLPR(templateResolver, destinationResolver));
}

建议您在构建自定义实例时使用提供的解析程序。Spring中文文档

根据引发的异常将消息路由到自定义 DLT

从版本 3.2.0 开始,可以根据异常类型将消息路由到自定义 DLT,该异常在处理过程中引发。 为此,需要指定路由。 路由自定义包括其他目标的规范。 目标又由两个设置组成:和 。 当引发 中指定的异常类型时,在考虑通用 DLT 之前,将包含 的 DLT 视为消息的目标主题。 使用注解或 Bean 的配置示例:suffixexceptionsexceptionssuffixRetryTopicConfigurationSpring中文文档

@RetryableTopic(exceptionBasedDltRouting = {
    @ExceptionBasedDltDestination(
        suffix = "-deserialization", exceptions = {DeserializationException.class}
    )}
)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class)))
            .create(kafkaOperations)
            .create(template);
}

suffix发生在自定义 DLT 名称中的一般之前。 考虑到所呈现的示例,导致 的消息将被路由到 而不是 . 自定义 DLT 将按照主题自动创建中所述的相同规则创建。dltTopicSuffixDeserializationExceptionmy-annotated-topic-deserialization-dltmy-annotated-topic-dltSpring中文文档