此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4! |
错误处理
在本节中,我们将解释框架提供的错误处理机制背后的一般概念。 我们将使用 Rabbit Binder 作为示例,因为单个 Binder 定义了不同的集合 特定于底层代理功能的某些受支持机制(例如 Kafka Binder)的属性。
错误会发生,Spring Cloud Stream 提供了几种灵活的机制来处理它们。请注意,这些技术依赖于 Binder 实现和 功能以及编程模型(稍后将详细介绍)。
每当 Message 处理程序(函数)抛出异常时,它就会传播回 Binder,此时 Binder 将多次尝试重试
相同的消息(默认为 3)使用 Spring Retry 库提供。
如果重试不成功,则由错误处理机制决定,该机制可能会丢弃消息、将消息重新排队以重新处理或将失败的消息发送到 DLQ。RetryTemplate
Rabbit 和 Kafka 都支持这些概念(尤其是 DLQ)。但是,其他活页夹可能不支持,因此请参阅您的个人活页夹文档以了解有关支持的详细信息 error-handling 选项。
但请记住,反应式函数不符合 Message 处理程序的条件,因为它不处理单个消息和 相反,提供了一种将框架提供的 stream(即 Flux)与用户提供的 stream ((即 Flux) )连接起来的方法。为什么这很重要?这是因为您在本节后面阅读的有关重试模板的任何内容、删除失败的消息、重试、 DLQ 和协助完成所有这些操作的配置属性仅适用于 Message 处理程序(即命令式函数)。
响应式 API 提供了一个非常丰富的库,其中包含自己的运算符和机制,以帮助你处理特定于
各种反应式用例,这些用例比简单的 Message 处理程序用例要复杂得多,因此请使用它们,例如
您可以在 中找到。public final Flux<T> retryWhen(Retry retrySpec);
reactor.core.publisher.Flux
@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
return flux -> flux
.retryWhen(Retry.backoff(3, Duration.ofMillis(1000)))
.map(v -> v.toUpperCase());
}
丢弃失败的消息
默认情况下,系统提供错误处理程序。第一个错误处理程序将只记录错误消息。第二个错误处理程序是特定于 Binder 的错误处理程序 它负责在特定消息传递系统的上下文中处理错误消息(例如,发送到 DLQ)。但是由于没有提供额外的错误处理配置(在当前情况下),此处理程序不会执行任何操作。因此,基本上在记录后,消息将被丢弃。
虽然在某些情况下可以接受,但在大多数情况下,这是不可接受的,我们需要一些恢复机制来避免消息丢失。
处理错误消息
在上一节中,我们提到了默认情况下,导致错误的消息会被有效地记录和丢弃。该框架还为您公开了 mechanism
提供自定义错误处理程序(即发送通知或写入数据库等)。为此,您可以添加专门设计的 that,以接受 which 除了有关错误的所有信息(例如,堆栈跟踪等)包含原始消息(触发错误的消息)之外。
注意:自定义错误处理程序与框架提供的错误处理程序(即日志记录和 Binder 错误处理程序 - 请参阅上一节)互斥,以确保它们不会干扰。Consumer
ErrorMessage
@Bean
public Consumer<ErrorMessage> myErrorHandler() {
return v -> {
// send SMS notification code
};
}
要将此类使用者标识为错误处理程序,您只需提供指向函数名称 - 的属性。error-handler-definition
spring.cloud.stream.bindings.<binding-name>.error-handler-definition=myErrorHandler
例如,对于绑定名称,该属性将如下所示:uppercase-in-0
spring.cloud.stream.bindings.uppercase-in-0.error-handler-definition=myErrorHandler
如果你使用特殊的 map 指令将 binding 映射到更易读的名称 - ,那么这个属性将如下所示:spring.cloud.stream.function.bindings.uppercase-in-0=upper
spring.cloud.stream.bindings.upper.error-handler-definition=myErrorHandler.
如果您不小心将此类处理程序声明为 a ,它仍然可以工作,但不会对其输出执行任何操作。但是,鉴于此类处理程序仍然依赖于 Spring Cloud Function 提供的功能,因此,如果您希望通过函数组合解决某些复杂性(但不太可能),您也可以从函数组合中受益。Function |
默认错误处理程序
如果你想为所有函数 bean 使用单个错误处理程序,则可以使用标准的 spring-cloud-stream 机制来定义默认属性spring.cloud.stream.default.error-handler-definition=myErrorHandler
DLQ - 死信队列
也许是最常见的机制,DLQ 允许将失败的消息发送到一个特殊的目的地:死信队列。
配置后,失败的消息将发送到此目标,以便后续重新处理或审计和对帐。
请考虑以下示例:
@SpringBootApplication
public class SimpleStreamApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(SimpleStreamApplication.class,
"--spring.cloud.function.definition=uppercase",
"--spring.cloud.stream.bindings.uppercase-in-0.destination=uppercase",
"--spring.cloud.stream.bindings.uppercase-in-0.group=myGroup",
"--spring.cloud.stream.rabbit.bindings.uppercase-in-0.consumer.auto-bind-dlq=true"
);
}
@Bean
public Function<Person, Person> uppercase() {
return personIn -> {
throw new RuntimeException("intentional");
});
};
}
}
提醒一下,在此示例中,属性的 segment 对应于输入目标绑定的名称。
该段指示它是使用者属性。uppercase-in-0
consumer
使用 DLQ 时,至少必须提供该属性才能正确命名 DLQ 目标。但是,经常一起使用
with property,如我们的示例所示。group group destination |
除了一些标准属性之外,我们还设置了 to 指示 binder 创建和配置 DLQ destination 进行绑定,这与 destination 相对应(参见相应的属性),这将产生一个名为 (参见 Kafka 文档以了解 Kafka 特定的 DLQ 属性)。auto-bind-dlq
uppercase-in-0
uppercase
uppercase.myGroup.dlq
配置后,所有失败的消息都将路由到此目标,并保留原始消息以供进一步操作。
您可以看到错误消息包含与原始错误相关的更多信息,如下所示:
. . . .
x-exception-stacktrace: org.springframework.messaging.MessageHandlingException: nested exception is
org.springframework.messaging.MessagingException: has an error, failedMessage=GenericMessage [payload=byte[15],
headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=input.hello, amqp_deliveryTag=1,
deliveryAttempt=3, amqp_consumerQueue=input.hello, amqp_redelivered=false, id=a15231e6-3f80-677b-5ad7-d4b1e61e486e,
amqp_consumerTag=amq.ctag-skBFapilvtZhDsn0k3ZmQg, contentType=application/json, timestamp=1522327846136}]
at org.spring...integ...han...MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:107)
at. . . . .
Payload: blah
您还可以通过设置为 '1' 来促进立即调度到 DLQ (无需重试)。例如max-attempts
--spring.cloud.stream.bindings.uppercase-in-0.consumer.max-attempts=1
重试模板
在本节中,我们将介绍与重试功能配置相关的配置属性。
它是 Spring Retry 库的一部分。
虽然本文档无法涵盖 的所有功能,但我们
将提及以下与
这:RetryTemplate
RetryTemplate
RetryTemplate
- 最大尝试次数
-
尝试处理消息的次数。
默认值:3。
- backOffInitialInterval
-
重试时的回退初始间隔。
默认为 1000 毫秒。
- backOffMaxInterval
-
最大回退间隔。
默认为 10000 毫秒。
- backOffMultiplier 的
-
回退乘数。
默认 2.0。
- defaultRetryable
-
侦听器引发的未在 中列的异常是否可重试。
retryableExceptions
违约:。
true
- retryableExceptions
-
键中 Throwable 类名的映射,值中有一个布尔值。 指定将要或不会重试的异常 (和子类)。 另请参阅 。 例:。
defaultRetriable
spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false
默认值:空。
虽然上述设置足以满足大多数自定义要求,但它们可能无法满足某些复杂的要求,其中
点,您可能需要提供自己的 .为此,请在应用程序配置中将其配置为 Bean。该应用程序提供
实例将覆盖框架提供的实例。此外,为避免冲突,您必须限定 Binder 要使用的 实例
如。例如RetryTemplate
RetryTemplate
@StreamRetryTemplate
@StreamRetryTemplate
public RetryTemplate myRetryTemplate() {
return new RetryTemplate();
}
从上面的例子中可以看出,你不需要用 since is a qualified 来注释它。@Bean
@StreamRetryTemplate
@Bean
如果需要更精确地使用 Bean,则可以在 要关联的 bean 中按名称指定 bean
每个绑定的特定重试 Bean。RetryTemplate
ConsumerProperties
spring.cloud.stream.bindings.<foo>.consumer.retry-template-name=<your-retry-template-bean-name>