对于最新的稳定版本,请使用 Spring Integration 6.3.4! |
对于最新的稳定版本,请使用 Spring Integration 6.3.4! |
除了提供应用 AOP 建议类的通用机制外, Spring 集成还提供了这些开箱即用的建议实现:
-
RequestHandlerRetryAdvice
(如重试建议中所述) -
RequestHandlerCircuitBreakerAdvice
(在 Circuit Breaker Advice 中描述) -
ExpressionEvaluatingRequestHandlerAdvice
(在 Expression Advice 中描述) -
RateLimiterRequestHandlerAdvice
(如 Rate Limiter Advice 中所述) -
CacheRequestHandlerAdvice
(在 缓存建议 中描述) -
ReactiveRequestHandlerAdvice
(在 Reactive Advice 中描述) -
ContextHolderRequestHandlerAdvice
(在 Context Holder Advice 中描述)
重试建议
重试建议 () 利用了 Spring Retry 项目提供的丰富重试机制。
的核心组件是 ,它允许配置复杂的重试场景,包括 和 策略(具有许多实施)以及用于确定重试用尽时要采取的操作的策略。o.s.i.handler.advice.RequestHandlerRetryAdvice
spring-retry
RetryTemplate
RetryPolicy
BackoffPolicy
RecoveryCallback
- 无状态重试
-
无状态重试是指重试活动完全在通知中处理的情况。 线程将暂停(如果配置为这样做)并重试该操作。
- 状态重试
-
有状态重试是指在通知中管理重试状态,但引发异常并且调用方重新提交请求的情况。 有状态重试的一个示例是,当我们希望消息发起方(例如 JMS)负责重新提交,而不是在当前线程上执行时。 有状态重试需要某种机制来检测重试的提交。
有关 的更多信息,请参阅项目的 Javadoc 和 Spring Batch 的参考文档(其中的来源)。spring-retry
spring-retry
默认的 back off 行为是 not reoff 。 将立即尝试重试。 使用导致线程在两次尝试之间暂停的回退策略可能会导致性能问题,包括内存使用过多和线程不足。 在大容量环境中,应谨慎使用回退策略。 |
配置 Retry Advice
本节中的示例使用始终引发异常的以下内容:<service-activator>
public class FailingService {
public void service(String message) {
throw new RuntimeException("error");
}
}
- 简单无状态重试
-
默认值为 a that tries 3 次。 没有 ,因此这三次尝试是背靠背进行的,两次尝试之间没有延迟。 没有 ,因此结果是在最后一次重试失败后向调用方抛出异常。 在 Spring 集成环境中,这个最终异常可以通过在入站端点上使用来处理。 以下示例使用并显示其输出:
RetryTemplate
SimpleRetryPolicy
BackOffPolicy
RecoveryCallback
error-channel
RetryTemplate
DEBUG
<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"/> </int:request-handler-advice-chain> </int:service-activator> DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...] DEBUG [task-scheduler-2]Retry: count=0 DEBUG [task-scheduler-2]Checking for rethrow: count=1 DEBUG [task-scheduler-2]Retry: count=1 DEBUG [task-scheduler-2]Checking for rethrow: count=2 DEBUG [task-scheduler-2]Retry: count=2 DEBUG [task-scheduler-2]Checking for rethrow: count=3 DEBUG [task-scheduler-2]Retry failed last attempt: count=3
- 带恢复的简单无状态重试
-
以下示例将 a 添加到前面的示例中,并使用 an 将 发送到通道:
RecoveryCallback
ErrorMessageSendingRecoverer
ErrorMessage
<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> </bean> </int:request-handler-advice-chain> </int:service-activator> DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...] DEBUG [task-scheduler-2]Retry: count=0 DEBUG [task-scheduler-2]Checking for rethrow: count=1 DEBUG [task-scheduler-2]Retry: count=1 DEBUG [task-scheduler-2]Checking for rethrow: count=2 DEBUG [task-scheduler-2]Retry: count=2 DEBUG [task-scheduler-2]Checking for rethrow: count=3 DEBUG [task-scheduler-2]Retry failed last attempt: count=3 DEBUG [task-scheduler-2]Sending ErrorMessage :failedMessage:[Payload=...]
- 使用自定义策略的无状态重试和恢复
-
对于更复杂的问题,我们可以提供定制的建议。 此示例继续使用 ,但将尝试次数增加到 4 次。 它还添加了一个 first,第一次重试等待 1 秒,第二次重试等待 5 秒,第三次重试等待 25 秒(总共尝试 4 次)。 下面的清单显示了该示例及其输出:
RetryTemplate
SimpleRetryPolicy
ExponentialBackoffPolicy
DEBUG
<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> <property name="retryTemplate" ref="retryTemplate" /> </bean> </int:request-handler-advice-chain> </int:service-activator> <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate"> <property name="retryPolicy"> <bean class="org.springframework.retry.policy.SimpleRetryPolicy"> <property name="maxAttempts" value="4" /> </bean> </property> <property name="backOffPolicy"> <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy"> <property name="initialInterval" value="1000" /> <property name="multiplier" value="5.0" /> <property name="maxInterval" value="60000" /> </bean> </property> </bean> 27.058 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...] 27.071 DEBUG [task-scheduler-1]Retry: count=0 27.080 DEBUG [task-scheduler-1]Sleeping for 1000 28.081 DEBUG [task-scheduler-1]Checking for rethrow: count=1 28.081 DEBUG [task-scheduler-1]Retry: count=1 28.081 DEBUG [task-scheduler-1]Sleeping for 5000 33.082 DEBUG [task-scheduler-1]Checking for rethrow: count=2 33.082 DEBUG [task-scheduler-1]Retry: count=2 33.083 DEBUG [task-scheduler-1]Sleeping for 25000 58.083 DEBUG [task-scheduler-1]Checking for rethrow: count=3 58.083 DEBUG [task-scheduler-1]Retry: count=3 58.084 DEBUG [task-scheduler-1]Checking for rethrow: count=4 58.084 DEBUG [task-scheduler-1]Retry failed last attempt: count=4 58.086 DEBUG [task-scheduler-1]Sending ErrorMessage :failedMessage:[Payload=...]
- Namespace 对无状态重试的支持
-
从版本 4.0 开始,由于命名空间支持重试建议,可以大大简化前面的配置,如下例所示:
<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <ref bean="retrier" /> </int:request-handler-advice-chain> </int:service-activator> <int:handler-retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel"> <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" /> </int:handler-retry-advice>
在前面的示例中,建议被定义为顶级 Bean,以便它可以在多个实例中使用。 您还可以直接在链中定义建议,如下例所示:
request-handler-advice-chain
<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <int:retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel"> <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" /> </int:retry-advice> </int:request-handler-advice-chain> </int:service-activator>
A 可以具有 或 子元素,也可以没有子元素。 没有子元素的 A 不使用退避。 如果没有 ,则在重试用尽时引发异常。 命名空间只能与无状态重试一起使用。
<handler-retry-advice>
<fixed-back-off>
<exponential-back-off>
<handler-retry-advice>
recovery-channel
对于更复杂的环境(自定义策略等),请使用普通定义。
<bean>
- 带恢复的简单状态重试
-
要使 retry 有状态,我们需要通过 implementation 提供建议。 此类用于将邮件标识为重新提交,以便可以确定此邮件的当前重试状态。 框架提供了一个 ,它使用 SPEL 表达式确定消息标识符。 此示例再次使用默认策略 (3 次尝试,无回退)。 与无状态重试一样,这些策略可以自定义。 下面的清单显示了该示例及其输出:
RetryStateGenerator
RetryTemplate
SpelExpressionRetryStateGenerator
DEBUG
<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="retryStateGenerator"> <bean class="o.s.i.handler.advice.SpelExpressionRetryStateGenerator"> <constructor-arg value="headers['jms_messageId']" /> </bean> </property> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> </bean> </int:request-handler-advice-chain> </int:service-activator> 24.351 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 24.368 DEBUG [Container#0-1]Retry: count=0 24.387 DEBUG [Container#0-1]Checking for rethrow: count=1 24.387 DEBUG [Container#0-1]Rethrow in retry for policy: count=1 24.387 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 24.391 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 25.412 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 25.412 DEBUG [Container#0-1]Retry: count=1 25.413 DEBUG [Container#0-1]Checking for rethrow: count=2 25.413 DEBUG [Container#0-1]Rethrow in retry for policy: count=2 25.413 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 25.414 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 26.418 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 26.418 DEBUG [Container#0-1]Retry: count=2 26.419 DEBUG [Container#0-1]Checking for rethrow: count=3 26.419 DEBUG [Container#0-1]Rethrow in retry for policy: count=3 26.419 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 26.420 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 27.425 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 27.426 DEBUG [Container#0-1]Retry failed last attempt: count=3 27.426 DEBUG [Container#0-1]Sending ErrorMessage :failedMessage:[Payload=...]
如果将前面的示例与无状态示例进行比较,可以看到,使用有状态重试时,每次失败时都会向调用方引发异常。
- 重试的异常分类
-
Spring Retry 在确定哪些异常可以调用重试方面具有很大的灵活性。 默认配置对所有异常重试,异常分类器查看顶级异常。 如果您将其配置为仅在 on 重试,并且您的应用程序引发 a ,其中原因是 ,则不会发生重试。
MyException
SomeOtherException
MyException
从 Spring Retry 1.0.3 开始,它有一个名为 (默认为 ) 的属性。 When 时,它会遍历异常原因,直到找到匹配项或用完遍历的原因。
BinaryExceptionClassifier
traverseCauses
false
true
要使用此分类器进行重试,请使用 created 和构造函数,该构造函数采用最大尝试次数、对象和布尔值。 然后,您可以将此策略注入到 .
SimpleRetryPolicy
Map
Exception
traverseCauses
RetryTemplate
traverseCauses 是必需的,因为用户异常可能包装在 .MessagingException |
默认的 back off 行为是 not reoff 。 将立即尝试重试。 使用导致线程在两次尝试之间暂停的回退策略可能会导致性能问题,包括内存使用过多和线程不足。 在大容量环境中,应谨慎使用回退策略。 |
traverseCauses 是必需的,因为用户异常可能包装在 .MessagingException |
熔断器建议
断路器模式的一般思路是,如果某个服务当前不可用,请不要浪费时间(和资源)来尝试使用它。
实现此模式。
当断路器处于 closed 状态时,终端节点会尝试调用该服务。
如果连续尝试一定次数失败,则断路器将进入 open 状态。
当它处于打开状态时,新请求“快速失败”,并且在一段时间到期之前不会尝试调用该服务。o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice
当该时间到期时,断路器将设置为半开状态。 在此状态下,即使一次尝试失败,断路器也会立即进入打开状态。 如果尝试成功,断路器将进入 closed 状态,在这种情况下,它不会再次进入 open 状态,直到再次发生配置的连续失败次数。 任何成功的尝试都会将状态重置为零失败,以确定断路器何时可能再次进入打开状态。
通常,此建议可能用于外部服务,在这些服务中,可能需要一些时间才能失败(例如尝试建立网络连接时超时)。
具有两个属性:和 。
该属性表示在断路器打开之前需要发生的连续失败次数。
它默认为 .
该属性表示 breaker 在上次失败后尝试另一个请求之前等待的时间。
默认值为 1000 毫秒。RequestHandlerCircuitBreakerAdvice
threshold
halfOpenAfter
threshold
5
halfOpenAfter
以下示例配置断路器并显示其 and 输出:DEBUG
ERROR
<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2" />
<property name="halfOpenAfter" value="12000" />
</bean>
</int:request-handler-advice-chain>
</int:service-activator>
05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
在前面的示例中,阈值设置为秒。
每 5 秒收到一个新请求。
前两次尝试调用了该服务。
第三个和第四个失败,并出现异常,指示断路器已打开。
尝试了第五个请求,因为该请求是在上次失败后的 15 秒。
第六次尝试立即失败,因为断路器立即打开。2
halfOpenAfter
12
表达式计算建议
最后提供的建议类是 .
这个建议比其他两个建议更普遍。
它提供了一种机制,用于计算发送到终端节点的原始入站消息的表达式。
在成功或失败后,可以评估单独的表达式。
(可选)可以将包含评估结果的消息与输入消息一起发送到消息通道。o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice
此建议的一个典型用例可能是使用 ,如果传输成功,则将文件移动到一个目录,如果传输失败,则移动到另一个目录:<ftp:outbound-channel-adapter/>
该通知具有用于设置成功时表达式、失败时设置表达式以及每个选项的相应通道的属性。
对于成功案例,发送到的消息是 ,其中有效负载是表达式评估的结果。
名为 的附加属性包含发送到处理程序的原始消息。
发送到 的消息(当处理程序引发异常时)是负载为 的 。
与所有实例一样,此有效负载具有 和 properties,以及一个名为 的附加属性,其中包含表达式计算的结果。successChannel
AdviceMessage
inputMessage
failureChannel
ErrorMessage
MessageHandlingExpressionEvaluatingAdviceException
MessagingException
failedMessage
cause
evaluationResult
从版本 5.1.3 开始,如果配置了 channels,但未提供表达式,则默认表达式将用于评估消息的 the 。payload |
当在建议范围内引发异常时,默认情况下,该异常将在评估 any 后引发给调用方。
如果要禁止引发异常,请将该属性设置为 .
以下建议显示了如何使用 Java DSL 配置 一个:failureExpression
trapException
true
advice
@SpringBootApplication
public class EerhaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
MessageChannel in = context.getBean("advised.input", MessageChannel.class);
in.send(new GenericMessage<>("good"));
in.send(new GenericMessage<>("bad"));
context.close();
}
@Bean
public IntegrationFlow advised() {
return f -> f.<String>handle((payload, headers) -> {
if (payload.equals("good")) {
return null;
}
else {
throw new RuntimeException("some failure");
}
}, c -> c.advice(expressionAdvice()));
}
@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannelName("success.input");
advice.setOnSuccessExpressionString("payload + ' was successful'");
advice.setFailureChannelName("failure.input");
advice.setOnFailureExpressionString(
"payload + ' was bad, with reason: ' + #exception.cause.message");
advice.setTrapException(true);
return advice;
}
@Bean
public IntegrationFlow success() {
return f -> f.handle(System.out::println);
}
@Bean
public IntegrationFlow failure() {
return f -> f.handle(System.out::println);
}
}
从版本 5.1.3 开始,如果配置了 channels,但未提供表达式,则默认表达式将用于评估消息的 the 。payload |
Rate Limiter 建议
Rate Limiter 建议 () 允许确保终端节点不会因请求而过载。
当超出速率限制时,请求将进入 blocked 状态。RateLimiterRequestHandlerAdvice
此建议的一个典型使用案例可能是外部服务提供商不允许每分钟超过请求的请求数。n
该实现完全基于 Resilience4j 项目,需要 OR 注射。
也可以使用默认值和/或自定义名称进行配置。RateLimiterRequestHandlerAdvice
RateLimiter
RateLimiterConfig
以下示例将速率限制器建议配置为每 1 秒 1 个请求:
@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofSeconds(1))
.limitForPeriod(1)
.build());
}
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
...
}
缓存建议
从版本 5.2 开始,引入了 。
它基于 Spring Framework 中的缓存抽象,并与 Comments 系列提供的概念和功能保持一致。
内部的逻辑基于扩展,其中缓存操作的代理是围绕方法完成的,并将请求作为参数。
可以使用 SPEL 表达式或 a 配置此建议以评估缓存键。
该请求可用作 SPEL 评估上下文的根对象,或作为 input 参数。
默认情况下,请求消息的 用于缓存键。
当默认缓存操作为 时,必须使用 配置 ,或者使用任意 s 的集合进行配置。
每个选项都可以单独配置,也可以具有共享选项,如 、 和 ,可以从配置中重复使用。
此配置功能类似于 Spring Framework 和 Comments 的组合。
如果未提供 a,则默认情况下将从 中的 解析单个 bean。CacheRequestHandlerAdvice
@Caching
CacheAspectSupport
AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage
Message<?>
Function
Message<?>
Function
payload
CacheRequestHandlerAdvice
cacheNames
CacheableOperation
CacheOperation
CacheOperation
CacheManager
CacheResolver
CacheErrorHandler
CacheRequestHandlerAdvice
@CacheConfig
@Caching
CacheManager
BeanFactory
CacheAspectSupport
以下示例使用一组不同的缓存操作配置两个建议:
@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
return cacheRequestHandlerAdvice;
}
@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
...
}
@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
cachePutBuilder.setCacheName(TEST_PUT_CACHE);
CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
cacheEvictBuilder.setCacheName(TEST_CACHE);
cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
return cacheRequestHandlerAdvice;
}
@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
...
}