处理异常
本节介绍如何处理在使用 Spring for Apache Kafka 时可能出现的各种异常。
侦听器错误处理程序
从版本 2.0 开始,注解有一个新属性:.@KafkaListener
errorHandler
您可以使用 来提供实现的 bean 名称。
这个功能接口有一个方法,如下面的清单所示:errorHandler
KafkaListenerErrorHandler
@FunctionalInterface
public interface KafkaListenerErrorHandler {
Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;
}
您可以访问消息转换器生成的 spring-messaging 对象和侦听器抛出的异常,该异常包装在 .
错误处理程序可以引发原始异常或新异常,该异常将引发到容器中。
错误处理程序返回的任何内容都将被忽略。Message<?>
ListenerExecutionFailedException
从版本 2.7 开始,您可以在 和 上设置属性,这会导致将 raw 添加到 headers 中的 converted。
这很有用,例如,如果您希望在 listener 错误处理程序中使用 。
它可能用于请求/回复方案,即您希望在一定次数的重试后,在死信主题中捕获失败的记录后,将失败结果发送给发件人。rawRecordHeader
MessagingMessageConverter
BatchMessagingMessageConverter
ConsumerRecord
Message<?>
KafkaHeaders.RAW_DATA
DeadLetterPublishingRecoverer
@Bean
public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
return (msg, ex) -> {
if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
return "FAILED";
}
throw ex;
};
}
它有一个子接口 (),可以通过以下方法访问 consumer 对象:ConsumerAwareListenerErrorHandler
Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);
另一个子接口 () 在使用手动 s 时提供对对象的访问。ManualAckListenerErrorHandler
Acknowledgment
AckMode
Object handleError(Message<?> message, ListenerExecutionFailedException exception,
Consumer<?, ?> consumer, @Nullable Acknowledgment ack);
在任何一种情况下,您都不应该对使用者执行任何查找,因为容器不会知道它们。
容器错误处理程序
从版本 2.8 开始,旧版和接口已被新的 .
这些错误处理程序可以处理记录侦听器和批处理侦听器的错误,从而允许单个侦听器容器工厂为这两种类型的侦听器创建容器。 提供了替换大多数旧版框架错误处理程序实现的实现。ErrorHandler
BatchErrorHandler
CommonErrorHandler
CommonErrorHandler
有关将自定义错误处理程序迁移到 的信息,请参阅将自定义旧版错误处理程序实现迁移到 CommonErrorHandler
。CommonErrorHandler
使用事务时,默认情况下不会配置错误处理程序,以便异常将回滚事务。
事务容器的错误处理由 AfterRollbackProcessor
处理。
如果您在使用事务时提供自定义错误处理程序,并且您希望回滚事务,它必须引发异常。
此接口有一个 default 方法,容器调用该方法来确定如果错误处理程序返回而不引发异常,是否应提交偏移量;默认情况下,它返回 true。isAckAfterHandle()
通常,当错误未被“处理”时(例如,在执行 seek 操作之后),框架提供的错误处理程序将引发异常。
默认情况下,此类异常由容器记录在 level .
所有框架错误处理程序都进行了扩展,这允许您控制记录这些异常的级别。ERROR
KafkaExceptionLogLevelAware
/**
* Set the level at which the exception thrown by this handler is logged.
* @param logLevel the level (default ERROR).
*/
public void setLogLevel(KafkaException.Level logLevel) {
...
}
您可以指定要用于容器工厂中所有侦听器的全局错误处理程序。 以下示例显示了如何执行此操作:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setCommonErrorHandler(myErrorHandler);
...
return factory;
}
默认情况下,如果带注释的侦听器方法引发异常,则会将其抛出到容器中,并根据容器配置处理消息。
容器在调用错误处理程序之前提交任何待处理的偏移量提交。
如果您使用的是 Spring Boot,则只需将错误处理程序添加为 a,Boot 就会将其添加到自动配置的工厂中。@Bean
退避处理程序
错误处理程序(如 DefaultErrorHandler )使用 a 来确定在重试投放之前要等待的时间。
从版本 2.9 开始,您可以配置自定义 .
默认处理程序只是暂停线程,直到 back off 时间过去(或容器停止)。
该框架还提供了暂停侦听器容器,直到回退时间过去,然后恢复容器。
当延迟时间长于消费者属性时,这很有用。
请注意,实际回退时间的分辨率将受 container 属性的影响。BackOff
BackOffHandler
ContainerPausingBackOffHandler
max.poll.interval.ms
pollTimeout
DefaultErrorHandler
这个新的错误处理程序取代了 和 ,它们现在是多个版本的默认错误处理程序。
一个区别是,批处理侦听器的回退行为(当引发除 a 以外的异常时)等效于重试完整批处理。SeekToCurrentErrorHandler
RecoveringBatchErrorHandler
BatchListenerFailedException
从版本 2.9 开始,可以配置为提供与查找未处理的记录偏移量相同的语义,如下所述,但实际上不需要查找。
相反,记录由侦听器容器保留,并在错误处理程序退出后(以及在执行单个 paused 之后,以保持使用者处于活动状态;如果正在使用非阻塞重试或 a,则暂停可能会扩展到多个轮询)。
错误处理程序将结果返回到容器,指示是否可以重新提交当前失败的记录,或者它是否已恢复,然后不会再次将其发送到侦听器。
要启用此模式,请将属性设置为 。DefaultErrorHandler poll() ContainerPausingBackOffHandler seekAfterError false |
错误处理程序可以恢复 (跳过) 不断失败的记录。
默认情况下,在 10 次失败后,将记录失败的记录(在 级别)。
您可以使用自定义 recoverer () 和 a 来配置处理程序,该 () 和控制每个之间的交付尝试和延迟。
使用 with 会导致(有效地)无限次重试。
以下示例配置三次尝试后的恢复:ERROR
BiConsumer
BackOff
FixedBackOff
FixedBackOff.UNLIMITED_ATTEMPTS
DefaultErrorHandler errorHandler =
new DefaultErrorHandler((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
要使用此处理程序的自定义实例配置侦听器容器,请将其添加到容器工厂。
例如,对于容器工厂,您可以按如下方式添加:@KafkaListener
DefaultErrorHandler
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
return factory;
}
对于记录侦听器,这将重试投放最多 2 次(3 次投放尝试),后退 1 秒,而不是默认配置 ()。
在重试次数用尽后,只会记录失败。FixedBackOff(0L, 9)
例如,如果 返回 6 条记录(每个分区 0、1、2 各 2 条),并且侦听器在第 4 条记录上引发异常,则容器会通过提交前三条消息的偏移量来确认前三条消息。
寻求分区 1 的偏移量 1 和分区 2 的偏移量 0。
next 返回 3 个未处理的记录。poll
DefaultErrorHandler
poll()
如果是 ,则容器在调用错误处理程序之前提交前两个分区的偏移量。AckMode
BATCH
对于批处理侦听器,侦听器必须引发一个,指示批处理中的哪些记录失败。BatchListenerFailedException
事件的顺序是:
-
在索引之前提交记录的偏移量。
-
如果重试次数未用尽,则执行 seek 操作,以便重新传送所有剩余记录(包括失败的记录)。
-
如果重试次数已用尽,请尝试恢复失败的记录(仅限默认日志)并执行查找,以便重新传递剩余记录(不包括失败的记录)。 已提交已恢复记录的偏移量。
-
如果重试已用尽且恢复失败,则执行查找,就像重试未用尽一样。
从版本 2.9 开始,可以配置为提供与上面讨论的查找未处理的记录偏移量相同的语义,但实际上不需要查找。
相反,错误处理程序会创建一个新的,仅包含未处理的记录,这些记录随后将提交给侦听器(在执行单个 paused 之后,以保持使用者处于活动状态)。
要启用此模式,请将属性设置为 。DefaultErrorHandler ConsumerRecords<?, ?> poll() seekAfterError false |
默认 recoverer 会在重试次数用尽后记录失败的记录。
您可以使用自定义恢复器,也可以使用框架提供的恢复器,例如 DeadLetterPublishingRecoverer
。
当使用 POJO 批处理侦听器(例如 )并且您没有完整的使用者记录要添加到异常时,您只需添加失败记录的索引:List<Thing>
@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
for (int i = 0; i < things.size(); i++) {
try {
process(things.get(i));
}
catch (Exception e) {
throw new BatchListenerFailedException("Failed to process", i);
}
}
}
当容器配置了 时,可以将错误处理程序配置为提交已恢复记录的偏移量;将属性设置为 。AckMode.MANUAL_IMMEDIATE
commitRecovered
true
另请参阅发布死信记录。
使用事务时,类似的功能由 .
请参阅 After-rollback Processor (回滚后处理器)。DefaultAfterRollbackProcessor
这会将某些异常视为致命异常,并且会跳过此类异常的重试;在第一次失败时调用 recoverer。
默认情况下,被视为致命的异常包括:DefaultErrorHandler
-
DeserializationException
-
MessageConversionException
-
ConversionException
-
MethodArgumentResolutionException
-
NoSuchMethodException
-
ClassCastException
,因为这些异常不太可能在重试的投放中得到解决。
您可以向 not-retryable 类别添加更多异常类型,或完全替换分类异常的映射。
有关更多信息,请参阅 Javadocs 和 以及 .DefaultErrorHandler.addNotRetryableException()
DefaultErrorHandler.setClassifications()
spring-retry
BinaryExceptionClassifier
下面是一个添加到不可重试异常的示例:IllegalArgumentException
@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
handler.addNotRetryableExceptions(IllegalArgumentException.class);
return handler;
}
错误处理程序可以配置一个或多个 s,用于接收重试和恢复进度的通知。
从版本 2.8.10 开始,添加了批处理侦听器的方法。RetryListener
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
}
default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
}
default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
}
}
有关更多信息,请参阅 JavaDocs。
如果 recoverer 失败(引发异常),则失败的记录将包含在 seek 中。
如果 recoverer 失败,则默认情况下将重置 ,并且重新交付将在再次尝试恢复之前再次通过回退。
要在恢复失败后跳过重试,请将错误处理程序的 设置为 。BackOff resetStateOnRecoveryFailure false |
您可以向错误处理程序提供 a ,以根据失败的记录和/或异常来确定要使用的:BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>
BackOff
handler.setBackOffFunction((record, ex) -> { ... });
如果函数返回 ,则将使用处理程序的默认值。null
BackOff
设置为 ,如果异常类型在两次失败之间发生更改,则重试序列将重新启动(包括选择新的 ,如果已配置)。
When (版本 2.9 之前的默认值),不考虑异常类型。resetStateOnExceptionChange
true
BackOff
false
从版本 2.9 开始,现在是默认的。true
另请参阅 Delivery Attempts 标头。
使用批处理错误处理程序的转换错误
从版本 2.8 开始,当将 a 与 、 a 或 a 以及 一起使用时,批处理侦听器现在可以正确处理转换错误。
发生转换错误时,有效负载将设置为 null,并向记录标头添加反序列化异常,类似于 .
侦听器中提供了 s 的列表,因此侦听器可以抛出 a,指示发生转换异常的第一个索引。MessageConverter
ByteArrayDeserializer
BytesDeserializer
StringDeserializer
DefaultErrorHandler
ErrorHandlingDeserializer
ConversionException
BatchListenerFailedException
例:
@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
for (int i = 0; i < in.size(); i++) {
Foo foo = in.get(i);
if (foo == null && exceptions.get(i) != null) {
throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
}
process(foo);
}
}
重试完整批处理
现在是批处理侦听器的回退行为,其中侦听器会引发除 .DefaultErrorHandler
BatchListenerFailedException
无法保证在重新交付批次时,该批次具有相同的记录数和/或重新交付的记录具有相同的顺序。
因此,不可能轻松维护批处理的重试状态。
采用以下方法。
如果批处理侦听器引发的异常不是 a ,则从内存中的记录批处理中执行重试。
为了避免在延长的重试序列期间发生重新平衡,错误处理程序会暂停使用者,在每次重试时在休眠前轮询使用者以进行回退,然后再次调用侦听器。
如果/当重试用尽时,将为批处理中的每条记录调用 the。
如果 recoverer 抛出异常,或者线程在休眠期间中断,则该批记录将在下一次轮询时重新传递。
在退出之前,无论结果如何,使用者都会恢复。FallbackBatchErrorHandler
BatchListenerFailedException
ConsumerRecordRecoverer
此机制不能用于事务。 |
在等待间隔时,错误处理程序将以短暂的睡眠循环,直到达到所需的延迟,同时检查容器是否已停止,允许睡眠在停止后很快退出,而不是导致延迟。BackOff
stop()
容器停止错误处理程序
如果侦听器引发异常,则停止容器。
对于记录侦听器,当 is 时,将提交已处理记录的偏移量。
对于记录侦听器,当 为 任何手动值时,将提交已确认记录的偏移量。
对于记录侦听器,当 is 或对于批处理侦听器时,将在重新启动容器时重放整个批处理。CommonContainerStoppingErrorHandler
AckMode
RECORD
AckMode
AckMode
BATCH
容器停止后,将引发包装 the 的异常。
这是为了使事务回滚(如果启用了事务)。ListenerExecutionFailedException
委派错误处理程序
可以根据异常类型委托给不同的错误处理程序。
例如,您可能希望为大多数异常调用 a,或为其他异常调用 a。CommonDelegatingErrorHandler
DefaultErrorHandler
CommonContainerStoppingErrorHandler
所有委托必须共享相同的兼容属性 (, ...)。ackAfterHandle
seekAfterError
日志记录错误处理程序
它只是记录异常;使用 Record 侦听器时,上一次轮询的剩余记录将传递给侦听器。
对于批处理侦听器,将记录批处理中的所有记录。CommonLoggingErrorHandler
对 Record 和 Batch 侦听器使用不同的常见错误处理程序
如果您希望对记录和批处理侦听器使用不同的错误处理策略,则提供了允许为每个侦听器类型配置特定错误处理程序的 。CommonMixedErrorHandler
常见错误处理程序摘要
-
DefaultErrorHandler
-
CommonContainerStoppingErrorHandler
-
CommonDelegatingErrorHandler
-
CommonLoggingErrorHandler
-
CommonMixedErrorHandler
旧版错误处理程序及其替换
旧版错误处理程序 | 更换 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
无替换,与无限 . |
|
|
|
没有替换,使用并引发除 . |
将自定义旧版错误处理程序实现迁移到CommonErrorHandler
请参阅 中的 JavaDocs。CommonErrorHandler
要替换 or 实现,您应该 implement 并保留 return (default)。
您还应该实现以处理发生在记录处理范围之外的异常(例如,消费者错误)。ErrorHandler
ConsumerAwareErrorHandler
handleOne()
seeksAfterHandle()
false
handleOtherException()
要替换 implementation,您应该 implement 和 override to return (错误处理程序必须执行必要的 seek)。
您还应该实现 - 以处理发生在记录处理范围之外的异常(例如消费者错误)。RemainingRecordsErrorHandler
handleRemaining()
seeksAfterHandle()
true
handleOtherException()
要替换任何 implementation,您应该 implement You also should implement - 以处理发生在记录处理范围之外的异常(例如消费者错误)。BatchErrorHandler
handleBatch()
handleOtherException()
回滚处理器之后
使用事务时,如果侦听器引发异常(并且错误处理程序(如果存在)引发异常),则事务将回滚。
默认情况下,任何未处理的记录(包括失败的记录)都会在下次轮询时重新获取。
这是通过在 .
使用批处理侦听器,将重新处理整个记录批次(容器不知道批处理中的哪条记录失败)。
要修改此行为,您可以使用自定义 .
例如,对于基于记录的侦听器,您可能希望跟踪失败的记录,并在尝试一定次数后放弃,也许是通过将其发布到死信主题。seek
DefaultAfterRollbackProcessor
AfterRollbackProcessor
从版本 2.2 开始,现在可以恢复(跳过)不断失败的记录。
默认情况下,在 10 次失败后,将记录失败的记录(在 级别)。
您可以使用自定义 recoverer () 和最大故障数来配置处理器。
将属性设置为负数会导致无限次重试。
以下示例配置三次尝试后的恢复:DefaultAfterRollbackProcessor
ERROR
BiConsumer
maxFailures
AfterRollbackProcessor<String, String> processor =
new DefaultAfterRollbackProcessor((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
当您不使用事务时,可以通过配置 .
请参阅 容器错误处理程序。DefaultErrorHandler
从版本 3.2 开始,Recovery 现在可以恢复(跳过)一直失败的整批记录。
设置为启用此功能。ContainerProperties.setBatchRecoverAfterRollback(true)
默认行为,批处理侦听器无法进行恢复,因为框架不知道批处理中的哪些记录不断失败。 在这种情况下,应用程序侦听器必须处理不断失败的记录。 |
另请参阅发布死信记录。
从版本 2.2.5 开始,可以在新事务中调用(在失败的事务回滚后启动)。
然后,如果您使用 to publish a failed record,则处理器会将原始 topic/partition 中恢复的记录的偏移量发送到事务。
要启用此功能,请在 .DefaultAfterRollbackProcessor
DeadLetterPublishingRecoverer
commitRecovered
kafkaTemplate
DefaultAfterRollbackProcessor
如果 recoverer 失败(引发异常),则失败的记录将包含在 seek 中。
从版本 2.5.5 开始,如果 recoverer 失败,则默认情况下将重置 ,并且重新交付将再次通过回退,然后再尝试恢复。
在早期版本中,不会重置 ,并在下次失败时重新尝试恢复。
要恢复到以前的行为,请将处理器的 属性设置为 。BackOff BackOff resetStateOnRecoveryFailure false |
从版本 2.6 开始,您现在可以为处理器提供一个 ,以根据失败的记录和/或异常来确定要使用的:BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>
BackOff
handler.setBackOffFunction((record, ex) -> { ... });
如果函数返回 ,则将使用处理器的默认值。null
BackOff
从版本 2.6.3 开始,如果异常类型在失败之间发生变化,则设置为 并且重试序列将重新启动(包括选择新的,如果已配置)。
默认情况下,不考虑异常类型。resetStateOnExceptionChange
true
BackOff
从版本 2.3.1 开始,类似于 ,将某些异常视为致命的,并且对于此类异常,将跳过重试;在第一次失败时调用 recoverer。
默认情况下,被视为致命的异常包括:DefaultErrorHandler
DefaultAfterRollbackProcessor
-
DeserializationException
-
MessageConversionException
-
ConversionException
-
MethodArgumentResolutionException
-
NoSuchMethodException
-
ClassCastException
,因为这些异常不太可能在重试的投放中得到解决。
您可以向 not-retryable 类别添加更多异常类型,或完全替换分类异常的映射。
有关更多信息,请参阅 Javadocs,以及 .DefaultAfterRollbackProcessor.setClassifications()
spring-retry
BinaryExceptionClassifier
下面是一个添加到不可重试异常的示例:IllegalArgumentException
@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
processor.addNotRetryableException(IllegalArgumentException.class);
return processor;
}
另请参阅 Delivery Attempts 标头。
使用 current 时,容器无法检测 a 是由再平衡引起的,还是由于超时或到期而撤销了生产者。
因为,在大多数情况下,这是由再平衡引起的,所以容器不会调用 (因为寻找分区是不合适的,因为我们不再被分配它们)。
如果您确保超时足够大以处理每个事务并定期执行“空”事务(例如通过 a ),则可以避免由于超时和到期而导致的隔离。
或者,您可以将 container 属性设置为,容器将停止,从而避免丢失记录。
您可以使用 a 并检查 属性 来检测此情况。
由于该事件还引用了容器,因此您可以使用此事件重新启动容器。kafka-clients ProducerFencedException transactional.id AfterRollbackProcessor ListenerContainerIdleEvent stopContainerWhenFenced true ConsumerStoppedEvent Reason FENCED |
从版本 2.7 开始,在等待间隔时,错误处理程序将以短暂的睡眠循环,直到达到所需的延迟,同时检查容器是否已停止,允许睡眠在之后很快退出,而不是导致延迟。BackOff
stop()
从版本 2.7 开始,处理器可以配置一个或多个 s,以接收重试和恢复进度的通知。RetryListener
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
}
有关更多信息,请参阅 JavaDocs。
Delivery Attempts 标头
以下内容仅适用于记录侦听器,不适用于批处理侦听器。
从版本 2.5 开始,当使用 或 implements 时,可以启用向记录添加 header () 。
此标头的值是从 1 开始的递增整数。
当接收 raw 时,整数位于 .ErrorHandler
AfterRollbackProcessor
DeliveryAttemptAware
KafkaHeaders.DELIVERY_ATTEMPT
kafka_deliveryAttempt
ConsumerRecord<?, ?>
byte[4]
int delivery = ByteBuffer.wrap(record.headers()
.lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
.getInt();
与 or 一起使用时,可以通过将它作为参数添加到 listener 方法中来获取。@KafkaListener
DefaultKafkaHeaderMapper
SimpleKafkaHeaderMapper
@Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery
要启用此标头的填充,请将 container 属性设置为 。
默认情况下,它是禁用的,以避免查找每条记录的状态并添加标头的 (小) 开销。deliveryAttemptHeader
true
的 和 支持此功能。DefaultErrorHandler
DefaultAfterRollbackProcessor
批量侦听器的 Delivery Attempts Header
使用 进行处理时,标头的显示方式可能与 不同。ConsumerRecord
BatchListener
KafkaHeaders.DELIVERY_ATTEMPT
SingleRecordListener
从版本 3.3 开始,如果要在使用 时将标头注入 ,请在 中将 设置为 。KafkaHeaders.DELIVERY_ATTEMPT
ConsumerRecord
BatchListener
DeliveryAttemptAwareRetryListener
RetryListener
ErrorHandler
请参考下面的代码。
final FixedBackOff fixedBackOff = new FixedBackOff(1, 10);
final DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff);
errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener());
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(errorHandler);
然后,每当批处理无法完成时,都会将标头注入到 .DeliveryAttemptAwareRetryListener
KafkaHeaders.DELIVERY_ATTMPT
ConsumerRecord
侦听器信息报头
在某些情况下,能够知道侦听器在哪个容器中运行非常有用。
从版本 2.8.4 开始,您现在可以在侦听器容器上设置属性,或在 Comments 上设置属性。
然后,容器会将 this 添加到所有传入消息的标头中;然后,它可以用于 Record interceptor、filters 等,或者用于侦听器本身。listenerInfo
info
@KafkaListener
KafkaListener.LISTENER_INFO
@KafkaListener(id = "something", topics = "topic", filter = "someFilter",
info = "this is the something listener")
public void listen(@Payload Thing thing,
@Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
...
}
在 or 实现中使用时,标头在使用者记录中作为字节数组,使用 's 属性进行转换。RecordInterceptor
RecordFilterStrategy
KafkaListenerAnnotationBeanPostProcessor
charSet
标头映射器也会在从使用者记录创建时转换为,并且永远不会将此标头映射到出站记录上。String
MessageHeaders
对于 POJO 批处理侦听器,从版本 2.8.6 开始,标头将复制到批处理的每个成员中,并且在转换后也可用作单个参数。String
@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
info = "info for batch")
public void listen(List<Thing> list,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
@Header(KafkaHeaders.LISTENER_INFO) String info) {
...
}
如果批处理侦听器具有过滤器,并且过滤器导致空批次,则需要添加到参数中,因为该信息不适用于空批次。required = false @Header |
如果您收到信息,则位于每个 .List<Message<Thing>>
KafkaHeaders.LISTENER_INFO
Message<?>
有关使用批处理的更多信息,请参阅批处理侦听器。
发布死信记录
当记录达到最大失败数时,您可以使用 record recoverer 配置 和 。
框架提供了 ,它将失败的消息发布到另一个主题。
recoverer 需要一个 ,用于发送记录。
您还可以(可选)使用 对其进行配置,调用该 . 以解析目标主题和分区。DefaultErrorHandler
DefaultAfterRollbackProcessor
DeadLetterPublishingRecoverer
KafkaTemplate<Object, Object>
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
默认情况下,死信记录将发送到名为 (原始主题名称后缀为 ) 的主题,以及与原始记录相同的分区。
因此,当您使用默认解析程序时,死信主题必须至少具有与原始主题一样多的分区。<originalTopic>-dlt -dlt
|
如果返回的 具有负 partition,则 中未设置该分区,因此 Kafka 会选择该分区。
从版本 2.2.4 开始,任何(例如,在方法中检测到异常时抛出)都使用属性进行了增强。
这允许目标解析程序使用此 ,以及 to select the dead letter 主题中的信息。TopicPartition
ProducerRecord
ListenerExecutionFailedException
@KafkaListener
groupId
ConsumerRecord
以下示例显示如何连接自定义目标解析程序:
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
(r, e) -> {
if (e instanceof FooException) {
return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
}
else {
return new TopicPartition(r.topic() + ".other.failures", r.partition());
}
});
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));
发送到死信主题的记录通过以下标头进行增强:
-
KafkaHeaders.DLT_EXCEPTION_FQCN
:Exception 类名称(通常为 ,但也可以是其他)。ListenerExecutionFailedException
-
KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN
:异常原因类名(如果存在)(自版本 2.8 起)。 -
KafkaHeaders.DLT_EXCEPTION_STACKTRACE
:异常堆栈跟踪。 -
KafkaHeaders.DLT_EXCEPTION_MESSAGE
:异常消息。 -
KafkaHeaders.DLT_KEY_EXCEPTION_FQCN
:异常类名(仅限键反序列化错误)。 -
KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE
:异常堆栈跟踪(仅限键反序列化错误)。 -
KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE
:异常消息(仅限键反序列化错误)。 -
KafkaHeaders.DLT_ORIGINAL_TOPIC
:原始主题。 -
KafkaHeaders.DLT_ORIGINAL_PARTITION
:原始分区。 -
KafkaHeaders.DLT_ORIGINAL_OFFSET
:原始偏移量。 -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP
:原始时间戳。 -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE
:原始时间戳类型。 -
KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP
:处理记录失败的原始消费者组(自 2.8 版本起)。
键异常仅由 s 引起,因此没有 。DeserializationException
DLT_KEY_EXCEPTION_CAUSE_FQCN
有两种机制可以添加更多标头。
-
子类化 recoverer 并覆盖 - 调用并添加更多 headers。
createProducerRecord()
super.createProducerRecord()
-
提供 a 以接收 consumer 记录和异常,返回一个对象;从那里的 Headers 将被复制到最终的 producer 记录;另请参阅 管理死信记录标头。 用于设置 .
BiFunction
Headers
setHeadersFunction()
BiFunction
第二个版本更易于实现,但第一个版本具有更多信息,包括已组装的标准标头。
从版本 2.3 开始,当与 一起使用时,发布者会将死信 producer 记录中的 record 恢复为无法反序列化的原始值。
以前,为 null,用户代码必须从消息标头中解码 the。
此外,您还可以向发布者提供多个 s;例如,如果要发布 from a 以及使用与成功反序列化的记录不同的序列化程序的值,则可能需要这样做。
以下是使用使用 和 序列化程序的 s 配置发布者的示例:ErrorHandlingDeserializer
value()
value()
DeserializationException
KafkaTemplate
byte[]
DeserializationException
KafkaTemplate
String
byte[]
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
KafkaTemplate<?, ?> bytesTemplate) {
Map<Class<?>, KafkaTemplate<?, ?>> templates = new LinkedHashMap<>();
templates.put(String.class, stringTemplate);
templates.put(byte[].class, bytesTemplate);
return new DeadLetterPublishingRecoverer(templates);
}
发布者使用映射键来查找适合即将发布的模板。
建议使用 A,以便按顺序检查键。value()
LinkedHashMap
当发布 values 并且有多个 templates 时,recoverer 将为 class 寻找一个 template;如果不存在,则将使用 中的第一个模板。null
Void
values().iterator()
从 2.7 开始,您可以使用该方法,以便在消息发布失败时引发异常。
您还可以使用 设置超时,以验证发件人是否成功。setFailIfSendResultIsError
setWaitForSendResultTimeout
如果 recoverer 失败(引发异常),则失败的记录将包含在 seek 中。
从版本 2.5.5 开始,如果 recoverer 失败,则默认情况下将重置 ,并且重新交付将再次通过回退,然后再尝试恢复。
在早期版本中,不会重置 ,并在下次失败时重新尝试恢复。
若要还原到以前的行为,请将错误处理程序的属性设置为 。BackOff BackOff resetStateOnRecoveryFailure false |
从版本 2.6.3 开始,如果异常类型在失败之间发生变化,则设置为 并且重试序列将重新启动(包括选择新的,如果已配置)。
默认情况下,不考虑异常类型。resetStateOnExceptionChange
true
BackOff
从版本 2.3 开始,recoverer 也可以与 Kafka Streams 一起使用 - 有关更多信息,请参阅从反序列化异常中恢复。
这会在 headers 和 (使用 Java 序列化) 中添加反序列化异常。
默认情况下,这些标头不会保留在发布到死信主题的邮件中。
从版本 2.7 开始,如果 key 和 value 都失败了反序列化,则两者的原始值都会填充到发送到 DLT 的记录中。ErrorHandlingDeserializer
ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER
ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER
如果传入的记录彼此依赖,但可能无序到达,那么将失败的记录重新发布到原始主题的尾部(多次)可能很有用,而不是将其直接发送到死信主题。 有关示例,请参阅此 Stack Overflow 问题。
以下错误处理程序配置将完全执行此操作:
@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
(rec, ex) -> {
org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
if (retries == null) {
retries = new RecordHeader("retries", new byte[] { 1 });
rec.headers().add(retries);
}
else {
retries.value()[0]++;
}
return retries.value()[0] > 5
? new TopicPartition("topic.DLT", rec.partition())
: new TopicPartition("topic", rec.partition());
}), new FixedBackOff(0L, 0L));
}
从版本 2.7 开始,recoverer 检查目标解析器选择的分区是否确实存在。
如果分区不存在,则 中的分区将设置为 ,允许 选择分区。
您可以通过将属性设置为 来禁用此检查。ProducerRecord
null
KafkaProducer
verifyPartition
false
从版本 3.1 开始,将属性设置为 将记录恢复记录和异常。logRecoveryRecord
true
管理死信记录标头
-
appendOriginalHeaders
(默认true
) -
stripPreviousExceptionHeaders
(自 2.8 版本起默认)true
Apache Kafka 支持多个同名的标头;要获取 “latest” 值,可以使用 ;要获取多个标头的迭代器,请使用 .headers.lastHeader(headerName)
headers.headers(headerName).iterator()
重复重新发布失败的记录时,这些标头可能会增长(并最终导致发布失败,因为 a );对于异常标头尤其如此,尤其是对于 Stack Trace 标头。RecordTooLargeException
使用这两个属性的原因是,虽然您可能希望只保留最后一个异常信息,但您可能希望保留记录在每次失败时传递的主题的历史记录。
appendOriginalHeaders
应用于所有名为 的标题,而应用于所有名为 的标题。ORIGINAL
stripPreviousExceptionHeaders
EXCEPTION
从版本 2.8.4 开始,您现在可以控制将哪些标准标头添加到输出记录中。
有关默认添加的(当前)10 个标准 Headers 的通用名称,请参见 (这些不是实际的 Headers 名称,只是一个抽象;实际的 Headers 名称由子类可以覆盖的方法设置)。enum HeadersToAdd
getHeaderNames()
要排除标头,请使用 method;例如,要禁止在标头中添加异常堆栈跟踪,请使用:excludeHeaders()
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);
此外,您还可以通过添加 ;这也会禁用所有标准异常标头。ExceptionHeadersCreator
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
kafkaHeaders.add(new RecordHeader(..., ...));
});
同样从版本 2.8.4 开始,您现在可以通过 method.
这允许应用其他函数,即使已经注册了另一个函数,例如,在使用非阻塞重试时。addHeadersFunction
另请参阅使用非阻塞重试的 Failure Header Management。
ExponentialBackOffWithMaxRetries
实现
Spring Framework 提供了许多实现。
默认情况下,将无限期重试;要在重试尝试一定次数后放弃,需要计算 .
从版本 2.7.3 开始, Spring for Apache Kafka 提供了 which 是一个子类,它接收属性并自动计算 ,这更方便一些。BackOff
ExponentialBackOff
maxElapsedTime
ExponentialBackOffWithMaxRetries
maxRetries
maxElapsedTime
@Bean
DefaultErrorHandler handler() {
ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
bo.setInitialInterval(1_000L);
bo.setMultiplier(2.0);
bo.setMaxInterval(10_000L);
return new DefaultErrorHandler(myRecoverer, bo);
}
这将在几秒钟后重试,然后再调用 recoverer。1, 2, 4, 8, 10, 10