此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.4.3spring-doc.cadn.net.cn

提供的建议课程

除了提供应用 AOP 建议类的通用机制外, Spring 集成还提供了这些开箱即用的建议实现:spring-doc.cadn.net.cn

重试建议

重试建议 (o.s.i.handler.advice.RequestHandlerRetryAdvice)利用了 Spring Retry 项目提供的丰富重试机制。 核心组件spring-retryRetryTemplate,它允许配置复杂的重试场景,包括RetryPolicyBackoffPolicy策略(具有许多实现方式)以及RecoveryCallback策略来确定重试用尽时要采取的作。spring-doc.cadn.net.cn

无状态重试

无状态重试是指重试活动完全在通知中处理的情况。 线程将暂停(如果配置为这样做)并重试该作。spring-doc.cadn.net.cn

状态重试

有状态重试是指在通知中管理重试状态,但引发异常并且调用方重新提交请求的情况。 有状态重试的一个示例是,当我们希望消息发起方(例如 JMS)负责重新提交,而不是在当前线程上执行时。 有状态重试需要某种机制来检测重试的提交。spring-doc.cadn.net.cn

有关更多信息spring-retry,请参阅项目的 JavadocSpring Batch 的参考文档,其中spring-retry起源。spring-doc.cadn.net.cn

默认的 back off 行为是 not reoff 。 将立即尝试重试。 使用导致线程在两次尝试之间暂停的回退策略可能会导致性能问题,包括内存使用过多和线程不足。 在大容量环境中,应谨慎使用回退策略。

配置 Retry Advice

本节中的示例使用了以下内容<service-activator>这总是会引发一个异常:spring-doc.cadn.net.cn

public class FailingService {

    public void service(String message) {
        throw new RuntimeException("error");
    }
}
简单无状态重试

默认的RetryTemplate具有SimpleRetryPolicy它尝试了 3 次。 没有BackOffPolicy,因此这三次尝试是背靠背进行的,两次尝试之间没有延迟。 没有RecoveryCallback,因此结果是在最后一次重试失败后向调用方抛出异常。 在 Spring 集成环境中,这个最终异常可以通过使用error-channel在入站终端节点上。 以下示例使用RetryTemplate并显示其DEBUG输出:spring-doc.cadn.net.cn

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"/>
    </int:request-handler-advice-chain>
</int:service-activator>

DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
DEBUG [task-scheduler-2]Retry: count=0
DEBUG [task-scheduler-2]Checking for rethrow: count=1
DEBUG [task-scheduler-2]Retry: count=1
DEBUG [task-scheduler-2]Checking for rethrow: count=2
DEBUG [task-scheduler-2]Retry: count=2
DEBUG [task-scheduler-2]Checking for rethrow: count=3
DEBUG [task-scheduler-2]Retry failed last attempt: count=3
带恢复的简单无状态重试

以下示例添加了一个RecoveryCallback添加到前面的示例中,并使用ErrorMessageSendingRecoverer要发送ErrorMessage添加到频道中:spring-doc.cadn.net.cn

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
DEBUG [task-scheduler-2]Retry: count=0
DEBUG [task-scheduler-2]Checking for rethrow: count=1
DEBUG [task-scheduler-2]Retry: count=1
DEBUG [task-scheduler-2]Checking for rethrow: count=2
DEBUG [task-scheduler-2]Retry: count=2
DEBUG [task-scheduler-2]Checking for rethrow: count=3
DEBUG [task-scheduler-2]Retry failed last attempt: count=3
DEBUG [task-scheduler-2]Sending ErrorMessage :failedMessage:[Payload=...]
使用自定义策略的无状态重试和恢复

对于更复杂的问题,我们可以提供定制的建议RetryTemplate. 此示例继续使用SimpleRetryPolicy但将尝试次数增加到 4 次。 它还添加了一个ExponentialBackoffPolicy其中,第一次重试等待 1 秒,第二次重试等待 5 秒,第三次重试等待 25 秒(总共尝试 4 次)。 下面的清单显示了该示例及其DEBUG输出:spring-doc.cadn.net.cn

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
            <property name="retryTemplate" ref="retryTemplate" />
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="retryPolicy">
        <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
            <property name="maxAttempts" value="4" />
        </bean>
    </property>
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="1000" />
            <property name="multiplier" value="5.0" />
            <property name="maxInterval" value="60000" />
        </bean>
    </property>
</bean>

27.058 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
27.071 DEBUG [task-scheduler-1]Retry: count=0
27.080 DEBUG [task-scheduler-1]Sleeping for 1000
28.081 DEBUG [task-scheduler-1]Checking for rethrow: count=1
28.081 DEBUG [task-scheduler-1]Retry: count=1
28.081 DEBUG [task-scheduler-1]Sleeping for 5000
33.082 DEBUG [task-scheduler-1]Checking for rethrow: count=2
33.082 DEBUG [task-scheduler-1]Retry: count=2
33.083 DEBUG [task-scheduler-1]Sleeping for 25000
58.083 DEBUG [task-scheduler-1]Checking for rethrow: count=3
58.083 DEBUG [task-scheduler-1]Retry: count=3
58.084 DEBUG [task-scheduler-1]Checking for rethrow: count=4
58.084 DEBUG [task-scheduler-1]Retry failed last attempt: count=4
58.086 DEBUG [task-scheduler-1]Sending ErrorMessage :failedMessage:[Payload=...]
命名空间对无状态重试的支持

从版本 4.0 开始,由于命名空间支持重试建议,可以大大简化前面的配置,如下例所示:spring-doc.cadn.net.cn

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <ref bean="retrier" />
    </int:request-handler-advice-chain>
</int:service-activator>

<int:handler-retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
    <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
</int:handler-retry-advice>

在前面的示例中,通知被定义为顶级 bean,以便它可以在多个request-handler-advice-chain实例。 您还可以直接在链中定义建议,如下例所示:spring-doc.cadn.net.cn

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <int:retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
            <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
        </int:retry-advice>
    </int:request-handler-advice-chain>
</int:service-activator>

一个<handler-retry-advice>可以具有<fixed-back-off><exponential-back-off>child 元素或没有 child 元素。 一个<handler-retry-advice>没有子元素时,不使用退避。 如果没有recovery-channel,则在重试次数用尽时引发异常。 命名空间只能与无状态重试一起使用。spring-doc.cadn.net.cn

对于更复杂的环境(自定义策略等),请使用 normal<bean>定义。spring-doc.cadn.net.cn

带恢复的简单状态重试

要使重试有状态,我们需要为 retry 提供RetryStateGenerator实现。 此类用于将邮件标识为重新提交,以便RetryTemplate可以确定此消息的当前重试状态。 该框架提供了一个SpelExpressionRetryStateGenerator,它通过使用 SPEL 表达式确定消息标识符。 此示例再次使用默认策略 (3 次尝试,无回退)。 与无状态重试一样,这些策略可以自定义。 下面的清单显示了该示例及其DEBUG输出:spring-doc.cadn.net.cn

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="retryStateGenerator">
                <bean class="o.s.i.handler.advice.SpelExpressionRetryStateGenerator">
                    <constructor-arg value="headers['jms_messageId']" />
                </bean>
            </property>
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

24.351 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
24.368 DEBUG [Container#0-1]Retry: count=0
24.387 DEBUG [Container#0-1]Checking for rethrow: count=1
24.387 DEBUG [Container#0-1]Rethrow in retry for policy: count=1
24.387 WARN  [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
24.391 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
25.412 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
25.412 DEBUG [Container#0-1]Retry: count=1
25.413 DEBUG [Container#0-1]Checking for rethrow: count=2
25.413 DEBUG [Container#0-1]Rethrow in retry for policy: count=2
25.413 WARN  [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
25.414 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
26.418 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
26.418 DEBUG [Container#0-1]Retry: count=2
26.419 DEBUG [Container#0-1]Checking for rethrow: count=3
26.419 DEBUG [Container#0-1]Rethrow in retry for policy: count=3
26.419 WARN  [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
26.420 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
27.425 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
27.426 DEBUG [Container#0-1]Retry failed last attempt: count=3
27.426 DEBUG [Container#0-1]Sending ErrorMessage :failedMessage:[Payload=...]

如果将前面的示例与无状态示例进行比较,可以看到,使用有状态重试时,每次失败时都会向调用方引发异常。spring-doc.cadn.net.cn

重试的异常分类

Spring Retry 在确定哪些异常可以调用重试方面具有很大的灵活性。 默认配置对所有异常重试,异常分类器查看顶级异常。 如果您将其配置为仅在MyException,您的应用程序会抛出一个SomeOtherException其中,原因是MyException,则不会发生重试。spring-doc.cadn.net.cn

从 Spring Retry 1.0.3 开始,BinaryExceptionClassifier具有一个名为traverseCauses(默认值为false). 什么时候true,它会遍历异常原因,直到找到匹配项或用完遍历的原因。spring-doc.cadn.net.cn

要使用此分类器进行重试,请使用SimpleRetryPolicy使用采用最大尝试次数的构造函数创建,MapException对象和traverseCauses布尔。 然后,您可以将此策略注入到RetryTemplate.spring-doc.cadn.net.cn

traverseCauses是必需的,因为用户异常可能包装在MessagingException.

熔断器建议

断路器模式的一般思路是,如果某个服务当前不可用,请不要浪费时间(和资源)来尝试使用它。 这o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice实现此模式。 当断路器处于 closed 状态时,终端节点会尝试调用该服务。 如果连续尝试一定次数失败,则断路器将进入 open 状态。 当它处于打开状态时,新请求“快速失败”,并且在一段时间到期之前不会尝试调用该服务。spring-doc.cadn.net.cn

当该时间到期时,断路器将设置为半开状态。 在此状态下,即使一次尝试失败,断路器也会立即进入打开状态。 如果尝试成功,断路器将进入 closed 状态,在这种情况下,它不会再次进入 open 状态,直到再次发生配置的连续失败次数。 任何成功的尝试都会将状态重置为零失败,以确定断路器何时可能再次进入打开状态。spring-doc.cadn.net.cn

通常,此建议可能用于外部服务,在这些服务中,可能需要一些时间才能失败(例如尝试建立网络连接时超时)。spring-doc.cadn.net.cn

RequestHandlerCircuitBreakerAdvice有两个属性:thresholdhalfOpenAfter. 这threshold属性表示 breaker 打开之前需要发生的连续失败次数。 它默认为5. 这halfOpenAfter属性表示 Breaker 在最后一次失败后等待其他请求的时间。 默认值为 1000 毫秒。spring-doc.cadn.net.cn

以下示例配置断路器并显示其DEBUGERROR输出:spring-doc.cadn.net.cn

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
            <property name="threshold" value="2" />
            <property name="halfOpenAfter" value="12000" />
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator

在前面的示例中,阈值设置为2halfOpenAfter设置为12秒。 每 5 秒收到一个新请求。 前两次尝试调用了该服务。 第三个和第四个失败,并出现异常,指示断路器已打开。 尝试了第五个请求,因为该请求是在上次失败后的 15 秒。 第六次尝试立即失败,因为断路器立即打开。spring-doc.cadn.net.cn

表达式计算建议

最后提供的通知类是o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice. 这个建议比其他两个建议更普遍。 它提供了一种机制,用于计算发送到终端节点的原始入站消息的表达式。 在成功或失败后,可以评估单独的表达式。 (可选)可以将包含评估结果的消息与输入消息一起发送到消息通道。spring-doc.cadn.net.cn

此建议的一个典型用例可能是使用<ftp:outbound-channel-adapter/>,如果传输成功,则可能将文件移动到一个目录,如果传输失败,则移动到另一个目录:spring-doc.cadn.net.cn

该通知具有用于设置成功时表达式、失败时设置表达式以及每个选项的相应通道的属性。 对于成功案例,发送到successChannel是一个AdviceMessage,其中 payload 是表达式 evaluation 的结果。 一个名为inputMessage,包含发送到处理程序的原始消息。 发送到failureChannel(当处理程序引发异常时)是一个ErrorMessage负载为MessageHandlingExpressionEvaluatingAdviceException. 赞所有人MessagingException实例,则此有效负载具有failedMessagecause属性,以及一个名为evaluationResult,其中包含表达式 evaluation 的结果。spring-doc.cadn.net.cn

从版本 5.1.3 开始,如果配置了 channels,但未提供表达式,则默认表达式用于计算为payload的消息。

当在通知范围内引发异常时,默认情况下,该异常会在任何failureExpression被评估。 如果您希望禁止抛出异常,请将trapExceptionproperty 设置为true. 以下建议显示了如何配置advice使用 Java DSL:spring-doc.cadn.net.cn

@SpringBootApplication
public class EerhaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
        MessageChannel in = context.getBean("advised.input", MessageChannel.class);
        in.send(new GenericMessage<>("good"));
        in.send(new GenericMessage<>("bad"));
        context.close();
    }

    @Bean
    public IntegrationFlow advised() {
        return f -> f.<String>handle((payload, headers) -> {
            if (payload.equals("good")) {
                return null;
            }
            else {
                throw new RuntimeException("some failure");
            }
        }, c -> c.advice(expressionAdvice()));
    }

    @Bean
    public Advice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannelName("success.input");
        advice.setOnSuccessExpressionString("payload + ' was successful'");
        advice.setFailureChannelName("failure.input");
        advice.setOnFailureExpressionString(
                "payload + ' was bad, with reason: ' + #exception.cause.message");
        advice.setTrapException(true);
        return advice;
    }

    @Bean
    public IntegrationFlow success() {
        return f -> f.handle(System.out::println);
    }

    @Bean
    public IntegrationFlow failure() {
        return f -> f.handle(System.out::println);
    }

}

Rate Limiter 建议

Rate Limiter 建议 (RateLimiterRequestHandlerAdvice) 允许确保终端节点不会因请求而过载。 当超出速率限制时,请求将进入 blocked 状态。spring-doc.cadn.net.cn

此建议的一个典型用例可能是外部服务提供商不允许超过n每分钟的请求数。spring-doc.cadn.net.cn

RateLimiterRequestHandlerAdvice实施完全基于 Resilience4j 项目,并且需要RateLimiterRateLimiterConfig注射。 也可以使用默认值和/或自定义名称进行配置。spring-doc.cadn.net.cn

以下示例将速率限制器建议配置为每 1 秒 1 个请求:spring-doc.cadn.net.cn

@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
    return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
            .limitRefreshPeriod(Duration.ofSeconds(1))
            .limitForPeriod(1)
            .build());
}

@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
		adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
    ...
}

缓存建议

从版本 5.2 开始,CacheRequestHandlerAdvice已引入。 它基于 Spring Framework 中的缓存抽象,并与@Caching注释族。 内部的 logic 基于CacheAspectSupport扩展,其中缓存作的代理是围绕AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage方法与请求Message<?>作为参数。 可以使用 SPEL 表达式或Function以评估缓存键。 请求Message<?>可用作 SPEL 评估上下文的根对象,或作为Functioninput 参数。 默认情况下,payload的请求消息用于缓存键。 这CacheRequestHandlerAdvice必须配置cacheNames,当默认缓存作为CacheableOperation,或者使用任意CacheOperations. 每CacheOperation可以单独配置或具有共享选项,例如CacheManager,CacheResolverCacheErrorHandler,可以从CacheRequestHandlerAdvice配置。 此配置功能类似于 Spring Framework 的@CacheConfig@Caching注释组合。 如果CacheManager,则默认情况下会从BeanFactoryCacheAspectSupport.spring-doc.cadn.net.cn

以下示例使用一组不同的缓存作配置两个建议:spring-doc.cadn.net.cn

@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    return cacheRequestHandlerAdvice;
}

@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
    ...
}

@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
    cachePutBuilder.setCacheName(TEST_PUT_CACHE);
    CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
    cacheEvictBuilder.setCacheName(TEST_CACHE);
    cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
    return cacheRequestHandlerAdvice;
}

@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
    adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
    ...
}