此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1Spring中文文档

此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1Spring中文文档

本节介绍如何处理在将 Spring 用于 Apache Kafka 时可能出现的各种异常。Spring中文文档

侦听器错误处理程序

从版本 2.0 开始,批注具有一个新属性:.@KafkaListenererrorHandlerSpring中文文档

您可以使用 来提供实现的 Bean 名称。 此功能接口具有一种方法,如以下列表所示:errorHandlerKafkaListenerErrorHandlerSpring中文文档

@FunctionalInterface
public interface KafkaListenerErrorHandler {

    Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;

}

您可以访问消息转换器生成的 spring-messaging 对象以及侦听器引发的异常,该异常包装在 . 错误处理程序可以抛出原始异常或新异常,这些异常被抛入容器。 错误处理程序返回的任何内容都将被忽略。Message<?>ListenerExecutionFailedExceptionSpring中文文档

从版本 2.7 开始,您可以在 和 上设置属性,这会导致将原始内容添加到标头中的转换中。 例如,如果您希望在侦听器错误处理程序中使用 ,这很有用。 它可能用于请求/答复方案,在请求/答复方案中,您希望在捕获死信主题中的失败记录后,经过一定次数的重试后向发件人发送失败结果。rawRecordHeaderMessagingMessageConverterBatchMessagingMessageConverterConsumerRecordMessage<?>KafkaHeaders.RAW_DATADeadLetterPublishingRecovererSpring中文文档

@Bean
public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
    return (msg, ex) -> {
        if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
            recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
            return "FAILED";
        }
        throw ex;
    };
}

它有一个子接口(),可以通过以下方法访问使用者对象:ConsumerAwareListenerErrorHandlerSpring中文文档

Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);

另一个子接口 () 在使用手动 s 时提供对对象的访问。ManualAckListenerErrorHandlerAcknowledgmentAckModeSpring中文文档

Object handleError(Message<?> message, ListenerExecutionFailedException exception,
			Consumer<?, ?> consumer, @Nullable Acknowledgment ack);

无论哪种情况,都不应对使用者执行任何搜索,因为容器不会知道它们。Spring中文文档

容器错误处理程序

从 2.8 版开始,旧版和接口已被新的 . 这些错误处理程序可以处理记录侦听器和批处理侦听器的错误,从而允许单个侦听器容器工厂为这两种类型的侦听器创建容器。 提供了替换大多数旧框架错误处理程序实现的实现。ErrorHandlerBatchErrorHandlerCommonErrorHandlerCommonErrorHandlerSpring中文文档

有关将自定义错误处理程序迁移到 的信息,请参阅将自定义旧错误处理程序实现迁移到 CommonErrorHandlerCommonErrorHandlerSpring中文文档

使用事务时,默认情况下不会配置任何错误处理程序,因此异常将回滚事务。 事务容器的错误处理由 AfterRollbackProcessor 处理。 如果在使用事务时提供自定义错误处理程序,则在希望回滚事务时,它必须引发异常。Spring中文文档

此接口具有一个默认方法,容器会调用该方法,以确定在错误处理程序返回而未引发异常的情况下是否应提交偏移量;默认情况下,它返回 true。isAckAfterHandle()Spring中文文档

通常,当错误未被“处理”时(例如,在执行搜索操作之后),框架提供的错误处理程序将引发异常。 默认情况下,此类异常由容器在级别记录。 所有框架错误处理程序都进行了扩展,允许您控制记录这些异常的级别。ERRORKafkaExceptionLogLevelAwareSpring中文文档

/**
 * Set the level at which the exception thrown by this handler is logged.
 * @param logLevel the level (default ERROR).
 */
public void setLogLevel(KafkaException.Level logLevel) {
    ...
}

您可以指定要用于容器工厂中所有侦听器的全局错误处理程序。 以下示例演示如何执行此操作:Spring中文文档

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
        kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.setCommonErrorHandler(myErrorHandler);
    ...
    return factory;
}

默认情况下,如果带注释的侦听器方法抛出异常,则将其抛向容器,并根据容器配置处理消息。Spring中文文档

容器在调用错误处理程序之前提交任何挂起的偏移量提交。Spring中文文档

如果您使用的是 Spring Boot,您只需将错误处理程序添加为 Boot,Boot 就会将其添加到自动配置的工厂中。@BeanSpring中文文档

回退处理程序

错误处理程序(如 DefaultErrorHandler)使用 a 来确定在重试传递之前要等待多长时间。 从版本 2.9 开始,您可以配置自定义 . 默认处理程序只是挂起线程,直到回退时间过去(或容器停止)。 该框架还提供暂停侦听器容器,直到回退时间过去,然后恢复容器。 当延迟时间长于消费者财产时,这很有用。 请注意,实际回退时间的解析将受容器属性的影响。BackOffBackOffHandlerContainerPausingBackOffHandlermax.poll.interval.mspollTimeoutSpring中文文档

DefaultErrorHandler

这个新的错误处理程序取代了 和 ,后者现在是多个版本的默认错误处理程序。 一个区别是,批处理侦听器的回退行为(当引发除 a 以外的异常时)等同于重试完成批处理SeekToCurrentErrorHandlerRecoveringBatchErrorHandlerBatchListenerFailedExceptionSpring中文文档

从版本 2.9 开始,可以配置为提供与查找未处理的记录偏移量相同的语义,如下所述,但实际上不查找。 相反,记录由侦听器容器保留,并在错误处理程序退出后重新提交到侦听器(在执行单个暂停后,以保持使用者处于活动状态;如果使用非阻塞重试或 a,则暂停可能会延伸到多个轮询)。 错误处理程序将结果返回到容器,该结果指示是否可以重新提交当前失败的记录,或者是否已恢复,然后不会再次将其发送到侦听器。 若要启用此模式,请将属性设置为 。DefaultErrorHandlerpoll()ContainerPausingBackOffHandlerseekAfterErrorfalse

错误处理程序可以恢复(跳过)不断失败的记录。 默认情况下,在 10 次失败后,将记录失败的记录(在级别上)。 您可以使用自定义恢复器 () 和 a 来配置处理程序,以控制每个恢复器之间的传递尝试和延迟。 使用 with 原因(有效地)无限重试。 以下示例在三次尝试后配置恢复:ERRORBiConsumerBackOffFixedBackOffFixedBackOff.UNLIMITED_ATTEMPTSSpring中文文档

DefaultErrorHandler errorHandler =
    new DefaultErrorHandler((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));

若要使用此处理程序的自定义实例配置侦听器容器,请将其添加到容器工厂。Spring中文文档

例如,使用容器工厂,可以添加如下内容:@KafkaListenerDefaultErrorHandlerSpring中文文档

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
    return factory;
}

对于记录侦听器,这将重试最多 2 次(3 次传送尝试),并回退 1 秒,而不是默认配置 ()。 在重试用尽后,只会记录失败。FixedBackOff(0L, 9)Spring中文文档

例如,如果返回 6 条记录(每个分区 0、1、2 中有 2 条),并且侦听器在第四个记录上引发异常,则容器通过提交前三条消息的偏移量来确认它们。 寻求分区 1 的偏移量 1 和分区 2 的偏移量 0。 下一个返回三个未处理的记录。pollDefaultErrorHandlerpoll()Spring中文文档

如果为 ,则容器在调用错误处理程序之前提交前两个分区的偏移量。AckModeBATCHSpring中文文档

对于批处理侦听器,侦听器必须抛出一个指示批处理中哪些记录失败的指示。BatchListenerFailedExceptionSpring中文文档

事件的顺序是:Spring中文文档

  • 在索引之前提交记录的偏移量。Spring中文文档

  • 如果重试未用尽,请执行查找,以便重新传递所有剩余记录(包括失败的记录)。Spring中文文档

  • 如果重试次数用尽,请尝试恢复失败的记录(仅限默认日志)并执行寻道,以便重新传递剩余的记录(不包括失败的记录)。 已提交恢复的记录的偏移量。Spring中文文档

  • 如果重试已用尽且恢复失败,则将执行寻道,就好像重试未用尽一样。Spring中文文档

从版本 2.9 开始,可以配置为提供与查找上述未处理的记录偏移量相同的语义,但实际上不查找。 相反,错误处理程序创建一个仅包含未处理记录的新记录,然后将这些记录提交给侦听器(在执行单个暂停后,以保持使用者处于活动状态)。 若要启用此模式,请将属性设置为 。DefaultErrorHandlerConsumerRecords<?, ?>poll()seekAfterErrorfalse

默认恢复程序在重试耗尽后记录失败的记录。 可以使用自定义恢复器,也可以使用框架提供的恢复器,例如 DeadLetterPublishingRecovererSpring中文文档

使用 POJO 批处理侦听器(例如 ),并且您没有要添加到异常的完整使用者记录时,只需添加失败记录的索引:List<Thing>Spring中文文档

@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
    for (int i = 0; i < things.size(); i++) {
        try {
            process(things.get(i));
        }
        catch (Exception e) {
            throw new BatchListenerFailedException("Failed to process", i);
        }
    }
}

当容器配置为 时,可以将错误处理程序配置为提交已恢复记录的偏移量;将属性设置为 。AckMode.MANUAL_IMMEDIATEcommitRecoveredtrueSpring中文文档

使用事务时,. 请参阅回滚后处理器DefaultAfterRollbackProcessorSpring中文文档

认为某些异常是致命的,并且会跳过此类异常的重试;在第一次失败时调用恢复程序。 默认情况下,被视为致命的例外情况包括:DefaultErrorHandlerSpring中文文档

因为这些异常不太可能在重试传递时得到解决。Spring中文文档

您可以向不可重试的类别添加更多异常类型,也可以完全替换分类异常的映射。 有关更多信息,请参阅 Javadocs,以及 .DefaultErrorHandler.addNotRetryableException()DefaultErrorHandler.setClassifications()spring-retryBinaryExceptionClassifierSpring中文文档

下面是一个添加到不可重试异常的示例:IllegalArgumentExceptionSpring中文文档

@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
    DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
    handler.addNotRetryableExceptions(IllegalArgumentException.class);
    return handler;
}

错误处理程序可以配置一个或多个 s,接收重试和恢复进度的通知。 从版本 2.8.10 开始,添加了批处理侦听器的方法。RetryListenerSpring中文文档

@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

    default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
    }

    default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
    }

	default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
	}

}

有关更多信息,请参阅 JavaDocs。Spring中文文档

如果恢复程序失败(引发异常),则失败的记录将包含在搜索中。 如果恢复器失败,则默认情况下将重置,并且在再次尝试恢复之前,重新交付将再次进行回退。 若要在恢复失败后跳过重试,请将错误处理程序设置为 。BackOffresetStateOnRecoveryFailurefalse

您可以为错误处理程序提供一个,以根据失败的记录和/或异常确定要使用的内容:BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>BackOffSpring中文文档

handler.setBackOffFunction((record, ex) -> { ... });

如果函数返回 ,则将使用处理程序的默认值。nullBackOffSpring中文文档

如果异常类型在两次失败之间发生更改,则设置为重试序列(包括选择新的 ,如果已配置)。 当(版本 2.9 之前的默认值)时,不考虑异常类型。resetStateOnExceptionChangetrueBackOfffalseSpring中文文档

从版本 2.9 开始,现在是默认设置。trueSpring中文文档

从版本 2.9 开始,可以配置为提供与查找未处理的记录偏移量相同的语义,如下所述,但实际上不查找。 相反,记录由侦听器容器保留,并在错误处理程序退出后重新提交到侦听器(在执行单个暂停后,以保持使用者处于活动状态;如果使用非阻塞重试或 a,则暂停可能会延伸到多个轮询)。 错误处理程序将结果返回到容器,该结果指示是否可以重新提交当前失败的记录,或者是否已恢复,然后不会再次将其发送到侦听器。 若要启用此模式,请将属性设置为 。DefaultErrorHandlerpoll()ContainerPausingBackOffHandlerseekAfterErrorfalse
从版本 2.9 开始,可以配置为提供与查找上述未处理的记录偏移量相同的语义,但实际上不查找。 相反,错误处理程序创建一个仅包含未处理记录的新记录,然后将这些记录提交给侦听器(在执行单个暂停后,以保持使用者处于活动状态)。 若要启用此模式,请将属性设置为 。DefaultErrorHandlerConsumerRecords<?, ?>poll()seekAfterErrorfalse
如果恢复程序失败(引发异常),则失败的记录将包含在搜索中。 如果恢复器失败,则默认情况下将重置,并且在再次尝试恢复之前,重新交付将再次进行回退。 若要在恢复失败后跳过重试,请将错误处理程序设置为 。BackOffresetStateOnRecoveryFailurefalse

批处理错误处理程序的转换错误

从版本 2.8 开始,批处理侦听器现在可以正确处理转换错误,当将 a 与 、 a 或 a 一起使用 时。 发生转换错误时,有效负载将设置为 null,并将反序列化异常添加到记录标头,类似于 . 侦听器中提供了 s 列表,因此侦听器可以抛出指示发生转换异常的第一个索引。MessageConverterByteArrayDeserializerBytesDeserializerStringDeserializerDefaultErrorHandlerErrorHandlingDeserializerConversionExceptionBatchListenerFailedExceptionSpring中文文档

@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
    for (int i = 0; i < in.size(); i++) {
        Foo foo = in.get(i);
        if (foo == null && exceptions.get(i) != null) {
            throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
        }
        process(foo);
    }
}

重试完整批处理

现在,这是批处理侦听器的回退行为,其中侦听器抛出的异常不是 .DefaultErrorHandlerBatchListenerFailedExceptionSpring中文文档

无法保证在重新交付批次时,该批次具有相同数量的记录和/或重新交付的记录顺序相同。 因此,不可能轻松维护批处理的重试状态。 采用以下方法。 如果批处理侦听器引发的异常不是 ,则从内存中的记录批处理中执行重试。 为了避免在延长的重试序列期间重新平衡,错误处理程序会暂停使用者,在休眠之前轮询它以进行回退,每次重试,然后再次调用侦听器。 如果/当重试次数用尽时,则为批处理中的每条记录调用 。 如果恢复程序引发异常,或者线程在休眠期间中断,则该批记录将在下一次轮询时重新传递。 在退出之前,无论结果如何,消费者都会恢复。FallbackBatchErrorHandlerBatchListenerFailedExceptionConsumerRecordRecovererSpring中文文档

此机制不能用于事务。

在等待时间间隔时,错误处理程序将循环进行短暂的休眠,直到达到所需的延迟,同时检查容器是否已停止,允许休眠在停止后很快退出,而不是导致延迟。BackOffstop()Spring中文文档

此机制不能用于事务。

容器停止错误处理程序

如果侦听器引发异常,则停止容器。 对于记录侦听器,当 为 时,将提交已处理记录的偏移量。 对于记录侦听器,当 是任何手动值时,将提交已确认记录的偏移量。 对于记录侦听器,当 为 时,或者对于批处理侦听器,在重新启动容器时重播整个批处理。CommonContainerStoppingErrorHandlerAckModeRECORDAckModeAckModeBATCHSpring中文文档

容器停止后,将抛出包装 的异常。 这是为了使事务回滚(如果启用了事务)。ListenerExecutionFailedExceptionSpring中文文档

委派错误处理程序

可以委托给不同的错误处理程序,具体取决于异常类型。 例如,您可能希望为大多数异常调用 a,或为其他异常调用 a。CommonDelegatingErrorHandlerDefaultErrorHandlerCommonContainerStoppingErrorHandlerSpring中文文档

所有委托必须共享相同的兼容属性 (, ...)。ackAfterHandleseekAfterErrorSpring中文文档

日志记录错误处理程序

只是记录异常;使用记录侦听器时,上一次轮询的剩余记录将传递给侦听器。 对于批处理侦听器,将记录批处理中的所有记录。CommonLoggingErrorHandlerSpring中文文档

对记录侦听器和批处理侦听器使用不同的常见错误处理程序

如果您希望对记录侦听器和批处理侦听器使用不同的错误处理策略,则提供了允许为每种侦听器类型配置特定错误处理程序。CommonMixedErrorHandlerSpring中文文档

常见错误处理程序摘要

旧版错误处理程序及其替代程序

旧版错误处理程序 更换

LoggingErrorHandlerSpring中文文档

CommonLoggingErrorHandlerSpring中文文档

BatchLoggingErrorHandlerSpring中文文档

CommonLoggingErrorHandlerSpring中文文档

ConditionalDelegatingErrorHandlerSpring中文文档

DelegatingErrorHandlerSpring中文文档

ConditionalDelegatingBatchErrorHandlerSpring中文文档

DelegatingErrorHandlerSpring中文文档

ContainerStoppingErrorHandlerSpring中文文档

CommonContainerStoppingErrorHandlerSpring中文文档

ContainerStoppingBatchErrorHandlerSpring中文文档

CommonContainerStoppingErrorHandlerSpring中文文档

SeekToCurrentErrorHandlerSpring中文文档

DefaultErrorHandlerSpring中文文档

SeekToCurrentBatchErrorHandlerSpring中文文档

没有替代品,使用无限.DefaultErrorHandlerBackOffSpring中文文档

RecoveringBatchErrorHandlerSpring中文文档

DefaultErrorHandlerSpring中文文档

RetryingBatchErrorHandlerSpring中文文档

没有替换,使用并抛出除 以外的异常。DefaultErrorHandlerBatchListenerFailedExceptionSpring中文文档

将自定义旧版错误处理程序实现迁移到CommonErrorHandler

请参阅 中的 JavaDocs。CommonErrorHandlerSpring中文文档

要替换 or 实现,应实现并保留返回(默认值)。 您还应该实现以处理在记录处理范围之外发生的异常(例如使用者错误)。ErrorHandlerConsumerAwareErrorHandlerhandleOne()seeksAfterHandle()falsehandleOtherException()Spring中文文档

若要替换实现,应实现并重写以返回(错误处理程序必须执行必要的查找)。 您还应该实现 - 以处理在记录处理范围之外发生的异常(例如使用者错误)。RemainingRecordsErrorHandlerhandleRemaining()seeksAfterHandle()truehandleOtherException()Spring中文文档

要替换任何实现,您还应该实现 - 以处理在记录处理范围之外发生的异常(例如使用者错误)。BatchErrorHandlerhandleBatch()handleOtherException()Spring中文文档

旧版错误处理程序 更换

LoggingErrorHandlerSpring中文文档

CommonLoggingErrorHandlerSpring中文文档

BatchLoggingErrorHandlerSpring中文文档

CommonLoggingErrorHandlerSpring中文文档

ConditionalDelegatingErrorHandlerSpring中文文档

DelegatingErrorHandlerSpring中文文档

ConditionalDelegatingBatchErrorHandlerSpring中文文档

DelegatingErrorHandlerSpring中文文档

ContainerStoppingErrorHandlerSpring中文文档

CommonContainerStoppingErrorHandlerSpring中文文档

ContainerStoppingBatchErrorHandlerSpring中文文档

CommonContainerStoppingErrorHandlerSpring中文文档

SeekToCurrentErrorHandlerSpring中文文档

DefaultErrorHandlerSpring中文文档

SeekToCurrentBatchErrorHandlerSpring中文文档

没有替代品,使用无限.DefaultErrorHandlerBackOffSpring中文文档

RecoveringBatchErrorHandlerSpring中文文档

DefaultErrorHandlerSpring中文文档

RetryingBatchErrorHandlerSpring中文文档

没有替换,使用并抛出除 以外的异常。DefaultErrorHandlerBatchListenerFailedExceptionSpring中文文档

回滚后处理器

使用事务时,如果侦听器引发异常(并且错误处理程序(如果存在)引发异常),则事务将回滚。 默认情况下,任何未处理的记录(包括失败的记录)都会在下次轮询时重新提取。 这是通过在 . 使用批处理侦听器,将重新处理整个批次的记录(容器不知道批处理中的哪条记录失败)。 若要修改此行为,可以使用自定义 . 例如,对于基于记录的侦听器,您可能希望跟踪失败的记录,并在尝试一定次数后放弃,可能将其发布到死信主题。seekDefaultAfterRollbackProcessorAfterRollbackProcessorSpring中文文档

从版本 2.2 开始,现在可以恢复(跳过)不断失败的记录。 默认情况下,在 10 次失败后,将记录失败的记录(在级别上)。 您可以使用自定义恢复程序 () 和最大故障数配置处理器。 将属性设置为负数会导致无限重试。 以下示例在三次尝试后配置恢复:DefaultAfterRollbackProcessorERRORBiConsumermaxFailuresSpring中文文档

AfterRollbackProcessor<String, String> processor =
    new DefaultAfterRollbackProcessor((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));

当您不使用事务时,您可以通过配置 . 请参阅容器错误处理程序DefaultErrorHandlerSpring中文文档

从版本 3.2 开始,Recovery 现在可以恢复(跳过)不断失败的整批记录。 设置为启用此功能。ContainerProperties.setBatchRecoverAfterRollback(true)Spring中文文档

默认行为,批处理侦听器无法恢复,因为框架不知道批处理中的哪条记录不断失败。 在这种情况下,应用程序侦听器必须处理不断失败的记录。

从版本 2.2.5 开始,可以在新事务中调用 (在失败的事务回滚后启动)。 然后,如果使用 发布失败的记录,处理器会将恢复的记录在原始主题/分区中的偏移量发送到事务。 若要启用此功能,请在 .DefaultAfterRollbackProcessorDeadLetterPublishingRecoverercommitRecoveredkafkaTemplateDefaultAfterRollbackProcessorSpring中文文档

如果恢复程序失败(引发异常),则失败的记录将包含在搜索中。 从版本 2.5.5 开始,如果恢复程序失败,则默认情况下将重置,并且在再次尝试恢复之前,将再次进行回退。 在早期版本中,不会重置,并在下次失败时重新尝试恢复。 若要恢复到以前的行为,请将处理器的属性设置为 。BackOffBackOffresetStateOnRecoveryFailurefalse

从版本 2.6 开始,您现在可以为处理器提供一个,以根据失败的记录和/或异常来确定要使用的内容:BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>BackOffSpring中文文档

handler.setBackOffFunction((record, ex) -> { ... });

如果函数返回 ,则将使用处理器的默认值。nullBackOffSpring中文文档

从版本 2.6.3 开始,如果异常类型在两次失败之间发生更改,则将重新启动重试序列(包括选择新的 ,如果已配置)。 默认情况下,不考虑异常类型。resetStateOnExceptionChangetrueBackOffSpring中文文档

从版本 2.3.1 开始,与 类似,认为某些异常是致命的,并且对于此类异常会跳过重试;在第一次失败时调用恢复程序。 默认情况下,被视为致命的例外情况包括:DefaultErrorHandlerDefaultAfterRollbackProcessorSpring中文文档

因为这些异常不太可能在重试传递时得到解决。Spring中文文档

您可以向不可重试的类别添加更多异常类型,也可以完全替换分类异常的映射。 有关更多信息,请参阅 Javadocs,以及 .DefaultAfterRollbackProcessor.setClassifications()spring-retryBinaryExceptionClassifierSpring中文文档

下面是一个添加到不可重试异常的示例:IllegalArgumentExceptionSpring中文文档

@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
    DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
    processor.addNotRetryableException(IllegalArgumentException.class);
    return processor;
}
使用 current 时,容器无法检测 a 是否由重新平衡引起,或者生产者是否因超时或到期而被撤销。 因为在大多数情况下,它是由重新平衡引起的,所以容器不会调用 (因为查找分区是不合适的,因为我们不再被分配它们)。 如果您确保超时足够大以处理每个事务并定期执行“空”事务(例如通过 ),则可以避免由于超时和到期而导致的隔离。 或者,可以将容器属性设置为 并且容器将停止,从而避免丢失记录。 可以使用 a 并检查属性来检测此情况。 由于该事件还具有对容器的引用,因此可以使用此事件重新启动容器。kafka-clientsProducerFencedExceptiontransactional.idAfterRollbackProcessorListenerContainerIdleEventstopContainerWhenFencedtrueConsumerStoppedEventReasonFENCED

从版本 2.7 开始,在等待时间间隔时,错误处理程序将循环进行短暂的休眠,直到达到所需的延迟,同时检查容器是否已停止,允许休眠在停止后不久退出,而不是导致延迟。BackOffstop()Spring中文文档

从版本 2.7 开始,处理器可以配置一个或多个 s,接收重试和恢复进度的通知。RetryListenerSpring中文文档

@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

}

有关更多信息,请参阅 JavaDocs。Spring中文文档

默认行为,批处理侦听器无法恢复,因为框架不知道批处理中的哪条记录不断失败。 在这种情况下,应用程序侦听器必须处理不断失败的记录。
如果恢复程序失败(引发异常),则失败的记录将包含在搜索中。 从版本 2.5.5 开始,如果恢复程序失败,则默认情况下将重置,并且在再次尝试恢复之前,将再次进行回退。 在早期版本中,不会重置,并在下次失败时重新尝试恢复。 若要恢复到以前的行为,请将处理器的属性设置为 。BackOffBackOffresetStateOnRecoveryFailurefalse
使用 current 时,容器无法检测 a 是否由重新平衡引起,或者生产者是否因超时或到期而被撤销。 因为在大多数情况下,它是由重新平衡引起的,所以容器不会调用 (因为查找分区是不合适的,因为我们不再被分配它们)。 如果您确保超时足够大以处理每个事务并定期执行“空”事务(例如通过 ),则可以避免由于超时和到期而导致的隔离。 或者,可以将容器属性设置为 并且容器将停止,从而避免丢失记录。 可以使用 a 并检查属性来检测此情况。 由于该事件还具有对容器的引用,因此可以使用此事件重新启动容器。kafka-clientsProducerFencedExceptiontransactional.idAfterRollbackProcessorListenerContainerIdleEventstopContainerWhenFencedtrueConsumerStoppedEventReasonFENCED

传递尝试标头

以下内容仅适用于录制侦听器,不适用于批处理侦听器。Spring中文文档

从版本 2.5 开始,当使用 或 实现 时,可以启用将标头 () 添加到记录中。 此标头的值是从 1 开始递增的整数。 当接收原始时,整数位于 .ErrorHandlerAfterRollbackProcessorDeliveryAttemptAwareKafkaHeaders.DELIVERY_ATTEMPTkafka_deliveryAttemptConsumerRecord<?, ?>byte[4]Spring中文文档

int delivery = ByteBuffer.wrap(record.headers()
    .lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
    .getInt();

当与 或 一起使用时,可以通过将它作为参数添加到侦听器方法中来获得。@KafkaListenerDefaultKafkaHeaderMapperSimpleKafkaHeaderMapper@Header(KafkaHeaders.DELIVERY_ATTEMPT) int deliverySpring中文文档

若要启用此标头的填充,请将容器属性设置为 。 默认情况下,它处于禁用状态,以避免查找每条记录的状态并添加标头的(小)开销。deliveryAttemptHeadertrueSpring中文文档

和 支持此功能。DefaultErrorHandlerDefaultAfterRollbackProcessorSpring中文文档

侦听器信息标头

在某些情况下,能够知道侦听器在哪个容器中运行是很有用的。Spring中文文档

从版本 2.8.4 开始,您现在可以在侦听器容器上设置属性,或在注释上设置属性。 然后,容器会在标头中将其添加到所有传入消息中;然后,它可以用于记录拦截器、过滤器等,或用于侦听器本身。listenerInfoinfo@KafkaListenerKafkaListener.LISTENER_INFOSpring中文文档

@KafkaListener(id = "something", topics = "topic", filter = "someFilter",
        info = "this is the something listener")
public void listen(@Payload Thing thing,
        @Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
    ...
}

在 or 实现中使用时,标头作为字节数组在使用者记录中,使用 的属性进行转换。RecordInterceptorRecordFilterStrategyKafkaListenerAnnotationBeanPostProcessorcharSetSpring中文文档

标头映射器也会在从使用者记录创建时转换为,并且从不将此标头映射到出站记录上。StringMessageHeadersSpring中文文档

对于 POJO 批处理侦听器,从版本 2.8.6 开始,标头将复制到批处理的每个成员中,并且在转换后也可以作为单个参数使用。StringSpring中文文档

@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
        info = "info for batch")
public void listen(List<Thing> list,
        @Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets,
        @Header(KafkaHeaders.LISTENER_INFO) String info) {
            ...
}
如果批处理侦听器具有筛选器,并且筛选器生成空批处理,则需要添加到参数中,因为该信息不适用于空批处理。required = false@Header

如果您收到的信息在每个 .List<Message<Thing>>KafkaHeaders.LISTENER_INFOMessage<?>Spring中文文档

有关使用批处理的更多信息,请参阅批处理侦听器Spring中文文档

如果批处理侦听器具有筛选器,并且筛选器生成空批处理,则需要添加到参数中,因为该信息不适用于空批处理。required = false@Header

发布死信记录

当记录达到最大故障数时,可以使用记录恢复器配置 and。 该框架提供了 ,它将失败的消息发布到另一个主题。 恢复程序需要一个 ,用于发送记录。 您也可以选择使用 ,调用 来解析目标主题和分区。DefaultErrorHandlerDefaultAfterRollbackProcessorDeadLetterPublishingRecovererKafkaTemplate<Object, Object>BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>Spring中文文档

默认情况下,死信记录将发送到名为(原始主题名称后缀为 )的主题,并发送到与原始记录相同的分区。 因此,使用默认解析程序时,死信主题的分区数必须至少与原始主题相同。<originalTopic>-dlt-dlt

如果返回的分区为负,则未在 中设置该分区,因此 Kafka 会选择该分区。 从版本 2.2.4 开始,任何(例如,在方法中检测到异常时引发)都会使用该属性进行增强。 这允许目标解析程序使用它,以及 to select the dead letter 主题中的信息。TopicPartitionProducerRecordListenerExecutionFailedException@KafkaListenergroupIdConsumerRecordSpring中文文档

以下示例演示如何连接自定义目标解析程序:Spring中文文档

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
        (r, e) -> {
            if (e instanceof FooException) {
                return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
            }
            else {
                return new TopicPartition(r.topic() + ".other.failures", r.partition());
            }
        });
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));

发送到死信主题的记录通过以下标头得到增强:Spring中文文档

  • KafkaHeaders.DLT_EXCEPTION_FQCN:Exception 类名(通常为 ,但也可以是其他)。ListenerExecutionFailedExceptionSpring中文文档

  • KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN:异常原因类名(如果存在)(从版本 2.8 开始)。Spring中文文档

  • KafkaHeaders.DLT_EXCEPTION_STACKTRACE:异常堆栈跟踪。Spring中文文档

  • KafkaHeaders.DLT_EXCEPTION_MESSAGE:异常消息。Spring中文文档

  • KafkaHeaders.DLT_KEY_EXCEPTION_FQCN:Exception 类名(仅限密钥反序列化错误)。Spring中文文档

  • KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE:异常堆栈跟踪(仅限关键反序列化错误)。Spring中文文档

  • KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE:异常消息(仅限关键反序列化错误)。Spring中文文档

  • KafkaHeaders.DLT_ORIGINAL_TOPIC:原始主题。Spring中文文档

  • KafkaHeaders.DLT_ORIGINAL_PARTITION:原始分区。Spring中文文档

  • KafkaHeaders.DLT_ORIGINAL_OFFSET:原始偏移量。Spring中文文档

  • KafkaHeaders.DLT_ORIGINAL_TIMESTAMP:原始时间戳。Spring中文文档

  • KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE:原始时间戳类型。Spring中文文档

  • KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP:无法处理记录的原始使用者组(从版本 2.8 开始)。Spring中文文档

关键异常仅由 s 引起,因此没有 .DeserializationExceptionDLT_KEY_EXCEPTION_CAUSE_FQCNSpring中文文档

有两种机制可以添加更多标头。Spring中文文档

  1. 子类化恢复器并覆盖 - 调用并添加更多标头。createProducerRecord()super.createProducerRecord()Spring中文文档

  2. 提供 a 接收消费者记录和异常,返回一个对象;那里的标头将被复制到最终的生产者记录中;另请参阅管理死信记录标头。 用于设置 .BiFunctionHeaderssetHeadersFunction()BiFunctionSpring中文文档

第二个更易于实现,但第一个具有更多信息,包括已组装的标准标头。Spring中文文档

从版本 2.3 开始,当与 结合使用时,发布者会将死信生产者记录中的记录还原为无法反序列化的原始值。 以前,为 null,用户代码必须从消息标头中解码。 此外,您可以向发布者提供多个 s;例如,如果要发布 from a 以及使用不同序列化程序从成功反序列化的记录中的值,则可能需要这样做。 下面是使用使用 and 序列化程序的 s 配置发布者的示例:ErrorHandlingDeserializervalue()value()DeserializationExceptionKafkaTemplatebyte[]DeserializationExceptionKafkaTemplateStringbyte[]Spring中文文档

@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
        KafkaTemplate<?, ?> bytesTemplate) {
    Map<Class<?>, KafkaTemplate<?, ?>> templates = new LinkedHashMap<>();
    templates.put(String.class, stringTemplate);
    templates.put(byte[].class, bytesTemplate);
    return new DeadLetterPublishingRecoverer(templates);
}

发布者使用地图键查找适合即将发布的模板。 建议使用 A,以便按顺序检查密钥。value()LinkedHashMapSpring中文文档

当发布值并且有多个模板时,恢复器将为类查找模板;如果不存在,则将使用 中的第一个模板。nullVoidvalues().iterator()Spring中文文档

从 2.7 开始,您可以使用该方法,以便在消息发布失败时引发异常。 您还可以设置超时,以便使用 验证发件人是否成功。setFailIfSendResultIsErrorsetWaitForSendResultTimeoutSpring中文文档

如果恢复程序失败(引发异常),则失败的记录将包含在搜索中。 从版本 2.5.5 开始,如果恢复程序失败,则默认情况下将重置,并且在再次尝试恢复之前,将再次进行回退。 在早期版本中,不会重置,并在下次失败时重新尝试恢复。 若要恢复到以前的行为,请将错误处理程序的属性设置为 。BackOffBackOffresetStateOnRecoveryFailurefalse

从版本 2.6.3 开始,如果异常类型在两次失败之间发生更改,则将重新启动重试序列(包括选择新的 ,如果已配置)。 默认情况下,不考虑异常类型。resetStateOnExceptionChangetrueBackOffSpring中文文档

从版本 2.3 开始,恢复器还可以与 Kafka Streams 一起使用 - 有关详细信息,请参阅从反序列化异常中恢复Spring中文文档

在标头和(使用 Java 序列化)中添加反序列化异常。 默认情况下,这些标头不会保留在发布到死信主题的消息中。 从版本 2.7 开始,如果键和值都未通过反序列化,则两者的原始值都将填充在发送到 DLT 的记录中。ErrorHandlingDeserializerErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADERErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADERSpring中文文档

如果传入的记录相互依赖,但可能无序到达,则将失败的记录重新发布到原始主题的尾部(多次)可能会很有用,而不是将其直接发送到死信主题。 有关示例,请参阅此堆栈溢出问题Spring中文文档

以下错误处理程序配置将完全执行此操作:Spring中文文档

@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
    return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
            (rec, ex) -> {
                org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
                if (retries == null) {
                    retries = new RecordHeader("retries", new byte[] { 1 });
                    rec.headers().add(retries);
                }
                else {
                    retries.value()[0]++;
                }
                return retries.value()[0] > 5
                        ? new TopicPartition("topic.DLT", rec.partition())
                        : new TopicPartition("topic", rec.partition());
            }), new FixedBackOff(0L, 0L));
}

从版本 2.7 开始,恢复程序将检查目标解析程序选择的分区是否确实存在。 如果分区不存在,则 中的分区设置为 ,允许 选择分区。 可以通过将属性设置为 来禁用此检查。ProducerRecordnullKafkaProducerverifyPartitionfalseSpring中文文档

从版本 3.1 开始,将属性设置为将记录恢复记录和异常。logRecoveryRecordtrueSpring中文文档

默认情况下,死信记录将发送到名为(原始主题名称后缀为 )的主题,并发送到与原始记录相同的分区。 因此,使用默认解析程序时,死信主题的分区数必须至少与原始主题相同。<originalTopic>-dlt-dlt
如果恢复程序失败(引发异常),则失败的记录将包含在搜索中。 从版本 2.5.5 开始,如果恢复程序失败,则默认情况下将重置,并且在再次尝试恢复之前,将再次进行回退。 在早期版本中,不会重置,并在下次失败时重新尝试恢复。 若要恢复到以前的行为,请将错误处理程序的属性设置为 。BackOffBackOffresetStateOnRecoveryFailurefalse

管理死信记录标头

参考上面的发布死信记录,有两个属性用于在标头已存在时管理标头(例如,在重新处理失败的死信记录时,包括使用非阻塞重试时)。DeadLetterPublishingRecovererSpring中文文档

Apache Kafka 支持多个同名的标头;要获取“最新”值,可以使用;若要获取多个标头上的迭代器,请使用 .headers.lastHeader(headerName)headers.headers(headerName).iterator()Spring中文文档

重复重新发布失败的记录时,这些标头可能会增长(并最终导致发布失败,因为 );对于异常标头,尤其是堆栈跟踪标头,尤其如此。RecordTooLargeExceptionSpring中文文档

这两个属性的原因是,虽然您可能只想保留最后一个异常信息,但您可能希望保留记录在每次失败时传递的主题的历史记录。Spring中文文档

appendOriginalHeaders应用于所有命名的标头,而应用于所有名为 的标头。ORIGINALstripPreviousExceptionHeadersEXCEPTIONSpring中文文档

从版本 2.8.4 开始,您现在可以控制将哪些标准标头添加到输出记录中。 有关默认添加的(当前)10 个标准标头的通用名称,请参阅 (这些不是实际的标头名称,只是一个抽象;实际的标头名称由子类可以覆盖的方法设置。enum HeadersToAddgetHeaderNames()Spring中文文档

若要排除标头,请使用以下方法;例如,若要禁止在标头中添加异常堆栈跟踪,请使用:excludeHeaders()Spring中文文档

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);

此外,您可以通过添加 ;这还会禁用所有标准异常标头。ExceptionHeadersCreatorSpring中文文档

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
    kafkaHeaders.add(new RecordHeader(..., ...));
});

同样从版本 2.8.4 开始,您现在可以通过该方法提供多个标头函数。 这允许应用其他函数,即使已经注册了另一个函数,例如,在使用非阻塞重试时也是如此。addHeadersFunctionSpring中文文档

ExponentialBackOffWithMaxRetries实现

Spring Framework 提供了许多实现。 默认情况下,将无限期重试;要在重试一定次数后放弃,需要计算 . 从 2.7.3 版本开始,Spring for Apache Kafka 提供了 which 是一个子类,它接收属性并自动计算 ,这稍微方便一些。BackOffExponentialBackOffmaxElapsedTimeExponentialBackOffWithMaxRetriesmaxRetriesmaxElapsedTimeSpring中文文档

@Bean
DefaultErrorHandler handler() {
    ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
    bo.setInitialInterval(1_000L);
    bo.setMultiplier(2.0);
    bo.setMaxInterval(10_000L);
    return new DefaultErrorHandler(myRecoverer, bo);
}

这将在几秒钟后重试,然后再调用恢复程序。1, 2, 4, 8, 10, 10Spring中文文档