消息传递端点
消息端点
消息端点
本章的第一部分涵盖了一些背景理论,并揭示了驱动 Spring Integration 各种消息组件的基础 API 的诸多细节。 如果您希望深入理解幕后运作机制,这些信息将非常有帮助。 不过,如果您希望快速上手并使用基于简化命名空间的配置来配置各种元素,不妨先跳转到 端点命名空间支持。
正如概述中所述,消息端点负责将各种消息组件连接到通道。 在接下来的几章中,我们将介绍多种消费消息的组件。 其中一些组件还能够发送回复消息。 发送消息相当直接。 如前文在消息通道中所展示的那样,您可以向消息通道发送消息。 然而,接收消息则稍微复杂一些。 主要原因在于存在两种类型的消费者:轮询消费者和事件驱动消费者。
两者之中,事件驱动的消费者要简单得多。
由于无需管理和调度单独的轮询线程,它们本质上是带有回调方法的监听器。
当连接到 Spring Integration 的可订阅消息通道时,这种简单的选项效果极佳。
然而,当连接到缓冲型、可轮询的消息通道时,必须有某个组件来调度和管理轮询线程。
Spring Integration 提供了两种不同的端点实现来适配这两种类型的消费者。
因此,消费者本身只需实现回调接口。
当需要轮询时,端点充当消费者实例的容器。
其好处类似于使用容器托管消息驱动 Bean,但由于这些消费者是在 ApplicationContext 内运行的由 Spring 管理的对象,它更类似于 Spring 自身的 MessageListener 容器。
消息处理器
Spring Integration 的 MessageHandler 接口由框架内的许多组件实现。
换句话说,这不属于公共 API 的一部分,您通常不会直接实现 MessageHandler。
尽管如此,它被消息消费者用于实际处理已消费的消息,因此了解此策略接口有助于理解消费者的整体角色。
该接口的定义如下:
public interface MessageHandler {
void handleMessage(Message<?> message);
}
尽管该接口非常简单,但它为后续章节中介绍的大多数组件(如路由器、转换器、拆分器、聚合器、服务激活器等)奠定了基础。 这些组件对它们处理的消息执行的功能各不相同,但实际接收消息的要求是相同的,在轮询和事件驱动行为之间的选择也是相同的。 Spring Integration 提供了两个端点实现来托管这些基于回调的处理程序,并允许将它们连接到消息通道。
事件驱动消费者
由于这是两者中更简单的一个,我们首先介绍事件驱动的消费者端点。
您可能还记得,SubscribableChannel接口提供了一个subscribe()方法,该方法接受一个MessageHandler参数(如SubscribableChannel所示)。
以下代码片段展示了subscribe方法的定义:
subscribableChannel.subscribe(messageHandler);
由于订阅到通道的处理器无需主动轮询该通道,因此这是一种事件驱动的消费者。Spring Integration 提供的实现接受 SubscribableChannel 和 MessageHandler,如下示例所示:
SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class);
EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler);
轮询消费者
Spring Integration 还提供了 PollingConsumer,其实例化方式相同,但通道必须实现 PollableChannel,如下示例所示:
PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class);
PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);
| 有关轮询消费者的更多信息,请参阅 Poller 和 Channel Adapter。 |
轮询消费者有许多其他配置选项。 例如,触发器是一个必需的属性。 以下示例展示了如何设置触发器:
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setTrigger(new PeriodicTrigger(Duration.ofSeconds(30)));
PeriodicTrigger 通常使用简单间隔(Duration)定义,但也支持 initialDelay 属性和布尔型 fixedRate 属性(默认为 false — 即无固定延迟)。
以下示例同时设置了这两个属性:
PeriodicTrigger trigger = new PeriodicTrigger(Duration.ofSeconds(1));
trigger.setInitialDelay(Duration.ofSeconds(5));
trigger.setFixedRate(true);
上述示例中三个设置的结果是一个触发器,它会等待五秒,然后每秒触发一次。
CronTrigger 需要有效的 cron 表达式。
请参阅 Javadoc 了解详情。
以下示例设置新的 CronTrigger:
CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");
上一个示例中定义的触发器的结果是每隔十秒触发一次,时间为周一至周五。
除了触发器外,您还可以指定另外两个与轮询相关的配置属性:maxMessagesPerPoll 和 receiveTimeout。
以下示例展示了如何设置这两个属性:
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setMaxMessagesPerPoll(10);
consumer.setReceiveTimeout(5000);
maxMessagesPerPoll属性指定在给定轮询操作中接收的消息的最大数量。
这意味着轮询器会继续调用receive()而不等待,直到返回null或达到最大值为止。
例如,如果轮询器具有十秒的间隔触发器和将maxMessagesPerPoll设置为25,并且它正在轮询一个在其队列中有100条消息的通道,则可以在40秒内检索所有100条消息。
它会获取25条,等待十秒,再获取接下来的25条,依此类推。
如果maxMessagesPerPoll配置为负值,则在单个轮询周期内会调用MessageSource.receive(),直到返回null。
从版本5.5开始,0值具有特殊含义——完全跳过MessageSource.receive()调用,这可以被视为暂停此轮询端点,直到maxMessagesPerPoll稍后被更改为非零值,例如通过控制总线(Control Bus)。
receiveTimeout 属性指定轮询器在调用接收操作时,如果没有可用消息应等待的时间量。
例如,考虑两个表面上看似相似但实际上截然不同的选项:第一个选项的间隔触发器为 5 秒,接收超时时间为 50 毫秒;而第二个选项的间隔触发器为 50 毫秒,接收超时时间为 5 秒。
第一个选项可能在消息到达通道后最多延迟 4950 毫秒才接收到该消息(如果该消息是在其某次轮询调用返回后立即到达的)。
另一方面,第二种配置永远不会让消息丢失超过 50 毫秒。
二者的区别在于,第二个选项需要一个线程进行等待。
然而,结果是它能对到达的消息做出更快的响应。
这种被称为“长轮询”的技术可用于在轮询源上模拟事件驱动行为。
轮询消费者也可以委托给 Spring TaskExecutor,如下示例所示:
PollingConsumer consumer = new PollingConsumer(channel, handler);
TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class);
consumer.setTaskExecutor(taskExecutor);
此外,PollingConsumer 具有一个名为 adviceChain 的属性。
该属性允许您指定 AOP 通知的 List,以处理包括事务在内的其他横切关注点。
这些通知应用于 doPoll() 方法周围。
如需更详细的信息,请参阅端点命名空间支持下的 AOP 通知链和事务支持部分。
前面的示例展示了依赖查找。
然而,请记住,这些消费者通常配置为 Spring Bean 定义。
实际上,Spring Integration 还提供了一个 FactoryBean,名为 ConsumerEndpointFactoryBean,它根据通道的类型创建相应的消费者类型。
此外,Spring Integration 完全支持 XML 命名空间,可以进一步隐藏这些细节。
本指南在介绍每种组件类型时,都会展示基于命名空间的配置。
许多 MessageHandler 实现可以生成回复消息。
正如前面所提到的,与接收消息相比,发送消息是微不足道的。
然而,何时以及发送多少回复消息取决于处理器类型。
例如,聚合器(aggregator)会等待一定数量的消息到达,并且通常被配置为拆分器(splitter)的下游消费者,而拆分器可以为它处理的每条消息生成多个回复。
在使用命名空间配置时,您并不严格需要了解所有细节。
不过,了解这些组件中的几个共享一个公共基类 AbstractReplyProducingMessageHandler,并且该基类提供了一个 setOutputChannel(..) 方法,可能仍然值得。 |
端点命名空间支持
在本参考手册中,您可以找到针对端点元素(如 router、transformer、service-activator 等)的特定配置示例。
大多数这些元素支持 input-channel 属性,许多还支持 output-channel 属性。
解析后,这些端点元素会根据所引用的 input-channel 的类型生成 PollingConsumer 或 EventDrivenConsumer 的实例:PollableChannel 或 SubscribableChannel,分别对应。
当通道可轮询时,轮询行为基于端点元素的 poller 子元素及其属性。
以下列表列出了 poller 的所有可用配置选项:
<int:poller cron="" (1)
default="false" (2)
error-channel="" (3)
fixed-delay="" (4)
fixed-rate="" (5)
id="" (6)
max-messages-per-poll="" (7)
receive-timeout="" (8)
ref="" (9)
task-executor="" (10)
time-unit="MILLISECONDS" (11)
trigger=""> (12)
<int:advice-chain /> (13)
<int:transactional /> (14)
</int:poller>
| 1 | 提供使用 Cron 表达式配置轮询器的功能。
底层实现使用一个 org.springframework.scheduling.support.CronTrigger。
如果设置了此属性,则不得指定以下任何属性:fixed-delay、trigger、fixed-rate 和 ref。 |
| 2 | 通过将此属性设置为 true,您可以定义一个全局默认轮询器。
如果在应用程序上下文中定义了多个默认轮询器,则会抛出异常。
任何连接到 PollableChannel(PollingConsumer)的端点,或任何未显式配置轮询器的 SourcePollingChannelAdapter,将使用全局默认轮询器。
其默认值为 false。
可选。 |
| 3 | 标识当此轮询器的调用发生故障时,错误消息发送到的通道。
要完全抑制异常,您可以提供对 nullChannel 的引用。
可选。 |
| 4 | 固定延迟触发器在底层使用 PeriodicTrigger。
如果您不使用 time-unit 属性,则指定值的单位为毫秒。
如果设置了此属性,则不得指定以下任何属性:fixed-rate、trigger、cron 和 ref。 |
| 5 | 固定速率触发器在底层使用 PeriodicTrigger。
如果您不使用 time-unit 属性,则指定值将以毫秒为单位表示。
如果设置了此属性,则不得指定以下任何属性:fixed-delay、trigger、cron 和 ref。 |
| 6 | 引用轮询器底层 Bean 定义的 ID,其类型为 org.springframework.integration.scheduling.PollerMetadata。
除非是默认轮询器(default="true"),否则顶层 id 元素必须包含 id 属性。 |
| 7 | 有关更多信息,请参阅配置入站通道适配器。
如果未指定,默认值取决于上下文。
如果您使用PollingConsumer,此属性默认为-1。
但是,如果您使用SourcePollingChannelAdapter,则max-messages-per-poll属性默认为1。
可选。 |
| 8 | 值在底层类 PollerMetadata 上设置。
如果未指定,则默认为 1000(毫秒)。
可选。 |
| 9 | 对另一个顶级轮询器的 Bean 引用。
ref 属性不得出现在顶级 poller 元素上。
但是,如果设置了此属性,则不得指定以下任何属性:fixed-rate、trigger、cron 和 fixed-delay。 |
| 10 | 提供引用自定义任务执行器的能力。 有关更多信息,请参阅 TaskExecutor 支持。 可选。 |
| 11 | 此属性指定底层 org.springframework.scheduling.support.PeriodicTrigger 上的 java.util.concurrent.TimeUnit 枚举值。
因此,此属性只能与 fixed-delay 或 fixed-rate 属性结合使用。
如果与 cron 或 trigger 引用属性结合使用,将导致失败。
PeriodicTrigger 支持的最小粒度为毫秒。
因此,可用的选项仅为毫秒和秒。
如果未提供此值,则任何 fixed-delay 或 fixed-rate 值将被解释为毫秒。
基本上,此枚举为基于秒的区间触发器值提供了便利。
对于每小时、每天和每月的设置,我们建议使用 cron 触发器。 |
| 12 | 引用任何实现了 org.springframework.scheduling.Trigger 接口的 Spring 配置的 Bean。
然而,如果设置了此属性,则不得指定以下任何属性:fixed-delay、fixed-rate、cron 和 ref。
可选。 |
| 13 | 允许指定额外的 AOP 通知以处理其他横切关注点。 有关更多信息,请参阅 事务支持。 可选。 |
| 14 | 轮询器可以设置为事务性的。 有关更多信息,请参阅 AOP 通知链。 可选。 |
示例
一个具有 1 秒间隔的简单基于间隔的轮询器可以配置如下:
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller fixed-rate="1000"/>
</int:transformer>
作为不使用 fixed-rate 属性的替代方案,您也可以使用 fixed-delay 属性。
对于基于 Cron 表达式的轮询器,请使用 cron 属性,如下示例所示:
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller cron="*/10 * * * * MON-FRI"/>
</int:transformer>
如果输入通道是 PollableChannel,则必须配置轮询器。
具体而言,如前所述,trigger 是 PollingConsumer 类的一个必需属性。
因此,如果您在轮询消费者端点的配置中省略了 poller 子元素,可能会抛出异常。
如果您尝试在与不可轮询的通道相连的元素上配置轮询器,也可能抛出该异常。
也可以创建顶级轮询器,在这种情况下,只需要一个 ref 属性,如下例所示:
<int:poller id="weekdayPoller" cron="*/10 * * * * MON-FRI"/>
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller ref="weekdayPoller"/>
</int:transformer>
ref 属性仅允许用于内部轮询器定义。
如果在顶层轮询器上定义此属性,将在应用程序上下文初始化期间抛出配置异常。 |
全局默认轮询器
为了进一步简化配置,您可以定义一个全局默认轮询器。
在 XML DSL 中,单个顶层轮询器组件可以将 default 属性设置为 true。
对于 Java 配置,在这种情况下必须声明一个名为 PollerMetadata.DEFAULT_POLLER 的 PollerMetadata Bean。
在这种情况下,任何在其输入通道上具有 PollableChannel、在同一 ApplicationContext 内定义且未显式配置 poller 的端点都将使用该默认值。
以下示例展示了这样一个轮询器及其使用的转换器:
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
// No 'poller' attribute because there is a default global poller
@Bean
public IntegrationFlow transformFlow(MyTransformer transformer) {
return IntegrationFlow.from(MessageChannels.queue("pollable"))
.transform(transformer) // No 'poller' attribute because there is a default global poller
.channel("output")
.get();
}
@Bean(PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
@Bean
public QueueChannel pollable() {
return new QueueChannel();
}
// No 'poller' attribute because there is a default global poller
@Transformer(inputChannel = "pollable", outputChannel = "output")
public Object transform(Object payload) {
...
}
@Bean(PollerMetadata.DEFAULT_POLLER)
fun defaultPoller() =
PollerMetadata()
.also {
it.maxMessagesPerPoll = 5
it.trigger = PeriodicTrigger(3000)
}
@Bean
fun convertFlow() =
integrationFlow(MessageChannels.queue("pollable")) {
transform(transformer) // No 'poller' attribute because there is a default global poller
channel("output")
}
<int:poller id="defaultPoller" default="true" max-messages-per-poll="5" fixed-delay="3000"/>
<!-- No <poller/> sub-element is necessary, because there is a default -->
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output"/>
事务支持
Spring Integration 还为轮询器(pollers)提供了事务支持,以便每个接收和转发操作都可以作为原子工作单元执行。
要为轮询器配置事务,请添加 <transactional/> 子元素。
以下示例展示了可用的属性:
<int:poller fixed-delay="1000">
<int:transactional transaction-manager="txManager"
propagation="REQUIRED"
isolation="REPEATABLE_READ"
timeout="10000"
read-only="false"/>
</int:poller>
有关更多信息,请参阅 轮询事务支持。
AOP 通知链
由于 Spring 事务支持依赖于使用 TransactionInterceptor(AOP 通知)的代理机制来处理由轮询器启动的消息流的事务行为,因此您有时需要提供额外的通知来处理与轮询器相关的其他横切行为。
为此,poller 定义了一个 advice-chain 元素,允许您在实现 MethodInterceptor 接口的类中添加更多通知。
以下示例展示了如何为 poller 定义 advice-chain:
<int:service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
method="good" output-channel="output">
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<int:advice-chain>
<ref bean="adviceA" />
<beans:bean class="org.something.SampleAdvice" />
<ref bean="txAdvice" />
</int:advice-chain>
</int:poller>
</int:service-activator>
有关如何实现 MethodInterceptor 接口的更多信息,请参阅 Spring Framework Reference Guide 中的 AOP 部分。
建议链也可以应用于没有事务配置的轮询器,从而增强由轮询器启动的消息流的行为。
使用建议链时,不能指定 <transactional/> 子元素。
相反,请声明一个 <tx:advice/> bean 并将其添加到 <advice-chain/> 中。
有关完整的配置详细信息,请参阅 轮询事务支持。 |
TaskExecutor 支持
轮询线程可由 Spring 的 TaskExecutor 抽象的任何实例执行。这为端点或一组端点启用并发。自 Spring 3 起。0,核心 Spring 框架拥有一个 task 命名空间,其 <executor/> 元素支持创建简单的线程池执行器。该元素接受用于常见并发设置的属性,例如 pool-size(池大小)和 queue-capacity(队列容量)。配置线程池执行器可以显著影响端点在负载下的性能表现。这些设置适用于每个端点,因为端点的性能是需要考虑的主要因素之一(另一个主要因素是端点所订阅通道的预期流量)。要为配置了 XML 命名空间支持的轮询端点启用并发功能,请在其 task-executor 元素上提供 <poller/> 引用,然后提供以下示例中显示的一个或多个属性:
<int:poller task-executor="pool" fixed-rate="1000"/>
<task:executor id="pool"
pool-size="5-25"
queue-capacity="20"
keep-alive="120"/>
如果您未提供任务执行器,消费者的处理器将在调用者的线程中被调用。
请注意,调用者通常是默认的 TaskScheduler(参见 配置任务调度器)。
您还应牢记,task-executor 属性可以通过指定 bean 名称来引用 Spring 的 TaskExecutor 接口的任何实现。
前面所示的 executor 元素是出于便利而提供的。
正如前面在轮询消费者的背景部分中提到的,您还可以通过以下方式配置轮询消费者以模拟事件驱动的行为。
通过设置较长的接收超时时间和较短的触发间隔,您可以确保即使在轮询消息源上也能对到达的消息做出非常及时的反应。
请注意,这仅适用于具有带超时的阻塞等待调用的源。
例如,文件轮询器不会阻塞。
每次receive()调用都会立即返回,其中可能包含新文件,也可能不包含。
因此,即使轮询器中包含一个很长的receive-timeout,该值在这种场景下也永远不会被使用。
另一方面,当使用 Spring Integration 自己的基于队列的通道时,超时值确实有机会发挥作用。
下面的示例展示了轮询消费者如何近乎即时地接收消息:
<int:service-activator input-channel="someQueueChannel"
output-channel="output">
<int:poller receive-timeout="30000" fixed-rate="10"/>
</int:service-activator>
采用这种方法不会带来太多开销,因为内部而言,它不过是一个定时等待线程,其 CPU 资源消耗远低于(例如)一个疯狂运行的无限 while 循环。
在运行时更改轮询频率
当使用 fixed-delay 或 fixed-rate 属性配置轮询器时,默认实现会使用 PeriodicTrigger 实例。
PeriodicTrigger 是 Spring 框架核心的一部分。
它仅接受间隔作为构造函数参数。
因此,无法在运行时更改它。
然而,您可以定义自己的 org.springframework.scheduling.Trigger 接口实现。
您甚至可以将 PeriodicTrigger 作为起点。
然后,您可以为间隔(周期)添加一个 setter,或者甚至在触发器本身中嵌入自己的节流逻辑。
period 属性用于每次调用 nextExecutionTime 时以安排下一次轮询。
要在轮询器中使用此自定义触发器,请在您的应用上下文中声明自定义触发器的 bean 定义,并通过使用引用自定义触发器 bean 实例的 trigger 属性将依赖项注入到您的轮询器配置中。
现在,您可以获取对触发器 bean 的引用,并在轮询之间更改轮询间隔。
例如,请参阅 Spring Integration 示例 项目。
该项目包含一个名为 dynamic-poller 的示例,它使用自定义触发器,并展示了在运行时更改轮询间隔的能力。
该示例提供了一个自定义触发器,它实现了 org.springframework.scheduling.Trigger 接口。
该示例的触发器基于 Spring 的 PeriodicTrigger 实现。
然而,自定义触发器的字段不是 final 的,并且属性具有显式的 getter 和 setter,使您可以在运行时动态更改轮询周期。
需要注意的是,由于 Trigger 方法的值为 nextExecutionTime(),对动态触发器的任何更改在下次轮询之前不会生效,这基于现有配置。
无法强制触发器在其当前配置的下次执行时间之前触发。 |
有效载荷类型转换
在整个参考手册中,您还可以看到各种端点的特定配置和实现示例,这些端点接受消息或任意 Object 作为输入参数。
在 Object 的情况下,此类参数被映射到消息负载、负载的一部分或标头(当使用 Spring 表达式语言时)。
然而,端点方法的输入参数类型有时与负载或其部分的类型不匹配。
在这种情况下,我们需要执行类型转换。
Spring Integration 提供了一种便捷的方式,在其名为 integrationConversionService 的转换服务 Bean 实例内注册类型转换器(通过使用 Spring ConversionService)。
一旦通过 Spring Integration 基础设施定义了第一个转换器,该 Bean 便会自动创建。
To register a converter, you can implement org.springframework.core.convert.converter.Converter, org.springframework.core.convert.converter.GenericConverter, or org.springframework.core.convert.converter.ConverterFactory。
Converter 接口的实现是最简单的,它用于将单一类型转换为另一种类型。
对于更复杂的场景,例如转换到类层次结构,您可以实现 GenericConverter 接口,并可能同时实现 ConditionalConverter 接口。
这些接口使您能够完全访问 from 和 to 类型描述符,从而实现复杂的转换。
例如,如果您有一个名为 Something 的抽象类作为转换目标(如参数类型、通道数据类型等),并且有两个具体实现类 Thing1 和 Thing,您希望根据输入类型选择其中一个进行转换,那么 GenericConverter 将是合适的选择。
有关更多信息,请参阅这些接口的 Javadoc:
实现转换器后,您可以使用便捷的命名空间支持将其注册,如下例所示:
<int:converter ref="sampleConverter"/>
<bean id="sampleConverter" class="foo.bar.TestConverter"/>
或者,您可以使用内部 Bean,如下示例所示:
<int:converter>
<bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>
从 Spring Integration 4.0 开始,您可以使用注解来创建前述配置,如下示例所示:
@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {
public Number convert(Boolean source) {
return source ? 1 : 0;
}
}
或者,您可以使用 @Configuration 注解,如下示例所示:
@Configuration
@EnableIntegration
public class ContextConfiguration {
@Bean
@IntegrationConverter
public SerializingConverter serializingConverter() {
return new SerializingConverter();
}
}
|
在配置应用上下文时,Spring 框架允许您添加一个 相比之下, 然而,如果您确实希望将 Spring
在这种情况下, |
内容类型转换
从版本 5.0 开始,默认情况下,方法调用机制基于 org.springframework.messaging.handler.invocation.InvocableHandlerMethod 基础设施。
其 HandlerMethodArgumentResolver 实现(例如 PayloadArgumentResolver 和 MessageMethodArgumentResolver)可以使用 MessageConverter 抽象将传入的 payload 转换为目标方法参数类型。
转换可以基于 contentType 消息头进行。
为此,Spring Integration 提供了 ConfigurableCompositeMessageConverter,它将委托给已注册的转换器列表进行调用,直到其中一个返回非 null 结果。
默认情况下,此转换器按严格顺序提供:
请参阅 Javadoc(在前面的列表中链接),以了解其用途以及转换时合适的 contentType 值的更多信息。
使用 ConfigurableCompositeMessageConverter 是因为它可以与任何其他 MessageConverter 实现一起提供,包括或不包括前面提到的默认转换器。
它还可以作为适当的 Bean 注册到应用程序上下文中,从而覆盖默认转换器,如下例所示:
@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public ConfigurableCompositeMessageConverter compositeMessageConverter() {
List<MessageConverter> converters =
Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()),
new JavaSerializationMessageConverter());
return new ConfigurableCompositeMessageConverter(converters);
}
这两个新的转换器在默认值之前注册到组合器中。
您也可以不使用 ConfigurableCompositeMessageConverter,而是通过注册一个名称为 integrationArgumentResolverMessageConverter 的 Bean(通过设置 IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME 属性)来提供您自己的 MessageConverter。
在使用 SpEL 方法调用时,不可用基于 MessageConverter 的转换(包括 contentType 个头部)。
在这种情况下,仅可使用上述 有效载荷类型转换中提到的常规类到类的转换。 |
异步轮询
如果您希望轮询是异步的,poller 可以可选地指定一个 task-executor 属性,该属性指向任何现有 TaskExecutor Bean 的实例(Spring 3.0 通过 task 命名空间提供了便捷的配置方式)。
然而,在使用 TaskExecutor 配置 poller 时,您必须理解某些事项。
问题在于存在两个配置:轮询器和 TaskExecutor。
它们必须相互协调。
否则,可能会导致人为的内存泄漏。
考虑以下配置:
<int:channel id="publishChannel">
<int:queue />
</int:channel>
<int:service-activator input-channel="publishChannel" ref="myService">
<int:poller receive-timeout="5000" task-executor="taskExecutor" fixed-rate="50" />
</int:service-activator>
<task:executor id="taskExecutor" pool-size="20" />
上述配置展示了一个未调优的配置。
默认情况下,任务执行器拥有一个无界的任务队列。 即使所有线程都被阻塞(正在等待新消息到达或超时结束),轮询器仍会持续调度新任务。 假设有 20 个线程在执行任务,且超时时间为 5 秒,则任务执行速率为每秒 4 个。 然而,新任务的调度速率为每秒 20 个,因此任务执行器内部的队列在进程空闲时以每秒 16 个的速度增长,从而导致内存泄漏。
处理此问题的方法之一是将任务执行器的queue-capacity属性设置为该值。
即使设置为0也是一个合理的值。
您也可以通过设置任务执行器的rejection-policy属性(例如设置为DISCARD)来指定无法入队的消息应如何处理,从而对其进行管理。
换句话说,在配置TaskExecutor时,您必须了解某些细节。
有关该主题的更多详细信息,请参阅Spring参考手册中的"任务执行与调度"章节。
端点内部 Bean
许多端点是复合 Bean。
这包括所有消费者和所有轮询的入站通道适配器。
消费者(轮询或事件驱动)委托给 MessageHandler。
轮询适配器通过委托给 MessageSource 来获取消息。
通常,获取对委托 Bean 的引用很有用,也许是为了在运行时更改配置或进行测试。
这些 Bean 可以通过已知名称从 ApplicationContext 中获取。
MessageHandler 实例会注册到应用程序上下文中,其 Bean ID 类似于 someConsumer.handler(其中 'consumer' 是端点的 id 属性的值)。
MessageSource 实例会注册为类似的 Bean ID somePolledAdapter.source,其中 'somePolledAdapter' 是适配器的 ID。
上述内容仅适用于框架组件本身。 您可以改用内部 Bean 定义,如下示例所示:
<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
output-channel = "outChannel" method="foo">
<beans:bean class="org.foo.ExampleServiceActivator"/>
</int:service-activator>
该 Bean 被视为任何已声明的内部 Bean,不会注册到应用上下文中。
如果您希望以其他方式访问此 Bean,请在顶层使用 id 声明它,并改用 ref 属性。
有关更多信息,请参阅 Spring 文档。
端点角色
从版本 4.2 开始,端点可以分配给角色。
角色允许将端点作为一个组进行启动和停止。
这在领导权选举场景中特别有用,当获得或撤销领导权时,可以分别启动或停止一组端点。
为此,框架会在应用程序上下文中注册一个名称为 IntegrationContextUtils.INTEGRATION_LIFECYCLE_ROLE_CONTROLLER 的 SmartLifecycleRoleController Bean。
每当需要控制生命周期时,可以注入该 Bean 或使用 @Autowired:
<bean class="com.some.project.SomeLifecycleControl">
<property name="roleController" ref="integrationLifecycleRoleController"/>
</bean>
您可以使用 XML、Java 配置或编程方式为角色分配端点。 以下示例展示了如何使用 XML 配置端点角色:
<int:inbound-channel-adapter id="ica" channel="someChannel" expression="'foo'" role="cluster"
auto-startup="false">
<int:poller fixed-rate="60000" />
</int:inbound-channel-adapter>
以下示例展示了如何为在 Java 中创建的 Bean 配置端点角色:
@Bean
@ServiceActivator(inputChannel = "sendAsyncChannel", autoStartup="false")
@Role("cluster")
public MessageHandler sendAsyncHandler() {
return // some MessageHandler
}
以下示例展示了如何在 Java 中配置方法上的端点角色:
@Payload("#args[0].toLowerCase()")
@Role("cluster")
public String handle(String payload) {
return payload.toUpperCase();
}
以下示例展示了如何在 Java 中使用 SmartLifecycleRoleController 配置端点角色:
@Autowired
private SmartLifecycleRoleController roleController;
...
this.roleController.addSmartLifeCycleToRole("cluster", someEndpoint);
...
以下示例展示了如何在 Java 中使用 IntegrationFlow 来配置端点角色:
IntegrationFlow flow -> flow
.handle(..., e -> e.role("cluster"));
这些操作都会将端点添加到 cluster 角色中。
调用 roleController.startLifecyclesInRole("cluster") 及其对应的 stop… 方法可启动和停止端点。
任何实现了 SmartLifecycle 的对象都可以以编程方式添加——而不仅仅是端点。 |
The SmartLifecycleRoleController 实现了 ApplicationListener<AbstractLeaderEvent>,并在获得或失去领导权时自动启动和停止其配置的 SmartLifecycle 对象(当某个 Bean 发布 OnGrantedEvent 或 OnRevokedEvent 时)。
当使用领导权选举来启动和停止组件时,重要的是将 auto-startup XML 属性(autoStartup bean 属性)设置为 false,以便应用上下文在初始化期间不会启动这些组件。 |
从版本 4.3.8 开始,SmartLifecycleRoleController 提供了一些状态方法:
public Collection<String> getRoles() (1)
public boolean allEndpointsRunning(String role) (2)
public boolean noEndpointsRunning(String role) (3)
public Map<String, Boolean> getEndpointsRunningStatus(String role) (4)
| 1 | 返回正在管理的角色列表。 |
| 2 | 如果角色中的所有端点都在运行,则返回 true。 |
| 3 | 如果角色中的任何端点都未运行,则返回 true。 |
| 4 | 返回一个 component name : running status 的映射。
组件名称通常为 Bean 名称。 |
领导事件处理
端点组可以根据领导权的授予或撤销而启动和停止。这在集群场景中非常有用,因为共享资源必须由单个实例独占消费。一个例子是轮询共享目录的文件入站通道适配器。 (参见 读取文件)。
要参与领导者选举,并在当选领导者、领导权被撤销或未能获取资源成为领导者时收到通知,应用程序需要在应用上下文中创建一个名为“领导者初始化器”的组件。通常,领导者发起者的值为 SmartLifecycle,因此它会在上下文启动时(可选地)启动,并在领导权变更时发布通知。您还可以通过将 publishFailedEvents 设置为 true(从版本 5 开始)来接收失败通知。0),用于在您希望在发生失败时采取特定操作的情况。按照约定,您应提供一个Candidate来接收回调。您也可以通过框架提供的 Context 对象撤销领导权。您的代码也可以监听 o.s.i.leader.event.AbstractLeaderEvent 个实例(OnGrantedEvent 和 OnRevokedEvent 的超类),并做出相应响应(例如,使用 SmartLifecycleRoleController)。事件包含对 Context 对象的引用。以下列表展示了 Context 接口的定义:
public interface Context {
boolean isLeader();
void yield();
String getRole();
}
从 5.0.6 版本开始,上下文提供了对候选人角色的引用。
Spring Integration 提供了基于 LockRegistry 抽象的领导者发起者的基本实现。
要使用它,您需要将其创建为 Bean 实例,如下示例所示:
@Bean
public LockRegistryLeaderInitiator leaderInitiator(LockRegistry locks) {
return new LockRegistryLeaderInitiator(locks);
}
如果锁注册中心实现正确,则任何时候最多只会有一个领导者。
如果锁注册中心还提供会在过期或被破坏时抛出异常(理想情况下为InterruptedException)的锁,那么无领导时期的持续时间可以缩短至锁实现中固有的延迟所允许的最短时间。
默认情况下,busyWaitMillis属性会引入一些额外的延迟,以防止在更常见的锁不完美的情况下出现 CPU 饥饿问题,此时只有当你再次尝试获取锁时才会发现其已过期。
有关使用 Zookeeper 的领导者选举及相关事件的更多信息,请参见 Zookeeper 领导事件处理。 有关使用 Hazelcast 的领导者选举及相关事件的更多信息,请参见 Hazelcast 领导事件处理。
消息网关
网关隐藏了 Spring Integration 提供的消息 API。 它使您的应用程序的业务逻辑无需感知 Spring Integration API。 通过使用通用网关,您的代码只需与一个简单的接口进行交互。
输入GatewayProxyFactoryBean
如前所述,最好完全不依赖 Spring Integration API——包括网关类。
为此,Spring Integration 提供了 GatewayProxyFactoryBean,它可以为任意接口生成代理,并在内部调用如下所示的网关方法。
通过依赖注入,您可以将该接口暴露给您的业务方法。
以下示例展示了可用于与 Spring Integration 进行交互的接口:
package org.cafeteria;
public interface Cafe {
void placeOrder(Order order);
}
网关 XML 命名空间支持
同时也提供了命名空间支持。 它允许将接口配置为服务,如下例所示:
<int:gateway id="cafeService"
service-interface="org.cafeteria.Cafe"
default-request-channel="requestChannel"
default-reply-timeout="10000"
default-reply-channel="replyChannel"/>
使用此配置后,cafeService 现在可以注入到其他 Bean 中,并且调用该 Cafe 接口代理实例上方法的代码对 Spring Integration API 毫无感知。
请参阅“示例”附录,其中包含一个使用 gateway 元素的示例(在 Cafe 演示项目中)。
前述配置中的默认设置将应用于网关接口上的所有方法。 如果未指定回复超时时间,调用线程将等待 30 秒以获取回复。 请参阅 当无响应到达时的网关行为。
默认值可以为各个方法进行覆盖。 请参阅 使用注解和 XML 配置网关。
设置默认回复通道
通常,您无需指定 default-reply-channel,因为网关会自动创建一个临时的匿名回复通道并在此监听回复。
然而,某些情况下您可能需要定义 default-reply-channel(对于适配器网关,如 HTTP、JMS 等,则可能需要定义 reply-channel)。
为了提供背景信息,我们简要讨论一下网关的内部工作原理。
网关会创建一个临时的点对点回复通道。
该通道是匿名的,并以名称 replyChannel 添加到消息头中。
当提供显式的 default-reply-channel(对于远程适配器网关为 reply-channel)时,您可以指向一个发布-订阅通道,之所以这样命名,是因为可以向其中添加多个订阅者。
在内部,Spring Integration 会在临时 replyChannel 和显式定义的 default-reply-channel 之间创建一个桥接。
假设您希望您的回复不仅发送到网关,还要发送到其他某些消费者。 在这种情况下,您需要两件事:
-
一个可订阅的命名通道
-
该通道应成为发布 - 订阅通道
网关使用的默认策略无法满足这些需求,因为添加到头部的回复通道是匿名的且为点对点模式。
这意味着没有其他订阅者能够获取对该通道的引用,即使可以,由于该通道具有点对点行为,也只有一个订阅者能收到消息。
通过定义 default-reply-channel,您可以指向您选择的任意通道。
在这种情况下,它就是一个 publish-subscribe-channel。
网关会创建一个桥接,将其连接到存储在头部中的临时匿名回复通道。
您可能还需要显式提供一个回复通道,以便通过拦截器进行监控或审计(例如,线束监听)。 要配置通道拦截器,您需要一个命名通道。
从版本 5.4 开始,当网关方法的返回类型为 void 时,如果未显式提供该标头,框架将把 replyChannel 标头填充为 nullChannel Bean 引用。
这允许丢弃下游流程中任何可能的回复,从而满足单向网关契约的要求。 |
使用注解和 XML 配置网关
考虑以下示例,它通过添加 @Gateway 注解扩展了之前的 Cafe 接口示例:
public interface Cafe {
@Gateway(requestChannel="orders")
void placeOrder(Order order);
}
@Header 注解允许添加被解释为消息头的值,如下示例所示:
public interface FileWriter {
@Gateway(requestChannel="filesOut")
void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);
}
如果您更喜欢使用 XML 方式配置网关方法,可以向网关配置中添加 method 元素,如下示例所示:
<int:gateway id="myGateway" service-interface="org.foo.bar.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB"/>
<int:method name="echoViaDefault"/>
</int:gateway>
您还可以使用 XML 为每个方法调用提供单独的头部信息。
如果您希望设置的头部信息是静态的,并且不想通过@Header注解将它们嵌入到网关的方法签名中,这种方法将非常有用。
例如,在贷款经纪人示例中,我们希望能够根据发起的请求类型(单一报价或所有报价)来影响贷款报价的聚合方式。
虽然可以通过评估调用了哪个网关方法来确定请求类型,但这会违反关注点分离原则(因为该方法是一个 Java 工件)。
然而,在消息头中表达您的意图(元信息)在消息架构中是非常自然的。
以下示例展示了如何为两个方法中的每一个添加不同的消息头:
<int:gateway id="loanBrokerGateway"
service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway">
<int:method name="getLoanQuote" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="BEST"/>
</int:method>
<int:method name="getAllLoanQuotes" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="ALL"/>
</int:method>
</int:gateway>
在前面的示例中,根据网关的方法,为'RESPONSE_TYPE'头设置了不同的值。
如果您在 <int:method/> 和 @Gateway 注解中指定了例如 requestChannel,则注解值优先。 |
如果在 XML 中指定了无参数网关,且接口方法同时包含 @Payload 和 @Gateway 注解(在 <int:method/> 元素中包含 payloadExpression 或 payload-expression),则 @Payload 值将被忽略。 |
表达式与“全局”标头
<header/> 元素支持将 expression 作为 value 的替代方案。
SpEL 表达式将被求值以确定请求头的值。
从版本 5.2 开始,评估上下文中的 #root 对象是一个带有 getMethod() 和 getArgs() 访问器的 MethodArgsHolder。
例如,如果您希望根据简单的方法名进行路由,可以添加一个包含以下表达式的请求头:method.name。
java.reflect.Method 不可序列化。
如果稍后对消息进行序列化,则带有 method 表达式的标头将丢失。
因此,在这些情况下,您可能希望使用 method.name 或 method.toString()。
toString() 方法提供该方法的 String 表示形式,包括参数和返回类型。 |
自 3.0 版本起,可以定义 <default-header/> 元素,以便为网关生成的所有消息添加标头,而不论调用了何种方法。
为特定方法定义的标头优先于默认标头。
此处为特定方法定义的标头将覆盖服务接口中的任何 @Header 注解。
但是,默认标头不会覆盖服务接口中的任何 @Header 注解。
网关现在也支持 default-payload-expression,该值适用于所有方法(除非被覆盖)。
将映射方法参数到消息
使用上一节中的配置技术可以控制方法参数如何映射到消息元素(有效负载和标头)。 当未使用显式配置时,将采用某些约定来执行映射。 在某些情况下,这些约定无法确定哪个参数是有效负载,哪个应映射到标头。 请考虑以下示例:
public String send1(Object thing1, Map thing2);
public String send2(Map thing1, Map thing2);
在第一种情况下,约定是将第一个参数映射到有效载荷(只要它不是 Map),第二个参数的内容变为标头。
在第二种情况(或当参数 thing1 的实参为 Map 时的第一种情况)下,框架无法确定哪个实参应为负载。
因此,映射失败。
通常可以通过使用 payload-expression、@Payload 注解或 @Headers 注解来解决此问题。
或者(当约定失效时),您可以完全负责将方法调用映射到消息。
为此,请实现一个 MethodArgsMessageMapper,并通过使用 mapper 属性将其提供给 <gateway/>。
该映射器映射一个 MethodArgsHolder,这是一个简单的类,用于包装 java.reflect.Method 实例和包含参数的 Object[]。
当提供自定义映射器时,网关上不允许使用 default-payload-expression 属性和 <default-header/> 元素。
同样,任何 <method/> 元素上也不允许使用 payload-expression 属性和 <header/> 元素。
映射方法参数
以下示例展示了如何将方法参数映射到消息,并列举了一些无效配置的示例:
public interface MyGateway {
void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);
void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);
void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);
void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added
void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);
@Payload("args[0] + args[1] + '!'")
void payloadAnnotationAtMethodLevel(String a, String b);
@Payload("@someBean.exclaim(args[0])")
void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);
void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);
void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); // (1)
// invalid
void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);
// invalid
void twoPayloads(@Payload String s1, @Payload String s2);
// invalid
void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);
// invalid
void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);
}
| 1 | 请注意,在此示例中,SpEL 变量 #this 引用的是参数——在本例中为值 s。 |
XML 等效形式看起来略有不同,因为方法参数没有 #this 上下文。
然而,表达式可以通过使用 args 属性引用 MethodArgsHolder 根对象的方法参数(更多信息请参阅 表达式和“全局”标头),如下示例所示:
<int:gateway id="myGateway" service-interface="org.something.MyGateway">
<int:method name="send1" payload-expression="args[0] + 'thing2'"/>
<int:method name="send2" payload-expression="@someBean.sum(args[0])"/>
<int:method name="send3" payload-expression="method"/>
<int:method name="send4">
<int:header name="thing1" expression="args[2].toUpperCase()"/>
</int:method>
</int:gateway>
@MessagingGateway注解
从版本 4.0 开始,网关服务接口可以使用 @MessagingGateway 注解进行标记,而无需定义 <gateway /> XML 元素来进行配置。
以下一对示例比较了配置同一网关的两种方法:
<int:gateway id="myGateway" service-interface="org.something.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB">
<int:header name="thing1" value="thing2"/>
</int:method>
<int:method name="echoViaDefault"/>
</int:gateway>
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
defaultHeaders = @GatewayHeader(name = "calledMethod",
expression="#gatewayMethod.name"))
public interface TestGateway {
@Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
String echo(String payload);
@Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
String echoUpperCase(String payload);
String echoViaDefault(String payload);
}
与 XML 版本类似,当 Spring Integration 在组件扫描期间发现这些注解时,它会创建带有其消息基础设施的 proxy 实现。
为了执行此扫描并将 BeanDefinition 注册到应用程序上下文中,请在 @Configuration 类上添加 @IntegrationComponentScan 注解。
标准的 @ComponentScan 基础设施不处理接口。
因此,我们引入了自定义的 @IntegrationComponentScan 逻辑来查找接口上的 @MessagingGateway 注解,并为它们注册 GatewayProxyFactoryBean 实例。
另请参阅 注解支持。 |
随着 @MessagingGateway 注解的使用,您可以使用 @Profile 注解标记服务接口,以避免在特定配置文件未激活时创建 Bean。
从版本 6.0 开始,带有 @MessagingGateway 的接口也可以像任何 Spring @Component 定义一样,使用 @Primary 注解来标记其相应的配置逻辑。
从 6.0 版本开始,标准 Spring @Import 配置中可以使用 @MessagingGateway 接口。
这可以作为 @IntegrationComponentScan 或手动 AnnotationGatewayProxyFactoryBean Bean 定义的替代方案。
自版本 6.0 起,@MessagingGateway 使用 @MessageEndpoint 进行元注解,且 name() 属性本质上被别名为 @Compnent.value()。
通过这种方式,网关代理的 Bean 名称生成策略与扫描和导入组件的标准 Spring 注解配置保持一致。
默认 AnnotationBeanNameGenerator 可通过 AnnotationConfigUtils.CONFIGURATION_BEAN_NAME_GENERATOR 全局覆盖,或作为 @IntegrationComponentScan.nameGenerator() 属性进行设置。
如果您没有 XML 配置,则至少在一个 @EnableIntegration 类上需要 @Configuration 注解。
有关更多信息,请参阅 配置和@EnableIntegration。 |
调用无参数方法
当在没有任何参数的 Gateway 接口上调用方法时,默认行为是从 PollableChannel 接收 Message。
然而,有时您可能希望触发无参数方法,以便与下游不需要用户提供参数的其他组件进行交互,例如触发无参数的 SQL 调用或存储过程。
要实现发送和接收语义,您必须提供有效负载。
生成有效负载时,接口上的方法参数不是必需的。
您可以使用 @Payload 注解,或在 XML 中的 method 元素上使用 payload-expression 属性。
以下列表包含一些可能的有效负载示例:
-
一个文字字符串
-
#gatewayMethod.name
-
new java.util.Date()
-
@someBean.someMethod() 的返回值
以下示例展示了如何使用 @Payload 注解:
public interface Cafe {
@Payload("new java.util.Date()")
List<Order> retrieveOpenOrders();
}
您也可以使用 @Gateway 注解。
public interface Cafe {
@Gateway(payloadExpression = "new java.util.Date()")
List<Order> retrieveOpenOrders();
}
如果两个注解都存在(并且提供了 payloadExpression),则 @Gateway 生效。 |
另请参阅 使用注解和 XML 配置网关。
如果方法没有参数且没有返回值,但包含负载表达式,则被视为仅发送操作。
调用default方法
网关代理的接口也可以有 default 个方法,并且从 5.3 版本开始,框架会将 DefaultMethodInvokingMethodInterceptor 注入到代理中,以便使用 default 方法调用方式直接调用 java.lang.invoke.MethodHandle 方法,而不是通过代理。
来自 JDK 的接口(例如 java.util.function.Function)仍然可以用于网关代理,但由于针对 JDK 类进行 MethodHandles.Lookup 实例化的内部 Java 安全限制,它们的 default 方法无法被调用。
这些方法也可以通过在方法上显式添加 @Gateway 注解,或在 @MessagingGateway 注解或 <gateway> XML 组件上添加 proxyDefaultMethods 来启用代理(这将丢失其实现逻辑,同时恢复之前的网关代理行为)。
错误处理
网关调用可能导致错误。 默认情况下,下游发生的任何错误都会在网关方法调用时“原样”重新抛出。 例如,考虑以下简单流程:
gateway -> service-activator
如果服务激活器调用的服务抛出 MyException(例如),框架会将其包装在 MessagingException 中,并将传递给服务激活器的消息附加到 failedMessage 属性中。
因此,框架执行的任何日志记录都具有完整的失败上下文。
默认情况下,当网关捕获异常时,MyException 会被解包并抛给调用者。
您可以在网关方法声明上配置 throws 子句,以匹配原因链中的特定异常类型。
例如,如果您想捕获整个 MessagingException 以及下游错误原因的所有消息信息,您的网关方法应类似于以下内容:
public interface MyGateway {
void performProcess() throws MessagingException;
}
由于我们鼓励 POJO 编程,您可能不希望向调用者暴露消息基础设施。
如果您的网关方法没有 throws 子句,网关将遍历原因树,寻找一个不是 MessagingException 的 RuntimeException。
如果未找到任何匹配项,框架将抛出 MessagingException。
如果前述讨论中的 MyException 的原因包含 SomeOtherException,且您的方法为 throws SomeOtherException,网关将进一步解包该异常并将其抛给调用者。
当声明网关时未指定 service-interface,将使用内部框架接口 RequestReplyExchanger。
考虑以下示例:
public interface RequestReplyExchanger {
Message<?> exchange(Message<?> request) throws MessagingException;
}
在 5.0 版本之前,此 exchange 方法没有 throws 子句,因此异常会被解包。
如果您使用此接口并希望恢复之前的解包行为,请使用自定义的 service-interface,或自行访问 MessagingException 的 cause。
然而,您可能希望记录错误而不是传播它,或者您可能希望将异常视为有效回复(通过将其映射为符合调用方理解的某些“错误消息”契约的消息)。
为了实现这一点,网关通过支持 error-channel 属性提供了对专用错误消息通道的支持。
在以下示例中,'转换器' 从 Exception 创建了一个回复 Message:
<int:gateway id="sampleGateway"
default-request-channel="gatewayChannel"
service-interface="foo.bar.SimpleGateway"
error-channel="exceptionTransformationChannel"/>
<int:transformer input-channel="exceptionTransformationChannel"
ref="exceptionTransformer" method="createErrorResponse"/>
exceptionTransformer 可以是一个简单的 POJO,它知道如何创建预期的错误响应对象。
该对象将成为返回给调用方的有效载荷。
如有必要,您可以在这种“错误流”中执行更多复杂的操作。
这可能涉及路由器(包括 Spring Integration 的 ErrorMessageExceptionTypeRouter)、过滤器等。
然而,大多数情况下,一个简单的“转换器”就足够了。
或者,您可能只想记录异常(或将其异步发送到某处)。
如果您提供单向流,则不会向调用者发送任何内容。
如果您想完全抑制异常,可以提供对全局 nullChannel 的引用(本质上是一种 /dev/null 方法)。
最后,如上所述,如果未定义 error-channel,则异常将按通常方式传播。
当您使用 @MessagingGateway 注解(参见 )时,可以使用 @MessagingGateway AnnotationerrorChannel 属性。
从版本 5.0 开始,当您使用返回类型为 void(单向流)的网关方法时,error-channel 引用(如果提供)将被填充到每条发送消息的标准 errorChannel 头中。
此功能允许基于标准 ExecutorChannel 配置(或 QueueChannel)的下游异步流覆盖默认的全局 errorChannel 异常发送行为。
以前,您必须使用 @GatewayHeader 注解或 <header> 元素手动指定 errorChannel 头。
对于具有异步流的 void 方法,error-channel 属性会被忽略。
相反,错误消息会被发送到默认的 errorChannel。
通过简单的 POJO 网关暴露消息系统具有诸多优势,但“隐藏”底层消息系统的实际实现确实会带来一定的代价,因此您需要考虑一些事项。我们希望我们的 Java 方法尽快返回,而不会在调用者等待其返回(无论是 void、返回值还是抛出的异常)时无限期挂起。当在消息系统前端使用常规方法作为代理时,我们必须考虑底层消息传递潜在的异步特性。这意味着,由网关发起的消息可能会被过滤器丢弃,且永远无法到达负责生成回复的组件。某些服务激活器方法可能会导致异常,从而不提供回复(因为我们不生成空消息)。换句话说,多种情况可能导致回复消息永远无法到达。在消息系统中,这是完全正常的。然而,请考虑其对网关方法的影响。网关的方法输入参数已被纳入消息并发送至下游。回复消息将被转换为网关方法的返回值。因此,您可能需要确保对于每个网关调用,始终有一条回复消息。否则,如果将 reply-timeout 设置为负值,您的网关方法可能永远不会返回并无限期挂起。处理这种情况的一种方法是使用异步网关(本节稍后将对此进行解释)。另一种处理方式是将默认 reply-timeout 视为 30 秒。这样,网关就不会再挂起超过reply-timeout所指定的时间,如果超时发生,则返回“null”。最后,您可能需要考虑设置下游标志,例如在 service-activator 上设置 'requires-reply',或在 filter 上设置 'throw-exceptions-on-rejection'。这些选项将在本章最后一节中进行更详细的讨论。
|
如果下游流返回一个ErrorMessage,则其payload(即Throwable)将被视为常规下游错误。如果配置了 error-channel,它将被发送到错误流。否则,负载将被抛给网关的调用者。同样,如果 error-channel 上的错误流返回 ErrorMessage,其有效负载将被抛出给调用者。任何带有 Throwable 有效载荷的消息也同样适用。这在需要直接将 Exception 传播给调用方的异步场景中非常有用。为此,您可以选择返回一个 Exception(例如从某个服务返回的 reply),或者抛出异常。通常,即使采用异步流程,框架也会负责将下游流程抛出的异常传播回网关。TCP 客户端 - 服务器多路复用 示例演示了将异常返回给调用者的两种技术。它通过使用 aggregator 和 group-timeout 来模拟等待线程的 Socket IO 错误(参见 聚合器和组超时),并在丢弃流上返回 MessagingTimeoutException 响应。
|
网关超时
网关有两个超时属性:requestTimeout 和 replyTimeout。
请求超时仅当通道可以阻塞时适用(例如,一个已满的有界 QueueChannel)。
replyTimeout 值表示网关等待回复的时长,若超过该时间则返回 null。
其默认值为无穷大。
超时时间可以设置为网关上所有方法的默认值(defaultRequestTimeout 和 defaultReplyTimeout),或者在 MessagingGateway 接口注解上设置。
各个方法可以覆盖这些默认值(在 <method/> 子元素中)或在 @Gateway 注解上设置。
从 5.0 版本开始,超时时间可以定义为表达式,如下例所示:
@Gateway(payloadExpression = "args[0]", requestChannel = "someChannel",
requestTimeoutExpression = "args[1]", replyTimeoutExpression = "args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout);
评估上下文包含一个 BeanResolver(使用 @someBean 引用其他 Bean),并且来自 #root 对象的 args 数组属性可用。
有关此根对象的更多信息,请参阅 表达式和“全局”标头。
在使用 XML 配置时,timeout 属性可以是 long 值或 SpEL 表达式,如下示例所示:
<method name="someMethod" request-channel="someRequestChannel"
payload-expression="args[0]"
request-timeout="1000"
reply-timeout="args[1]">
</method>
异步网关
作为一种设计模式,消息网关提供了一种优雅的方式,在隐藏特定于消息的代码的同时,仍能暴露消息系统的完整功能。正如前面所述,GatewayProxyFactoryBean提供了一种便捷的方式,通过服务接口公开代理,使您能够基于 POJO 访问消息系统(基于您自己域中的对象、原始类型/字符串或其他对象)。然而,当网关通过返回值的简单 POJO 方法公开时,这意味着对于每个请求消息(在调用方法时生成),必须有一个回复消息(在方法返回时生成)。由于消息系统本质上是异步的,您可能无法始终保证“每个请求都会有回复”这一契约。Spring Integration 2.0 引入了对异步网关的支持,提供了一种便捷的方式来启动流程,即使您不确定是否期望回复或需要多长时间才能收到回复。
为了处理此类场景,Spring Integration 使用 java.util.concurrent.Future 个实例来支持异步网关。
从 XML 配置来看,没有任何变化,您定义异步网关的方式与定义普通网关相同,如下示例所示:
<int:gateway id="mathService"
service-interface="org.springframework.integration.sample.gateway.futures.MathServiceGateway"
default-request-channel="requestChannel"/>
然而,网关接口(服务接口)略有不同,如下所示:
public interface MathServiceGateway {
Future<Integer> multiplyByTwo(int i);
}
正如前面的示例所示,网关方法的返回类型是 Future。
当 GatewayProxyFactoryBean 检测到网关方法的返回类型为 Future 时,它会立即通过使用 AsyncTaskExecutor 切换到异步模式。
这就是两者之间的全部差异。
调用此类方法总是立即返回一个 Future 实例。
然后,您可以按照自己的节奏与 Future 进行交互,以获取结果、取消操作等。
此外,与使用任何其他 Future 实例一样,调用 get() 可能会揭示超时、执行异常等问题。
以下示例展示了如何使用从异步网关返回的 Future:
MathServiceGateway mathService = ac.getBean("mathService", MathServiceGateway.class);
Future<Integer> result = mathService.multiplyByTwo(number);
// do something else here since the reply might take a moment
int finalResult = result.get(1000, TimeUnit.SECONDS);
有关更详细的示例,请参阅 Spring Integration 示例中的 async-gateway 样本。
AsyncTaskExecutor
默认情况下,当提交内部 AsyncInvocationTask 实例以调用任何返回类型为 Future 的网关方法时,GatewayProxyFactoryBean 会使用 org.springframework.core.task.SimpleAsyncTaskExecutor。
但是,<gateway/> 元素配置中的 async-executor 属性允许您引用 Spring 应用程序上下文中可用的任何 java.util.concurrent.Executor 实现。
默认的 SimpleAsyncTaskExecutor 同时支持 Future 和 CompletableFuture 返回类型。
请参阅 CompletableFuture。
尽管存在默认的执行器,但通常提供外部执行器也很有用,这样您可以在日志中识别其线程(在使用 XML 时,线程名称基于执行器的 bean 名称),如下示例所示:
@Bean
public AsyncTaskExecutor exec() {
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
simpleAsyncTaskExecutor.setThreadNamePrefix("exec-");
return simpleAsyncTaskExecutor;
}
@MessagingGateway(asyncExecutor = "exec")
public interface ExecGateway {
@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);
}
如果您希望返回不同的 Future 实现,您可以提供一个自定义执行器,或者完全禁用执行器,并在下游流程的回复消息负载中返回 Future。
要禁用执行器,请在 GatewayProxyFactoryBean 中将其设置为 null(通过使用 setAsyncTaskExecutor(null))。
使用 XML 配置网关时,请使用 async-executor=""。
使用 @MessagingGateway 注解进行配置时,请使用类似于以下代码:
@MessagingGateway(asyncExecutor = AnnotationConstants.NULL)
public interface NoExecGateway {
@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);
}
如果返回类型是特定的具体 Future 实现,或者是配置的执行器不支持的某些其他子接口,则流程将在调用者的线程上运行,并且流程必须在回复消息负载中返回所需的类型。 |
CompletableFuture
从版本 4.2 开始,网关方法现在可以返回 CompletableFuture<?>。
在返回此类内容时有两种操作模式:
-
当提供异步执行器且返回类型恰好为
CompletableFuture(不是子类)时,框架会在执行器上运行任务,并立即向调用者返回CompletableFuture。CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)用于创建 Future。 -
当异步执行器被显式设置为
null,且返回类型为CompletableFuture或返回类型是CompletableFuture的子类时,流程将在调用者的线程上执行。 在此场景下,下游流程预期返回适当类型的CompletableFuture。
The org.springframework.util.concurrent.ListenableFuture has been deprecated starting with Spring Framework 6.0.
It is recommended now to migrate to the CompletableFuture which provides similar processing functionality. |
使用场景
在以下场景中,调用线程会立即返回 CompletableFuture<Invoice>,该值在下游流程向网关回复(携带一个 Invoice 对象)时完成。
CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="something.Service" default-request-channel="orders" />
在以下场景中,当下游流将 CompletableFuture<Invoice> 作为回复的负载提供给网关时,调用线程返回该值。
发票准备就绪后,必须由其他进程完成该 Future。
CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders"
async-executor="" />
在以下场景中,当下游流将其作为回复的负载提供给网关时,调用线程返回 CompletableFuture<Invoice>。
当发票准备好时,必须由其他进程完成该未来对象。
如果启用了 DEBUG 日志记录,则会输出一条日志条目,表明异步执行器无法用于此场景。
MyCompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders" />
CompletableFuture 个实例可用于对回复执行额外的操作,如下例所示:
CompletableFuture<String> process(String data);
...
CompletableFuture result = process("foo")
.thenApply(t -> t.toUpperCase());
...
String out = result.get(10, TimeUnit.SECONDS);
ReactorMono
从版本 5.0 开始,GatewayProxyFactoryBean 允许在网关接口方法中使用 Project Reactor,并使用 Mono<T> 返回类型。
内部 AsyncInvocationTask 被封装在 Mono.fromCallable() 中。
可以使用 Mono 稍后获取结果(类似于使用 Future<?>),或者在结果返回到网关时,通过调用您的 Consumer 由调度器直接消费该结果。
Mono 不会立即被框架刷新。
因此,在网关方法返回之前(如 Future<?> Executor 任务那样),底层消息流尚未启动。
当订阅到 Mono 时,该流程才会启动。
或者,Mono(作为一个“可组合”项)可能是 Reactor 流的一部分,而 subscribe() 则与整个 Flux 相关。
以下示例展示了如何使用 Project Reactor 创建网关: |
@MessagingGateway
public interface TestGateway {
@Gateway(requestChannel = "multiplyChannel")
Mono<Integer> multiply(Integer value);
}
@ServiceActivator(inputChannel = "multiplyChannel")
public Integer multiply(Integer value) {
return value * 2;
}
可以在处理数据Flux的某些服务中使用的网关是:
@Autowired
TestGateway testGateway;
public void hadnleFlux() {
Flux.just("1", "2", "3", "4", "5")
.map(Integer::parseInt)
.flatMap(this.testGateway::multiply)
.collectList()
.subscribe(System.out::println);
}
另一个使用 Project Reactor 的示例是一个简单的回调场景,如下例所示:
Mono<Invoice> mono = service.process(myOrder);
mono.subscribe(invoice -> handleInvoice(invoice));
调用线程继续执行,当流程完成时将调用handleInvoice()。
另请参阅 Kotlin 协程 以获取更多信息。
返回异步类型的下游流程
如上所述,在 AsyncTaskExecutor 部分中,如果您希望某个下游组件返回带有异步负载的消息(Future、Mono 等),则必须显式将异步执行器设置为 null(若使用 XML 配置则设置为 "")。
随后,该流程将在调用者线程上被调用,结果可稍后获取。
异步void返回类型
消息网关方法可以这样声明:
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "sendAsyncChannel")
@Async
void sendAsync(String payload);
}
但是下游异常不会被传播回调用者。
为了确保下游流调用和异常传播到调用者的异步行为,从 6.0 版本开始,该框架支持 Future<Void> 和 Mono<Void> 返回类型。
使用场景类似于之前描述的普通 void 返回类型的“发送即忘”行为,但区别在于流执行是异步进行的,并且返回的 Future(或 Mono)会根据 send 操作的结果以 null 完成或以异常方式结束。
如果 Future<Void> 是精确的下游流回复,则网关的 asyncExecutor 选项必须设置为 null(对于 @MessagingGateway 配置为 AnnotationConstants.NULL),并且 send 部分将在生产者线程上执行。
该回复取决于下游流的配置。
这样,目标应用程序就有责任正确生成 Future<Void> 回复。
Mono 用例已经超出框架的线程控制范围,因此将 asyncExecutor 设置为 null 没有意义。
请求 - 回复网关操作的结果 Mono<Void> 必须配置为网关方法的 Mono<?> 返回类型。 |
当没有响应到达时网关的行为
如前面所述,网关提供了一种通过 POJO 方法调用来与消息系统进行交互的便捷方式。 然而,典型的调用(通常期望始终返回,即使发生异常)可能并不总是与消息交换一一对应(例如,回复消息可能未到达——相当于方法未返回)。
本节其余部分涵盖各种场景以及如何使网关的行为更具可预测性。
某些属性可以配置以使同步网关的行为更加可预测,但它们可能并不总是按您预期的那样工作。
其中之一是 reply-timeout(在方法级别)或 default-reply-timeout(在网关级别)。
我们检查 reply-timeout 属性,以了解它如何在各种场景中影响同步网关的行为,以及它无法影响的方面。
我们考察单线程场景(所有下游组件都通过直接通道连接)和多线程场景(例如,在某个下游位置您可能拥有可轮询的通道或执行器通道,从而打破单线程边界)。
长时间运行的下游进程
- 同步网关,单线程
-
如果下游组件仍在运行(可能是因为无限循环或慢速服务),设置
reply-timeout将不起作用,网关方法调用不会返回,直到下游服务退出(通过返回或抛出异常)。 - Sync Gateway, 多线程
-
如果下游组件仍在运行(可能是由于无限循环或缓慢的服务),在多线程消息流中,设置
reply-timeout会产生效果,因为它允许网关方法调用在超时到达后返回,因为GatewayProxyFactoryBean会在回复通道上进行轮询,等待消息直到超时到期。 然而,如果在实际回复生成之前就已达到超时时间,则可能导致网关方法返回'null'。 您应该理解,回复消息(如果已生成)可能在网关方法调用返回之后才被发送到回复通道,因此您必须意识到这一点,并在设计流程时予以考虑。
下游组件返回 'null'
- 同步网关 — 单线程
-
如果下游组件返回 'null',且
reply-timeout被配置为负值,则网关方法调用将无限期挂起,除非在可能返回 'null' 的下游组件(例如服务激活器)上设置了requires-reply属性。 在这种情况下,将抛出异常并将其传播到网关。 - Sync Gateway — 多线程
-
该行为与之前的情况相同。
下游组件的返回签名是'void',而网关方法的签名是非'void'
- 同步网关 — 单线程
-
如果下游组件返回 'void',且
reply-timeout被配置为负值,则网关方法调用将无限期挂起。 - Sync Gateway — 多线程
-
该行为与之前的情况相同。
下游组件导致运行时异常
- 同步网关 — 单线程
-
如果下游组件抛出运行时异常,该异常会通过错误消息传播回网关并重新抛出。
- Sync Gateway — 多线程
-
该行为与之前的情况相同。
您应该了解,默认情况下,reply-timeout 表示无限制。因此,如果您将 reply-timeout 设置为负值,您的网关方法调用可能会无限期挂起。因此,为了确保您分析自己的流程,并且即使存在这些场景发生的微小可能性,您也应将 reply-timeout 属性设置为一个“安全”的值。默认时间为 30 秒。更好的是,您可以将下游组件的 requires-reply 属性设置为'true',以确保及时响应,就像在该下游组件内部返回 null 时立即抛出异常所产生的效果一样。然而,您也应该意识到存在一些场景(参见 第一个场景),其中 reply-timeout 无法提供帮助。这意味着分析您的消息流并决定何时使用同步网关而不是异步网关也同样重要。如前文所述,后一种情况是定义返回Future实例的网关方法的问题。这样您就能确保收到该返回值,并且可以对调用的结果进行更细粒度的控制。此外,在处理路由器时,您应该记住,将 resolution-required 属性设置为 'true' 会导致路由器在无法解析特定通道时抛出异常。同样,在处理 Filter 时,您可以设置 throw-exception-on-rejection 属性。在这两种情况下,生成的流程行为都类似于包含一个具有'requires-reply'属性的服务激活器。换句话说,它有助于确保网关方法调用得到及时的响应。
|
| 您应该了解,计时器在线程返回到网关时启动——即当流程完成或将消息移交到另一个线程时。 此时,调用线程开始等待回复。 如果流程完全是同步的,回复将立即可用。 对于异步流程,线程最多等待此时间。 |
在 Java DSL 章节中查看 IntegrationFlow 作为网关,以了解通过 IntegrationFlow 定义网关的选项。
服务激活器
服务激活器是一种端点类型,用于将任何由 Spring 管理的对象连接到输入通道,使其能够扮演服务的角色。
如果服务产生输出,它也可以连接到输出通道。
或者,一个产生输出的服务可以位于处理管道或消息流的末尾,在这种情况下,可以使用传入消息的 replyChannel 标头。
如果没有定义输出通道,这是默认行为。
与这里描述的许多配置选项一样,同样的行为实际上也适用于大多数其他组件。
服务激活器本质上是一个通用端点,用于使用输入消息(有效载荷和标头)调用某个对象上的方法。
其内部逻辑基于 MessageHandler,它可以是针对特定用例的任何可能实现,例如 DefaultMessageSplitter、AggregatingMessageHandler、SftpMessageHandler、JpaOutboundGateway 等。
因此,本参考手册中提到的任何出站网关和出站通道适配器都应被视为此服务激活器端点的特定扩展;它们最终都会调用某个对象的方法。
配置服务激活器
使用 Java 和注解配置时,只需将相应的服务方法标记为 @ServiceActivator 注解——当从输入通道消费消息时,框架会自动调用该方法:
public class SomeService {
@ServiceActivator(inputChannel = "exampleChannel")
public void exampleHandler(SomeData payload) {
...
}
}
有关更多信息,请参阅 注解支持。
对于 Java、Groovy 或 Kotlin DSL,.handle() 操作符的 IntegrationFlow 表示服务激活器:
@Bean
public IntegrationFlow someFlow() {
return IntegrationFlow
.from("exampleChannel")
.handle(someService, "exampleHandler")
.get();
}
@Bean
fun someFlow() =
integrationFlow("exampleChannel") {
handle(someService, "exampleHandler")
}
@Bean
someFlow() {
integrationFlow 'exampleChannel',
{
handle someService, 'exampleHandler'
}
}
查看有关 DSL 的更多信息,请参见相应的章节:
使用 XML 配置创建服务激活器时,请使用带有 'input-channel' 和 'ref' 属性的 'service-activator' 元素,如下示例所示:
<int:service-activator input-channel="exampleChannel" ref="exampleHandler"/>
上述配置从 exampleHandler 中选择所有满足以下消息要求的方法:
-
使用
@ServiceActivator注解 -
is
public -
如果
requiresReply == true,则不返回void
运行时调用的目标方法会根据每个请求消息的 payload 类型进行选择;如果目标类中存在该方法,则作为回退机制使用 Message<?> 类型。
从版本 5.0 开始,一个服务方法可以使用 @org.springframework.integration.annotation.Default 标记为所有不匹配情况的回退方案。
在使用 内容类型转换 时,这非常有用,因为目标方法将在转换后被调用。
要委托给任何对象的显式定义方法,您可以添加 method 属性,如下示例所示:
<int:service-activator input-channel="exampleChannel" ref="somePojo" method="someMethod"/>
在任一情况下,当服务方法返回非 null 值时,端点会尝试将回复消息发送到合适的回复通道。
为了确定回复通道,它首先检查是否在端点配置中提供了 output-channel,如下示例所示:
<int:service-activator input-channel="exampleChannel" output-channel="replyChannel"
ref="somePojo" method="someMethod"/>
如果方法返回结果且未定义 output-channel,框架将检查请求消息的 replyChannel 头值。
如果该值存在,则检查其类型。
如果是 MessageChannel,回复消息将发送至该通道。
如果是 String,端点会尝试将通道名称解析为通道实例。
如果无法解析通道,将抛出 DestinationResolutionException。
如果可以解析,消息将被发送至此。
如果请求消息没有 replyChannel 头,且 reply 对象是 Message,则查阅其 replyChannel 头以获取目标目的地。
这是 Spring Integration 中用于请求 - 回复消息传递的技术,也是返回地址模式的一个示例。
如果您的方法返回一个结果,而您希望丢弃它并结束流程,您应该配置将 output-channel 发送到 NullChannel。
为了方便起见,框架已注册了一个名为 nullChannel 的组件。
有关更多信息,请参阅 特殊通道。
服务激活器是那些不需要产生回复消息的组件之一。
如果您的方法返回 null 或具有 void 返回类型,服务激活器将在方法调用后退出,不发出任何信号。
此行为可通过 AbstractReplyProducingMessageHandler.requiresReply 选项进行控制,该选项在使用 XML 命名空间配置时也暴露为 requires-reply。
如果标志设置为 true 且方法返回 null,则会抛出 ReplyRequiredException。
服务方法中的参数可以是消息,也可以是任意类型。
如果是后者,则假定其为消息负载(message payload),该负载将从消息中提取并注入到服务方法中。
我们通常推荐这种方法,因为它在配合 Spring Integration 工作时遵循并推广了 POJO 模型。
参数也可以带有 @Header 或 @Headers 注解,如 注解支持 中所描述的那样。
| 服务方法不需要任何参数,这意味着您可以实现事件风格的服务激活器(您只关心服务方法的调用),而无需担心消息的内容。 可以将其视为空 JMS 消息。 此类实现的一个示例用例是对输入通道上放置的消息进行简单的计数或监控。 |
从版本 4.1 开始,该框架能够正确地将消息属性(payload 和 headers)转换为 Java 8 的 Optional POJO 方法参数,如下示例所示:
public class MyBean {
public String computeValue(Optional<String> payload,
@Header(value="foo", required=false) String foo1,
@Header(value="foo") Optional<String> foo2) {
if (payload.isPresent()) {
String value = payload.get();
...
}
else {
...
}
}
}
我们通常建议,如果自定义服务激活器处理程序实现可以在其他 <service-activator> 定义中重用,则使用 ref 属性。
但是,如果自定义服务激活器处理程序实现仅在单个 <service-activator> 定义中使用,则可以提供内部 Bean 定义,如下示例所示:
<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
output-channel = "outChannel" method="someMethod">
<beans:bean class="org.something.ExampleServiceActivator"/>
</int:service-activator>
在同一个 <service-activator> 配置中同时使用 ref 属性和内部处理器定义是不允许的,因为这会创建歧义条件并导致抛出异常。 |
如果 ref 属性引用了扩展 AbstractMessageProducingHandler 的 Bean(例如框架本身提供的处理器),则配置将通过直接将输出通道注入处理器来进行优化。
在这种情况下,每个 ref 必须指向单独的 Bean 实例(或 prototype 作用域的 Bean),或者使用内部 <bean/> 配置类型。
如果您无意地从多个 Bean 引用了同一个消息处理器,将会引发配置异常。 |
服务激活器与 Spring 表达式语言 (SpEL)
自 Spring Integration 2.0 起,服务激活器也可以受益于 SpEL。
例如,您可以调用任何 Bean 方法,而无需在 ref 属性中指向该 Bean,也无需将其作为内部 Bean 定义包含在内,如下所示:
<int:service-activator input-channel="in" output-channel="out"
expression="@accountService.processAccount(payload, headers.accountId)"/>
<bean id="accountService" class="thing1.thing2.Account"/>
在之前的配置中,我们不是使用 ref 注入 'accountService' 或将其作为内部 bean,而是使用 SpEL 的 @beanId 表示法,并调用一个接受与消息负载兼容类型的方法。
我们还传递了一个 header 值。
任何有效的 SpEL 表达式都可以针对消息中的任何内容进行求值。
对于简单场景,如果所有逻辑都可以封装在这样的表达式中,您的服务激活器无需引用任何 bean,如下示例所示:
<int:service-activator input-channel="in" output-channel="out" expression="payload * 2"/>
在之前的配置中,我们的服务逻辑是将负载值乘以二。 SpEL 让我们可以相对轻松地处理它。
有关配置服务激活器的更多信息,请参阅 Java DSL 章节中的服务激活器和.handle()方法。
异步服务激活器
服务激活器由调用线程触发。
如果输入通道是 SubscribableChannel,则这是上游线程;如果是 PollableChannel,则是轮询器线程。
如果服务返回 CompletableFuture<?>,默认操作是将其作为负载发送到输出(或回复)通道的消息中。
从 4.3 版本开始,您可以将 async 属性设置为 true(在使用 Java 配置时,请使用 setAsync(true))。
当 async 属性设置为 true 且服务返回 CompletableFuture<?> 时,调用线程会立即释放,回复消息将在完成该 Future 的线程上发送(来自您的服务内部)。
这对于使用 PollableChannel 的长时间运行服务尤其有利,因为轮询器线程会被释放以执行框架内的其他服务。
如果服务以 Exception 完成未来任务,则执行正常的错误处理。
如果存在,ErrorMessage 将被发送到 errorChannel 消息头。
否则,ErrorMessage 将被发送到默认的 errorChannel(如果可用)。
从 6.1 版本开始,如果 AbstractMessageProducingHandler 的输出通道被配置为 ReactiveStreamsSubscribableChannel,则默认启用异步模式。
如果处理器结果不是响应式类型或 CompletableFuture<?>,则无论输出通道类型如何,都会执行常规的消息回复处理过程。
另请参阅 响应式流支持 以获取更多信息。
服务激活器与方法返回类型
服务方法可以返回任何类型,该类型将作为响应消息的有效负载。
在此情况下,会创建一个新的 Message<?> 对象,并将请求消息中的所有头信息复制过来。
对于大多数基于 POJO 方法调用的 Spring Integration MessageHandler 实现,其行为方式相同。
方法也可以返回一个完整的 Message<?> 对象。然而,请记住,与 转换器 不同,对于服务激活器(Service Activator),如果返回的消息中尚不存在请求消息的头部,则将通过复制请求消息的头部来修改该消息。因此,如果您的方法参数是一个 Message<?>,并且您在服务方法中复制了部分而非全部现有请求头,它们将重新出现在响应消息中。从回复消息中移除表头并非服务激活器(Service Activator)的职责,并且遵循松耦合原则,在集成流程中添加一个HeaderFilter是更好的做法。Alternatively, a Transformer can be used instead of a Service Activator but, in that case, when returning a full Message<?> the method is completely responsible for the message, including copying request message headers (if needed).您必须确保重要的框架标头(例如。g.replyChannel, errorChannel),如果存在,必须保留。
延迟器
延迟器是一个简单的端点,它允许消息流被延迟特定的时间间隔。
当消息被延迟时,原始发送者不会被阻塞。
相反,延迟的消息会被调度到一个 org.springframework.scheduling.TaskScheduler 实例,以便在延迟结束后发送到输出通道。
这种方法具有良好的可扩展性,即使对于较长的延迟也是如此,因为它不会导致大量发送线程被阻塞。
相反,在典型情况下,实际执行释放消息的操作会使用线程池。
本节包含几个配置延迟器的示例。
配置延迟器
<delayer>元素用于在两个消息通道之间延迟消息流。
与其他端点一样,您可以提供'input-channel'和'output-channel'属性,但delayer还具有'default-delay'和'expression'属性(以及'expression'元素),用于确定每条消息应延迟的毫秒数。
以下示例将所有消息延迟三秒:
<int:delayer id="delayer" input-channel="input"
default-delay="3000" output-channel="output"/>
如果您需要为每条消息确定延迟,也可以使用 'expression' 属性提供 SpEL 表达式,如下所示:
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from("input")
.delay("delayer.messageGroupId", d -> d
.defaultDelay(3_000L)
.delayExpression("headers['delay']"))
.channel("output")
.get();
}
@Bean
fun flow() =
integrationFlow("input") {
delay("delayer.messageGroupId") {
defaultDelay(3000L)
delayExpression("headers['delay']")
}
channel("output")
}
@ServiceActivator(inputChannel = "input")
@Bean
public DelayHandler delayer() {
DelayHandler handler = new DelayHandler("delayer.messageGroupId");
handler.setDefaultDelay(3_000L);
handler.setDelayExpressionString("headers['delay']");
handler.setOutputChannelName("output");
return handler;
}
<int:delayer id="delayer" input-channel="input" output-channel="output"
default-delay="3000" expression="headers['delay']"/>
在前面的示例中,三秒的延迟仅适用于给定入站消息的表达式求值为 null 的情况。
如果您只想对表达式求值结果有效的消息应用延迟,可以使用 0(默认值)作为“默认延迟”(default-delay)。
对于任何延迟为 0(或更小)的消息,该消息将立即在调用线程上发送。
XML 解析器使用消息组 ID <beanName>.messageGroupId。 |
延迟处理器支持表示毫秒间隔的表达式求值结果(任何 Object 其 toString() 方法产生的值可解析为 Long),以及表示绝对时间的 java.util.Date 实例。
在第一种情况下,毫秒从当前时间开始计算(例如,5000 的值会使消息在延迟器接收到该消息后至少延迟五秒)。
使用 Date 实例时,消息不会释放,直到达到该 Date 对象所表示的时间。
如果值等同于非正延迟或过去的日期,则不会产生延迟。
相反,它会直接在原始发送者的线程上发送到输出通道。
如果表达式求值结果不是 Date 且无法解析为 Long,则应用默认延迟(如果有——默认值为 0)。 |
表达式求值可能因各种原因抛出求值异常,包括表达式无效或其他条件。
默认情况下,此类异常会被忽略(尽管会在 DEBUG 级别记录日志),且延迟器会回退到默认延迟(如果有的话)。
您可以通过设置 ignore-expression-failures 属性来修改此行为。
默认情况下,该属性设置为 true,延迟器的行为如前所述。
然而,如果您希望不忽略表达式求值异常并将其抛给延迟器的调用者,请将 ignore-expression-failures 属性设置为 false。 |
|
在上面的示例中,延迟表达式被指定为
因此,如果存在表头可能被省略且您希望回退到默认延迟的情况,通常使用索引器语法而不是点属性访问器语法更为高效(且被推荐),因为检测 null 比捕获异常更快。 |
延迟器委托给 Spring 的 TaskScheduler 抽象的一个实例。
延迟器默认使用的调度器是 Spring Integration 在启动时提供的 ThreadPoolTaskScheduler 实例。
请参阅 配置任务调度器。
如果您希望委托给不同的调度器,可以通过延迟器元素的'scheduler'属性提供一个引用,如下例所示:
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
scheduler="exampleTaskScheduler"/>
<task:scheduler id="exampleTaskScheduler" pool-size="3"/>
如果您配置了外部 ThreadPoolTaskScheduler,则可以在此属性上设置 waitForTasksToCompleteOnShutdown = true。
它允许在应用程序关闭时成功完成处于执行状态的“延迟”任务(释放消息)。
在 Spring Integration 2.2 之前,此属性可用於 <delayer> 元素,因为 DelayHandler 可以在后台创建自己的调度器。
自 2.2 版本起,延迟器需要外部调度器实例,waitForTasksToCompleteOnShutdown 已被删除。
您应使用调度器自身的配置。 |
ThreadPoolTaskScheduler 拥有一个属性 errorHandler,该属性可以注入 org.springframework.util.ErrorHandler 的某个实现。
此处理器允许从发送延迟消息的计划任务线程中处理 Exception。
默认情况下,它使用一个 org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler,您可以在日志中看到堆栈跟踪信息。
您可能需要考虑使用 org.springframework.integration.channel.MessagePublishingErrorHandler,它将 ErrorMessage 发送到 error-channel(无论是来自失败消息的头部还是默认的 error-channel)。
此错误处理在事务回滚后执行(如果存在事务)。
请参阅 释放故障。 |
延迟器和消息存储
The DelayHandler 将延迟消息持久化到提供的 MessageStore 中的消息组中。
('groupId' 基于 <delayer> 元素的必需 'id' 属性。)
延迟消息会在 DelayHandler 向 output-channel 发送消息之前,由调度任务立即从 MessageStore 中移除。
如果提供的 MessageStore 是持久化的(例如 JdbcMessageStore),则可以在应用关闭时避免丢失消息。
应用启动后,DelayHandler 会从其在 MessageStore 中的消息组读取消息,并根据消息的原始到达时间(如果延迟为数值)重新调度延迟。
对于延迟头为 Date 的消息,在重新调度时将使用该 Date。
如果延迟消息在 MessageStore 中停留的时间超过其 'delay' 值,则在启动后立即发送该消息。
<delayer> 可以通过两个互斥元素中的任意一个进行增强:<transactional> 和 <advice-chain>。
这些 AOP 通知的 List 被应用于代理的内部 DelayHandler.ReleaseMessageHandler,该组件负责在延迟后于调度任务的 Thread 上释放消息。
例如,当下游消息流抛出异常且 ReleaseMessageHandler 的事务回滚时,可能会使用此功能。
在这种情况下,延迟的消息将保留在持久化的 MessageStore 中。
您可以在 <advice-chain> 内使用任何自定义的 org.aopalliance.aop.Advice 实现。
<transactional> 元素定义了一个仅包含事务性通知的简单通知链。
以下示例展示了 <delayer> 内的 advice-chain:
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
message-store="jdbcMessageStore">
<int:advice-chain>
<beans:ref bean="customAdviceBean"/>
<tx:advice>
<tx:attributes>
<tx:method name="*" read-only="true"/>
</tx:attributes>
</tx:advice>
</int:advice-chain>
</int:delayer>
DelayHandler可以导出为带有管理操作(getDelayedMessageCount和reschedulePersistedMessages)的JMX MBean,从而允许在运行时重新调度延迟持久化的消息——例如,如果之前已停止TaskScheduler。
这些操作可以通过Control Bus命令调用,如下示例所示:
Message<String> delayerReschedulingMessage =
MessageBuilder.withPayload("@'delayer.handler'.reschedulePersistedMessages()").build();
controlBusChannel.send(delayerReschedulingMessage);
| 有关消息存储、JMX和控制总线的更多信息,请参阅系统管理。 |
从版本 5.3.7 开始,如果将消息存储到 MessageStore 时存在活动事务,则释放任务将在 TransactionSynchronization.afterCommit() 回调中调度。
这是为了防止竞态条件,即调度的释放操作可能在事务提交之前运行,导致找不到该消息。
在这种情况下,消息将在延迟后或事务提交后(以较晚者为准)被释放。
发布失败
从版本 5.0.8 开始,延迟器(delayer)新增了两个属性:
-
maxAttempts(默认值为 5) -
retryDelay(默认 1 秒)
当消息被释放时,如果下游流失败,将在 retryDelay 后尝试重新释放。
如果达到 maxAttempts,消息将被丢弃(除非释放是事务性的,在这种情况下,消息将保留在存储中,但不再计划释放,直到应用程序重新启动或调用上述的 reschedulePersistedMessages() 方法)。
此外,您可以配置一个 delayedMessageErrorChannel;当发布失败时,会向该通道发送一个 ErrorMessage,其中异常作为负载,并具有 originalMessage 属性。
ErrorMessage 包含一个标头 IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT,其中包含当前计数。
如果错误流消费了错误消息并正常退出,则不再采取进一步操作;如果发布是事务性的,则事务将提交,并且消息将从存储中删除。
如果错误流抛出异常,则发布将根据上述讨论最多重试 maxAttempts 次。
脚本支持
Spring Integration 2.1 增加了对 JSR223 Java 脚本规范 的支持,该规范在 Java 6 版本中引入。 它允许您使用任何受支持语言(包括 Ruby、JRuby、Groovy 和 Kotlin)编写的脚本来为各种集成组件提供逻辑,其方式类似于 Spring Expression Language (SpEL) 在 Spring Integration 中的使用方式。 有关 JSR223 的更多信息,请参阅 文档。
您需要将以下依赖项包含到您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-scripting</artifactId>
<version>6.1.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-scripting:6.1.9"
此外,您还需要添加一个脚本引擎实现,例如 JRuby、Jython。
从 5.2 版本开始,Spring Integration 提供了对 Kotlin Jsr223 的支持。 您需要将以下依赖项添加到项目中以使其生效:
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-scripting-jsr223</artifactId>
<scope>runtime</scope>
</dependency>
runtime 'org.jetbrains.kotlin:kotlin-scripting-jsr223'
| 各种 JSR223 语言实现已由第三方开发。 特定实现与 Spring Integration 的兼容性取决于其符合规范的完善程度以及实施者对规范的理解。 |
| 如果您计划使用 Groovy 作为您的脚本语言,我们推荐使用 Spring-Integration 的 Groovy 支持,因为它提供了针对 Groovy 的额外功能。 不过,本节内容也同样相关。 |
脚本配置
根据您的集成需求的复杂程度,脚本可以以 XML 配置中的 CDATA 形式内联提供,或者作为包含该脚本的 Spring 资源的引用。
要启用脚本支持,Spring Integration 定义了一个 ScriptExecutingMessageProcessor,它将消息负载绑定到名为 payload 的变量,并将消息头绑定到 headers 变量,两者均可在脚本执行上下文中访问。
您只需编写使用这些变量的脚本即可。
以下示例对展示了创建过滤器的示例配置:
@Bean
public IntegrationFlow scriptFilter() {
return f -> f.filter(Scripts.processor("some/path/to/ruby/script/RubyFilterTests.rb"));
}
...
@Bean
public Resource scriptResource() {
return new ByteArrayResource("headers.type == 'good'".getBytes());
}
@Bean
public IntegrationFlow scriptFilter() {
return f -> f.filter(Scripts.processor(scriptResource()).lang("groovy"));
}
<int:filter input-channel="referencedScriptInput">
<int-script:script location="some/path/to/ruby/script/RubyFilterTests.rb"/>
</int:filter>
<int:filter input-channel="inlineScriptInput">
<int-script:script lang="groovy">
<![CDATA[
return payload == 'good'
]]>
</int-script:script>
</int:filter>
如前面的示例所示,脚本可以内联包含,也可以通过引用资源位置来包含(使用 location 属性)。
此外,lang 属性对应于语言名称(或其 JSR223 别名)。
其他支持脚本的 Spring Integration 端点元素包括 router、service-activator、transformer 和 splitter。
每种情况下的脚本配置均与上述内容相同(除端点元素外)。
脚本支持的另一个有用功能是能够在不重启应用上下文的情况下更新(重新加载)脚本。
为此,请在 script 元素上指定 refresh-check-delay 属性,如下示例所示:
Scripts.processor(...).refreshCheckDelay(5000)
}
<int-script:script location="..." refresh-check-delay="5000"/>
在上面的示例中,脚本位置每 5 秒检查一次更新。 如果脚本已更新,则在更新后超过 5 秒发生的任何调用将导致运行新脚本。
考虑以下示例:
Scripts.processor(...).refreshCheckDelay(0)
}
<int-script:script location="..." refresh-check-delay="0"/>
在前面的示例中,上下文会在发生任何脚本修改时立即更新,从而提供一种简单的“实时”配置机制。 任何负值都表示在应用上下文初始化后不会重新加载脚本。 这是默认行为。 以下示例展示了一个永不更新的脚本:
Scripts.processor(...).refreshCheckDelay(-1)
}
<int-script:script location="..." refresh-check-delay="-1"/>
| 内联脚本无法重新加载。 |
脚本变量绑定
需要变量绑定,以使脚本能够引用外部提供给脚本执行上下文的变量。
默认情况下,payload 和 headers 被用作绑定变量。
您可以使用 <variable> 元素(或 ScriptSpec.variables() 选项)将额外的变量绑定到脚本中,如下示例所示:
Scripts.processor("foo/bar/MyScript.py")
.variables(Map.of("var1", "thing1", "var2", "thing2", "date", date))
}
<script:script lang="py" location="foo/bar/MyScript.py">
<script:variable name="var1" value="thing1"/>
<script:variable name="var2" value="thing2"/>
<script:variable name="date" ref="date"/>
</script:script>
如前例所示,您可以将脚本变量绑定到标量值或 Spring Bean 引用。
请注意,payload和headers仍作为绑定变量包含在内。
随着 Spring Integration 3.0 的发布,除了 variable 元素外,还引入了 variables 属性。
该属性与 variable 元素并非互斥,您可以在同一个 script 组件中组合使用它们。
然而,无论变量在何处定义,其名称必须唯一。
此外,自 Spring Integration 3.0 起,内联脚本也允许进行变量绑定,如下示例所示:
<service-activator input-channel="input">
<script:script lang="ruby" variables="thing1=THING1, date-ref=dateBean">
<script:variable name="thing2" ref="thing2Bean"/>
<script:variable name="thing3" value="thing2"/>
<![CDATA[
payload.foo = thing1
payload.date = date
payload.bar = thing2
payload.baz = thing3
payload
]]>
</script:script>
</service-activator>
前面的示例展示了内联脚本、variable元素和variables属性的组合。
variables属性包含逗号分隔的值,其中每个段包含一个由'='分隔的变量及其值的对。
变量名可以后缀-ref,如前面示例中的date-ref变量所示。
这意味着绑定变量的名称为date,但其值是应用程序上下文中dateBeanbean的引用。
在使用属性占位符配置或命令行参数时,这可能非常有用。
如果您需要更精细地控制变量的生成方式,您可以实现自己的 Java 类,该类使用 ScriptVariableGenerator 策略,该策略由以下接口定义:
public interface ScriptVariableGenerator {
Map<String, Object> generateScriptVariables(Message<?> message);
}
此接口要求您实现 generateScriptVariables(Message) 方法。
消息参数允许您访问消息负载和标头中可用的任何数据,返回值是绑定变量的 Map。
每次为消息执行脚本时都会调用此方法。
以下示例展示了如何实现 ScriptVariableGenerator 并使用 script-variable-generator 属性引用它:
Scripts.processor("foo/bar/MyScript.groovy")
.variableGenerator(new foo.bar.MyScriptVariableGenerator())
}
<int-script:script location="foo/bar/MyScript.groovy"
script-variable-generator="variableGenerator"/>
<bean id="variableGenerator" class="foo.bar.MyScriptVariableGenerator"/>
如果未提供 script-variable-generator,脚本组件将使用 DefaultScriptVariableGenerator,该值会将任何提供的 <variable> 元素与来自 Message 的 payload 和 headers 变量在其 generateScriptVariables(Message) 方法中进行合并。
您不能同时提供 script-variable-generator 属性和 <variable> 元素。它们是互斥的。 |
GraalVM 多语言支持
从版本 6.0 开始,该框架提供了一个基于 GraalVM 多语言 API 的 PolyglotScriptExecutor。
原本由 Java 自身移除的 JavaScript 的 JSR223 引擎实现,现已通过此新的脚本执行器进行替换。
有关在 GraalVM 中启用 JavaScript 支持的更多信息,以及可以通过脚本变量传播哪些 配置选项,请参见相关文档。
默认情况下,该框架将共享 Polyglot Context 上的 allowAllAccess 设置为 true,从而启用与主机 JVM 的这种交互:
-
新线程的创建和使用。
-
访问公共主机类。
-
通过向类路径添加条目来加载新的主机类。
-
正在将新成员导出到多语言绑定中。
-
主机系统上的不受限制的 IO 操作。
-
传递实验性选项。
-
新子进程的产生与使用。
-
访问进程环境变量。
这可以通过重载的 PolyglotScriptExecutor 构造函数进行自定义,该构造函数接受一个 org.graalvm.polyglot.Context.Builder。
To enable this JavaScript support, GraalVM with the js component installed has to be used or, when using a regular JVM, the org.graalvm.sdk:graal-sdk and org.graalvm.js:js dependencies must be included.
Groovy 支持
在 Spring Integration 2.0 中,我们新增了 Groovy 支持,允许您使用 Groovy 脚本语言为各种集成组件提供逻辑——类似于 Spring 表达式语言(SpEL)在路由、转换和其他集成关注点上的支持方式。 有关 Groovy 的更多信息,请参阅 Groovy 文档,您可以在项目网站上找到它。
您需要将以下依赖项包含到您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-groovy</artifactId>
<version>6.1.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-groovy:6.1.9"
此外,从 6.0 版本开始,提供了用于集成流配置的 Groovy DSL。
Groovy 配置
随着 Spring Integration 2.1 的发布,Groovy 支持的配置命名空间是 Spring Integration 脚本支持的一个扩展,并共享在 脚本支持 章节中详细描述的核心配置和行为。
尽管 Groovy 脚本得到了通用脚本支持的充分支持,但 Groovy 支持提供了由 Spring Framework 的 org.springframework.scripting.groovy.GroovyScriptFactory 及相关组件支持的 Groovy 配置命名空间,为使用 Groovy 提供了更强大的功能。
以下示例展示了两个配置样例:
<int:filter input-channel="referencedScriptInput">
<int-groovy:script location="some/path/to/groovy/file/GroovyFilterTests.groovy"/>
</int:filter>
<int:filter input-channel="inlineScriptInput">
<int-groovy:script><![CDATA[
return payload == 'good'
]]></int-groovy:script>
</int:filter>
如前面的示例所示,配置看起来与通用脚本支持配置相同。
唯一的区别是使用了 Groovy 命名空间,如 int-groovy 命名空间前缀所示。
另外请注意,<script> 标签上的 lang 属性在此命名空间中无效。
Groovy 对象自定义
如果您需要自定义 Groovy 对象本身(而不仅仅是设置变量),可以通过使用 GroovyObjectCustomizer 属性引用实现了 customizer 的 bean。
例如,如果您想通过修改 MetaClass 并注册在脚本中可用的函数来实现领域特定语言(DSL),这可能非常有用。
以下示例展示了如何实现这一点:
<int:service-activator input-channel="groovyChannel">
<int-groovy:script location="somewhere/SomeScript.groovy" customizer="groovyCustomizer"/>
</int:service-activator>
<beans:bean id="groovyCustomizer" class="org.something.MyGroovyObjectCustomizer"/>
设置自定义 GroovyObjectCustomizer 与 <variable> 元素或 script-variable-generator 属性并不互斥。
在定义内联脚本时也可以提供它。
Spring Integration 3.0 引入了 variables 属性,该属性与 variable 元素配合使用。
此外,如果绑定变量未提供名称,groovy 脚本能够将变量解析为 BeanFactory 中的 Bean。
以下示例展示了如何使用变量(entityManager):
<int-groovy:script>
<![CDATA[
entityManager.persist(payload)
payload
]]>
</int-groovy:script>
entityManager 必须是应用程序上下文中的适当 Bean。
有关 <variable> 元素、variables 属性和 script-variable-generator 属性的更多信息,请参见 脚本变量绑定。
Groovy 脚本编译器自定义
@CompileStatic 提示是最受欢迎的 Groovy 编译器自定义选项。
它可以用于类或方法级别。
有关更多信息,请参阅 Groovy 参考手册,特别是 @CompileStatic。
为了在集成场景中对简短脚本利用此功能,我们被迫将简单的脚本更改为更像 Java 的代码。
考虑以下 <filter> 脚本:
headers.type == 'good'
上述脚本在 Spring Integration 中变为以下方法:
@groovy.transform.CompileStatic
String filter(Map headers) {
headers.type == 'good'
}
filter(headers)
通过这种方式,filter() 方法被转换并编译为静态 Java 代码,绕过了 Groovy 动态调用阶段,例如 getProperty() 工厂和 CallSite 代理。
从 4.3 版本开始,您可以使用 compile-static boolean 选项配置 Spring Integration Groovy 组件,指定为 ASTTransformationCustomizer 添加 @CompileStatic 到内部 CompilerConfiguration。
在此基础上,您可以在脚本代码中省略带有 @CompileStatic 的方法声明,同时仍能获得编译后的纯 Java 代码。
在这种情况下,上述脚本可以简短,但仍需比解释型脚本稍显冗长,如下示例所示:
binding.variables.headers.type == 'good'
您必须通过 groovy.lang.Script binding 属性访问 headers 和 payload(或任何其他)变量,因为使用 @CompileStatic 时,我们不具备动态 GroovyObject.getProperty() 功能。
此外,我们引入了 compiler-configuration Bean 引用。
通过此属性,您可以提供任何所需的其他 Groovy 编译器自定义配置,例如 ImportCustomizer。
有关此功能的更多信息,请参阅 Groovy 文档中的 高级编译器配置。
使用 compilerConfiguration 不会自动为 @CompileStatic 注解添加 ASTTransformationCustomizer,并且它会覆盖 compileStatic 选项。
如果您仍然需要 CompileStatic,则应手动将该自定义 compilerConfiguration 的 CompilationCustomizers 中添加一个 new ASTTransformationCustomizer(CompileStatic.class)。 |
Groovy 编译器自定义设置对 refresh-check-delay 选项没有任何影响,可重新加载的脚本也可以进行静态编译。 |
控制总线
正如 (企业集成模式) 中所描述,控制总线的核心思想是:您可以使用与用于框架内组件的“应用级”消息传递相同的消息系统来进行监控和管理。 在 Spring Integration 中,我们基于前述适配器进行构建,以便您可以通过发送消息来调用暴露的操作。 这些操作的一种选择是使用 Groovy 脚本。 以下示例配置了用于控制总线的 Groovy 脚本:
<int-groovy:control-bus input-channel="operationChannel"/>
控制总线有一个输入通道,可以通过该通道调用应用程序上下文中 bean 的操作。
Groovy 控制总线在输入通道上运行消息,将其作为 Groovy 脚本执行。
它接收一条消息,将主体编译为脚本,使用 GroovyObjectCustomizer 对其进行自定义,然后运行该脚本。
控制总线的 MessageProcessor 暴露了应用上下文中所有带有 @ManagedResource 注解并实现了 Spring 的 Lifecycle 接口或扩展了 Spring 的 CustomizableThreadCreator 基类的 Bean(例如,几个 TaskExecutor 和 TaskScheduler 的实现)。
在使用控制总线(Control Bus)的命令脚本时,请谨慎使用具有自定义作用域(如 'request')的托管 Bean,尤其是在异步消息流内部。
如果控制总线的 MessageProcessor 无法从应用上下文中暴露某个 Bean,则在命令脚本执行期间可能会出现某些 BeansException。
例如,如果自定义作用域的上下文尚未建立,则尝试在该作用域内获取 Bean 将触发 BeanCreationException。 |
如果您需要进一步自定义 Groovy 对象,还可以通过 customizer 属性提供一个实现了 GroovyObjectCustomizer 的 Bean 引用,如下例所示:
<int-groovy:control-bus input-channel="input"
output-channel="output"
customizer="groovyCustomizer"/>
<beans:bean id="groovyCustomizer" class="org.foo.MyGroovyObjectCustomizer"/>
为端点添加行为
在 Spring Integration 2.2 之前,您可以通过向轮询器的 <advice-chain/> 元素添加 AOP Advice,为整个集成流添加行为。
然而,假设您只想重试某个 REST Web Service 调用,而不重试任何下游端点。
例如,考虑以下流程:
inbound-adapter->poller->http-gateway1->http-gateway2->jdbc-outbound-adapter
如果您在轮询器的建议链中配置了一些重试逻辑,并且对 http-gateway2 的调用因网络故障而失败,则重试会导致 http-gateway1 和 http-gateway2 被再次调用。
类似地,在 jdbc-outbound-adapter 发生暂时性故障后,两个 HTTP 网关都会被再次调用,然后才会再次调用 jdbc-outbound-adapter。
Spring Integration 2.2 新增了对各个端点添加行为的能力。
这是通过向许多端点添加 <request-handler-advice-chain/> 元素来实现的。
以下示例展示了如何在 outbound-gateway 中使用 <request-handler-advice-chain/> 元素:
<int-http:outbound-gateway id="withAdvice"
url-expression="'http://localhost/test1'"
request-channel="requests"
reply-channel="nextChannel">
<int-http:request-handler-advice-chain>
<ref bean="myRetryAdvice" />
</int-http:request-handler-advice-chain>
</int-http:outbound-gateway>
在这种情况下,myRetryAdvice 仅本地应用于此网关,且在回复发送到 nextChannel 之后,不适用于下游采取的进一步操作。
建议的作用范围仅限于端点本身。
|
目前,您无法为整个 然而,可以在 |
提供的通知类
除了提供应用 AOP 通知类的通用机制外,Spring Integration 还提供了这些开箱即用的通知实现:
-
RequestHandlerRetryAdvice(在 重试建议 中描述) -
RequestHandlerCircuitBreakerAdvice(在 熔断器建议 中描述) -
ExpressionEvaluatingRequestHandlerAdvice(在 表达式求值通知 中描述) -
RateLimiterRequestHandlerAdvice(在 限流器建议 中描述) -
CacheRequestHandlerAdvice(在 缓存建议 中描述) -
ReactiveRequestHandlerAdvice(在 响应式建议 中描述) -
ContextHolderRequestHandlerAdvice(在 上下文持有者建议 中描述)
重试建议
重试建议 (o.s.i.handler.advice.RequestHandlerRetryAdvice) 利用了 Spring Retry 项目提供的丰富重试机制。
spring-retry 的核心组件是 RetryTemplate,它允许配置复杂的重试场景,包括 RetryPolicy 和 BackoffPolicy 策略(具有多种实现),以及一个 RecoveryCallback 策略,用于确定在重试耗尽时应采取的操作。
- 无状态重试
-
无状态重试是指重试活动完全在通知(advice)内部处理的情况。 线程会暂停(如果配置为如此),然后重试该操作。
- 有状态重试
-
状态重试是指重试状态在通知中管理,但抛出异常且调用方重新提交请求的情况。 状态重试的一个例子是,我们希望消息发起者(例如 JMS)负责重新提交,而不是在当前线程上执行。 状态重试需要某种机制来检测已重试的提交。
有关spring-retry的更多信息,请参阅该项目的 Javadoc以及Spring Batch的参考文档,其中spring-retry即源自于此。
| 默认的回退行为是不进行回退。 重试会立即尝试。 使用导致线程在尝试之间暂停的回退策略可能会引发性能问题,包括内存过度使用和线程饥饿。 在高流量环境中,应谨慎使用回退策略。 |
配置重试建议
本节中的示例使用了以下 <service-activator>,该对象始终抛出异常:
public class FailingService {
public void service(String message) {
throw new RuntimeException("error");
}
}
- 无状态重试
-
默认的
RetryTemplate具有一个SimpleRetryPolicy,它会尝试三次。 没有BackOffPolicy,因此这三次尝试会连续进行,且每次尝试之间没有延迟。 没有RecoveryCallback,因此在最后一次重试失败后,会将异常抛给调用方。 在 Spring Integration 环境中,可以使用入站端点上的error-channel来处理此最终异常。 以下示例使用了RetryTemplate并展示了其DEBUG输出:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"/> </int:request-handler-advice-chain> </int:service-activator> DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...] DEBUG [task-scheduler-2]Retry: count=0 DEBUG [task-scheduler-2]Checking for rethrow: count=1 DEBUG [task-scheduler-2]Retry: count=1 DEBUG [task-scheduler-2]Checking for rethrow: count=2 DEBUG [task-scheduler-2]Retry: count=2 DEBUG [task-scheduler-2]Checking for rethrow: count=3 DEBUG [task-scheduler-2]Retry failed last attempt: count=3 - 带有恢复功能的简单无状态重试
-
以下示例在前一个示例的基础上添加了
RecoveryCallback,并使用ErrorMessageSendingRecoverer将ErrorMessage发送到通道:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> </bean> </int:request-handler-advice-chain> </int:service-activator> DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...] DEBUG [task-scheduler-2]Retry: count=0 DEBUG [task-scheduler-2]Checking for rethrow: count=1 DEBUG [task-scheduler-2]Retry: count=1 DEBUG [task-scheduler-2]Checking for rethrow: count=2 DEBUG [task-scheduler-2]Retry: count=2 DEBUG [task-scheduler-2]Checking for rethrow: count=3 DEBUG [task-scheduler-2]Retry failed last attempt: count=3 DEBUG [task-scheduler-2]Sending ErrorMessage :failedMessage:[Payload=...] - 带有自定义策略的无状态重试与恢复
-
为了更高级的用法,我们可以提供带有自定义
RetryTemplate的建议。 本示例继续使用SimpleRetryPolicy,但将重试次数增加到四次。 它还添加了一个ExponentialBackoffPolicy,其中第一次重试等待一秒,第二次等待五秒,第三次等待 25 秒(总共四次重试)。 以下代码示例展示了该示例及其DEBUG输出:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> <property name="retryTemplate" ref="retryTemplate" /> </bean> </int:request-handler-advice-chain> </int:service-activator> <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate"> <property name="retryPolicy"> <bean class="org.springframework.retry.policy.SimpleRetryPolicy"> <property name="maxAttempts" value="4" /> </bean> </property> <property name="backOffPolicy"> <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy"> <property name="initialInterval" value="1000" /> <property name="multiplier" value="5.0" /> <property name="maxInterval" value="60000" /> </bean> </property> </bean> 27.058 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...] 27.071 DEBUG [task-scheduler-1]Retry: count=0 27.080 DEBUG [task-scheduler-1]Sleeping for 1000 28.081 DEBUG [task-scheduler-1]Checking for rethrow: count=1 28.081 DEBUG [task-scheduler-1]Retry: count=1 28.081 DEBUG [task-scheduler-1]Sleeping for 5000 33.082 DEBUG [task-scheduler-1]Checking for rethrow: count=2 33.082 DEBUG [task-scheduler-1]Retry: count=2 33.083 DEBUG [task-scheduler-1]Sleeping for 25000 58.083 DEBUG [task-scheduler-1]Checking for rethrow: count=3 58.083 DEBUG [task-scheduler-1]Retry: count=3 58.084 DEBUG [task-scheduler-1]Checking for rethrow: count=4 58.084 DEBUG [task-scheduler-1]Retry failed last attempt: count=4 58.086 DEBUG [task-scheduler-1]Sending ErrorMessage :failedMessage:[Payload=...] - 无状态重试的命名空间支持
-
从 4.0 版本开始,得益于对重试通知的命名空间支持,上述配置可以大大简化,如下例所示:
<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <ref bean="retrier" /> </int:request-handler-advice-chain> </int:service-activator> <int:handler-retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel"> <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" /> </int:handler-retry-advice>在前面的示例中,通知被定义为顶层 Bean,因此它可以在多个
request-handler-advice-chain实例中使用。 您也可以在链中直接定义通知,如下例所示:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <int:retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel"> <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" /> </int:retry-advice> </int:request-handler-advice-chain> </int:service-activator>一个
<handler-retry-advice>可以拥有一个<fixed-back-off>或<exponential-back-off>子元素,也可以没有子元素。 一个没有子元素的<handler-retry-advice>不使用退避策略。 如果没有recovery-channel,则在重试耗尽时抛出异常。 该命名空间仅可用于无状态重试。对于更复杂的环境(自定义策略等),请使用标准的
<bean>定义。 - 带有恢复功能的简单有状态重试
-
为了使重试状态化,我们需要为建议提供一个
RetryStateGenerator实现。 此类用于将消息标识为重试提交,以便RetryTemplate能够确定该消息的重试当前状态。 框架提供了一个SpelExpressionRetryStateGenerator,它通过使用 SpEL 表达式来确定消息标识符。 本示例再次使用默认策略(三次尝试且无退避)。 与无状态重试一样,这些策略也可以进行自定义。 以下代码块展示了示例及其DEBUG输出:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="retryStateGenerator"> <bean class="o.s.i.handler.advice.SpelExpressionRetryStateGenerator"> <constructor-arg value="headers['jms_messageId']" /> </bean> </property> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> </bean> </int:request-handler-advice-chain> </int:service-activator> 24.351 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 24.368 DEBUG [Container#0-1]Retry: count=0 24.387 DEBUG [Container#0-1]Checking for rethrow: count=1 24.387 DEBUG [Container#0-1]Rethrow in retry for policy: count=1 24.387 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 24.391 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 25.412 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 25.412 DEBUG [Container#0-1]Retry: count=1 25.413 DEBUG [Container#0-1]Checking for rethrow: count=2 25.413 DEBUG [Container#0-1]Rethrow in retry for policy: count=2 25.413 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 25.414 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 26.418 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 26.418 DEBUG [Container#0-1]Retry: count=2 26.419 DEBUG [Container#0-1]Checking for rethrow: count=3 26.419 DEBUG [Container#0-1]Rethrow in retry for policy: count=3 26.419 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 26.420 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 27.425 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 27.426 DEBUG [Container#0-1]Retry failed last attempt: count=3 27.426 DEBUG [Container#0-1]Sending ErrorMessage :failedMessage:[Payload=...]如果您将前面的示例与无状态示例进行比较,您会发现,在使用有状态重试时,每次失败都会向调用者抛出异常。
- 重试的异常分类
-
Spring Retry 在确定哪些异常可以触发重试方面具有极大的灵活性。 默认配置会对所有异常进行重试,且异常分类器会检查顶层异常。 如果您将其配置为仅在
MyException时重试,而您的应用程序抛出了SomeOtherException(其原因是MyException),则不会发生重试。自 Spring Retry 1.0.3 起,
BinaryExceptionClassifier拥有一个名为traverseCauses的属性(默认值为false)。 当启用true时,它将遍历异常原因,直到找到匹配项或遍历完所有异常原因。要使用此分类器进行重试,请使用通过接受最大尝试次数、
Map个Exception对象以及traverseCauses布尔值的构造函数创建的SimpleRetryPolicy。 然后将此策略注入到RetryTemplate中。
traverseCauses 在这种情况下是必需的,因为用户异常可能会被包装在 MessagingException 中。 |
断路器建议
断路器模式的基本思想是,如果某个服务当前不可用,就不要浪费时间和资源去尝试使用它。
o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice 实现了该模式。
当断路器处于关闭状态时,端点会尝试调用服务。
如果连续多次调用失败,断路器将进入打开状态。
当处于打开状态时,新请求会“快速失败”,在一段时间过期之前不会尝试调用服务。
当该时间过期后,断路器将被设置为半开状态。 处于此状态时,如果即使单次尝试失败,断路器会立即进入打开状态。 如果尝试成功,断路器将回到关闭状态;在这种情况下,除非再次发生配置数量的连续失败,否则不会再次进入打开状态。 任何成功的尝试都会将失败计数重置为零,以便确定断路器何时可能再次进入打开状态。
通常,此类建议可用于外部服务,其中可能需要一段时间才会失败(例如尝试建立网络连接时出现超时)。
RequestHandlerCircuitBreakerAdvice 有两个属性:threshold 和 halfOpenAfter。
threshold 属性表示在断路器打开之前需要发生的连续失败次数。
其默认值为 5。
halfOpenAfter 属性表示断路器在最后一次失败后等待尝试下一次请求的时间。
默认为 1000 毫秒。
以下示例配置了一个熔断器,并展示了其 DEBUG 和 ERROR 输出:
<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2" />
<property name="halfOpenAfter" value="12000" />
</bean>
</int:request-handler-advice-chain>
</int:service-activator>
05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
在前面的示例中,阈值设置为 2,halfOpenAfter 设置为 12 秒。
每 5 秒到达一个新请求。
前两次尝试调用了服务。
第三和第四次尝试失败,并抛出异常,表明断路器已打开。
第五次请求被尝试,因为该请求距离上次失败已过去 15 秒。
第六次尝试立即失败,因为断路器立即进入打开状态。
表达式求值通知
最终提供的通知类是 o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice。
此通知比其他两个通知更为通用。
它提供了一种机制,用于评估发送到端点的原始传入消息中的表达式。
分别提供了在成功或失败后评估的表达式。
可选地,包含评估结果和输入消息的消息可以被发送到消息通道。
此通知的一个典型用例可能与<ftp:outbound-channel-adapter/>相关,例如:如果传输成功则将文件移动到某个目录,或者如果失败则移动到另一个目录:
该建议具有属性,用于在成功时设置表达式、在失败时设置表达式,并为每种情况提供相应的通道。
对于成功的情况,发送到 successChannel 的消息是一个 AdviceMessage,其负载为表达式评估的结果。
另一个名为 inputMessage 的属性包含发送给处理器的原始消息。
当处理器抛出异常时,发送到 failureChannel 的消息是一个 ErrorMessage,其负载为 MessageHandlingExpressionEvaluatingAdviceException。
与所有 MessagingException 实例一样,此负载具有 failedMessage 和 cause 属性,此外还有一个名为 evaluationResult 的额外属性,其中包含表达式评估的结果。
从版本 5.1.3 开始,如果配置了通道但未提供表达式,则使用默认表达式对消息的 payload 进行求值。 |
当在通知的作用域内抛出异常时,默认情况下,在任何 failureExpression 被求值后,该异常会被抛给调用者。
如果您希望抑制异常的抛出,请将 trapException 属性设置为 true。
以下示例展示了如何使用 Java DSL 配置 advice:
@SpringBootApplication
public class EerhaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
MessageChannel in = context.getBean("advised.input", MessageChannel.class);
in.send(new GenericMessage<>("good"));
in.send(new GenericMessage<>("bad"));
context.close();
}
@Bean
public IntegrationFlow advised() {
return f -> f.<String>handle((payload, headers) -> {
if (payload.equals("good")) {
return null;
}
else {
throw new RuntimeException("some failure");
}
}, c -> c.advice(expressionAdvice()));
}
@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannelName("success.input");
advice.setOnSuccessExpressionString("payload + ' was successful'");
advice.setFailureChannelName("failure.input");
advice.setOnFailureExpressionString(
"payload + ' was bad, with reason: ' + #exception.cause.message");
advice.setTrapException(true);
return advice;
}
@Bean
public IntegrationFlow success() {
return f -> f.handle(System.out::println);
}
@Bean
public IntegrationFlow failure() {
return f -> f.handle(System.out::println);
}
}
速率限制器建议
速率限制器建议(RateLimiterRequestHandlerAdvice)可确保端点不会因请求过多而过载。
当突破速率限制时,请求将进入阻塞状态。
此类建议的一个典型用例是外部服务提供商不允许每分钟超过 n 次请求。
RateLimiterRequestHandlerAdvice 的实现完全基于 Resilience4j 项目,需要注入 RateLimiter 或 RateLimiterConfig。
也可以使用默认配置和/或自定义名称进行配置。
以下示例配置了一个限流建议,限制为每秒一个请求:
@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofSeconds(1))
.limitForPeriod(1)
.build());
}
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
...
}
缓存建议
从版本 5 开始。2,CacheRequestHandlerAdvice 已引入。它基于 Spring Framework 中的缓存抽象,并与 @Caching 注解系列提供的概念和功能保持一致。内部逻辑基于 CacheAspectSupport 扩展,其中缓存操作的代理围绕 AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage 方法执行,并以请求 Message<?> 作为参数。此建议可以通过 SpEL 表达式或 Function 进行配置,以评估缓存键。请求 Message<?> 可作为 SpEL 求值上下文的根对象,或作为 Function 输入参数使用。默认情况下,请求消息中的payload用作缓存键。CacheRequestHandlerAdvice 必须与 cacheNames 配合配置,当默认缓存操作为 CacheableOperation 时,或者与任意一组 CacheOperation 配合使用。每个 CacheOperation 都可以单独配置,也可以拥有共享选项,例如 CacheManager、CacheResolver 和 CacheErrorHandler,这些可以从 CacheRequestHandlerAdvice 配置中复用。此配置功能类似于 Spring Framework 的 @CacheConfig 和 @Caching 注解组合。如果未提供 CacheManager,则默认从 CacheAspectSupport 中的 BeanFactory 解析单个 Bean。
以下示例配置了两个具有不同缓存操作集的增强(advices):
@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
return cacheRequestHandlerAdvice;
}
@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
...
}
@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
cachePutBuilder.setCacheName(TEST_PUT_CACHE);
CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
cacheEvictBuilder.setCacheName(TEST_CACHE);
cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
return cacheRequestHandlerAdvice;
}
@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
...
}
响应式通知
从版本 5.3 开始,对于产生 Mono 响应的请求消息处理器,可以使用 ReactiveRequestHandlerAdvice。
为此需要提供一个 BiFunction<Message<?>, Mono<?>, Publisher<?>>,它会在被拦截的 handleRequestMessage() 方法实现产生的响应上通过 Mono.transform() 操作符被调用。
通常,当我们希望通过 timeout()、retry() 等类似的支持操作符来控制网络波动时,就需要这种 Mono 自定义功能。
例如,当通过 WebFlux 客户端发送 HTTP 请求时,我们可以使用以下配置,使等待响应的时间不超过 5 秒:
.handle(WebFlux.outboundGateway("https://somehost/"),
e -> e.customizeMonoReply((message, mono) -> mono.timeout(Duration.ofSeconds(5))));
message 参数是消息处理器的请求消息,可用于确定请求范围的属性。
mono 参数是该消息处理器的 handleRequestMessage() 方法实现的执行结果。
也可以从该函数中调用嵌套的 Mono.transform(),例如应用 响应式熔断器。
上下文持有者建议
从版本 6.1 开始,引入了ContextHolderRequestHandlerAdvice。
此通知会从请求消息中获取一些值,并将其存储在上下文持有者中。
当目标MessageHandler上的执行完成后,上下文中的该值会被清除。
理解此通知的最佳方式类似于编程流程:我们将某些值存储到ThreadLocal中,通过目标调用访问它,然后在执行后清理ThreadLocal。
ContextHolderRequestHandlerAdvice需要以下构造函数参数:Function<Message<?>, Object>作为值提供者,Consumer<Object>作为上下文设置回调,以及Runnable作为上下文清理钩子。
以下是 ContextHolderRequestHandlerAdvice 如何与 o.s.i.file.remote.session.DelegatingSessionFactory 结合使用的示例:
@Bean
DelegatingSessionFactory<?> dsf(SessionFactory<?> one, SessionFactory<?> two) {
return new DelegatingSessionFactory<>(Map.of("one", one, "two", two), null);
}
@Bean
ContextHolderRequestHandlerAdvice contextHolderRequestHandlerAdvice(DelegatingSessionFactory<String> dsf) {
return new ContextHolderRequestHandlerAdvice(message -> message.getHeaders().get("FACTORY_KEY"),
dsf::setThreadKey, dsf::clearThreadKey);
}
@ServiceActivator(inputChannel = "in", adviceChain = "contextHolderRequestHandlerAdvice")
FtpOutboundGateway ftpOutboundGateway(DelegatingSessionFactory<?> sessionFactory) {
return new FtpOutboundGateway(sessionFactory, "ls", "payload");
}
只需向 in 通道发送一条消息,并将 FACTORY_KEY 头设置为 one 或 two 即可。
ContextHolderRequestHandlerAdvice 通过其 setThreadKey 将该头的值设置到 DelegatingSessionFactory 中。
随后,当 FtpOutboundGateway 执行 ls 命令时,会根据其 ThreadLocal 中的值从 DelegatingSessionFactory 中选择一个合适的委托 SessionFactory。
当 FtpOutboundGateway 产生结果后,将根据来自 ContextHolderRequestHandlerAdvice 的 clearThreadKey() 调用清除 DelegatingSessionFactory 中的 ThreadLocal 值。
有关更多信息,请参阅 委托会话工厂。
自定义通知类
除了前面描述的建议类之外,您还可以实现自己的建议类。
虽然您可以提供任何 org.aopalliance.aop.Advice(通常是 org.aopalliance.intercept.MethodInterceptor)的实现,但我们通常建议您子类化 o.s.i.handler.advice.AbstractRequestHandlerAdvice。
这样做的好处是避免了编写低级面向切面编程代码,同时提供了一个专门为此环境量身定制的起点。
子类需要实现 doInvoke() 方法,其定义如下:
/**
* Subclasses implement this method to apply behavior to the {@link MessageHandler} callback.execute()
* invokes the handler method and returns its result, or null).
* @param callback Subclasses invoke the execute() method on this interface to invoke the handler method.
* @param target The target handler.
* @param message The message that will be sent to the handler.
* @return the result after invoking the {@link MessageHandler}.
* @throws Exception
*/
protected abstract Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception;
回调参数是一种便利方式,可避免直接处理 AOP 的子类。
调用 callback.execute() 方法将触发消息处理器。
为那些需要为特定处理器维护状态的子类提供了 target 参数,或许是通过以目标为键的 Map 来维护该状态。
此功能允许将相同的通知应用于多个处理器。
RequestHandlerCircuitBreakerAdvice 使用此通知来为每个处理器保持熔断器状态。
message 参数是发送给处理器的消息。
虽然通知无法在调用处理器之前修改消息,但它可以修改有效载荷(如果它具有可变属性)。
通常,通知会使用此消息进行日志记录,或在调用处理器之前或之后将消息的副本发送到某处。
返回值通常由 callback.execute() 返回的值决定。
然而,通知(advice)确实具备修改返回值的能力。
请注意,只有 AbstractReplyProducingMessageHandler 实例会返回值。
以下示例展示了一个扩展自 AbstractRequestHandlerAdvice 的自定义通知类:
public class MyAdvice extends AbstractRequestHandlerAdvice {
@Override
protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception {
// add code before the invocation
Object result = callback.execute();
// add code after the invocation
return result;
}
}
|
除了 有关更多信息,请参阅 ReflectiveMethodInvocation Javadoc。 |
处理消息建议
正如在本节的介绍中所讨论的那样,请求处理器建议链中的建议对象仅应用于当前端点,而不应用于下游流程(如果有的话)。
对于产生回复的MessageHandler对象(例如那些扩展自AbstractReplyProducingMessageHandler的对象),建议将应用于一个内部方法:handleRequestMessage()(由MessageHandler.handleMessage()调用)。
对于其他消息处理器,建议将应用于MessageHandler.handleMessage()。
在某些情况下,即使消息处理器是一个 AbstractReplyProducingMessageHandler,也必须对 handleMessage 方法应用建议。
例如,幂等接收器 可能会返回 null,如果处理器的 replyRequired 属性设置为 true,这将导致异常。
另一个例子是 BoundRabbitChannelAdvice — 请参见 严格消息排序。
从版本 4.3.1 开始,引入了一个新的 HandleMessageAdvice 接口及其基础实现(AbstractHandleMessageAdvice)。
实现了 HandleMessageAdvice 的 Advice 对象始终应用于 handleMessage() 方法,无论处理器类型如何。
重要的是要理解,HandleMessageAdvice实现(例如幂等接收器),当应用于返回响应的处理器时,与adviceChain解耦,并正确应用于MessageHandler.handleMessage()方法。
| 由于这种分离,建议链的顺序未被遵守。 |
考虑以下配置:
<some-reply-producing-endpoint ... >
<int:request-handler-advice-chain>
<tx:advice ... />
<ref bean="myHandleMessageAdvice" />
</int:request-handler-advice-chain>
</some-reply-producing-endpoint>
在前面的示例中,<tx:advice>被应用于AbstractReplyProducingMessageHandler.handleRequestMessage()。
然而,myHandleMessageAdvice被用于MessageHandler.handleMessage()。
因此,它会在<tx:advice>之前执行。
为了保持顺序,您应遵循标准的Spring AOP配置方法,并结合使用端点id与.handler后缀,以获取目标MessageHandler Bean。
请注意,在这种情况下,整个下游流程都在事务范围内。
如果 MessageHandler 未返回响应,则保留建议链的顺序。
从版本 5.3 开始,HandleMessageAdviceAdapter 被提供以将任何 MethodInterceptor 应用于 MessageHandler.handleMessage() 方法,因此也适用于整个子流程。
例如,可以将 RetryOperationsInterceptor 应用于从某个端点开始的整个子流程;默认情况下这是不可能的,因为消费者端点仅将通知应用于 AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage()。
事务支持
从版本 5.0 开始,得益于 HandleMessageAdvice 的实现,引入了一个新的 TransactionHandleMessageAdvice,使整个下游流程具备事务性。
当在 <request-handler-advice-chain> 元素中使用常规的 TransactionInterceptor 时(例如通过配置 <tx:advice>),启动的事务仅应用于内部的 AbstractReplyProducingMessageHandler.handleRequestMessage(),而不会传播到下游流程。
为了简化 XML 配置,除了 <request-handler-advice-chain> 之外,所有 <outbound-gateway> 和 <service-activator> 及相关组件中还添加了 <transactional> 元素。
以下示例展示了 <transactional> 的使用:
<int-jdbc:outbound-gateway query="select * from things where id=:headers[id]">
<int-jdbc:transactional/>
</int-jdbc:outbound-gateway>
<bean id="transactionManager" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="org.springframework.transaction.TransactionManager"/>
</bean>
Java 配置可以通过使用 TransactionInterceptorBuilder 来简化,并且结果 Bean 的名称可以在 消息传递注解 的 adviceChain 属性中使用,如下例所示:
@Bean
public ConcurrentMetadataStore store() {
return new SimpleMetadataStore(hazelcastInstance()
.getMap("idempotentReceiverMetadataStore"));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(
new MetadataStoreSelector(
message -> message.getPayload().toString(),
message -> message.getPayload().toString().toUpperCase(), store()));
}
@Bean
public TransactionInterceptor transactionInterceptor() {
return new TransactionInterceptorBuilder(true)
.transactionManager(this.transactionManager)
.isolation(Isolation.READ_COMMITTED)
.propagation(Propagation.REQUIRES_NEW)
.build();
}
@Bean
@org.springframework.integration.annotation.Transformer(inputChannel = "input",
outputChannel = "output",
adviceChain = { "idempotentReceiverInterceptor",
"transactionInterceptor" })
public Transformer transformer() {
return message -> message;
}
请注意 TransactionInterceptorBuilder 构造函数上的 true 参数。
它会导致创建 TransactionHandleMessageAdvice,而不是普通的 TransactionInterceptor。
Java DSL 通过端点配置中的 Advice 和 .transactional() 选项支持,如下示例所示:
@Bean
public IntegrationFlow updatingGatewayFlow() {
return f -> f
.handle(Jpa.updatingGateway(this.entityManagerFactory),
e -> e.transactional(true))
.channel(c -> c.queue("persistResults"));
}
顾问过滤器
在建议 Filter 时,还有一个额外的考虑因素。
默认情况下,任何丢弃操作(当过滤器返回 false 时)都在建议链的作用域内执行。
这可能包括丢弃通道下游的所有流程。
因此,例如,如果丢弃通道下游的某个元素抛出异常并且存在重试建议,则过程将被重试。
此外,如果 throwExceptionOnRejection 设置为 true(异常是在建议的作用域内抛出的)。
将 discard-within-advice 设置为 false 会修改此行为,丢弃(或异常)将在建议链调用之后发生。
使用注解建议端点
当使用注解(@Filter、@ServiceActivator、@Splitter和@Transformer)配置某些端点时,您可以在adviceChain属性中提供用于通知链的 Bean 名称。
此外,@Filter注解也具有discardWithinAdvice属性,可用于配置丢弃行为,如Advising Filters中所讨论的那样。
以下示例导致在通知执行后执行丢弃操作:
@MessageEndpoint
public class MyAdvisedFilter {
@Filter(inputChannel="input", outputChannel="output",
adviceChain="adviceChain", discardWithinAdvice="false")
public boolean filter(String s) {
return s.contains("good");
}
}
在建议链中排序建议
通知类是“环绕”通知,并以嵌套方式应用。 第一个通知是最外层的,而最后一个通知是最内层的(即最接近被通知的处理程序)。 将通知类按正确的顺序放置对于实现您期望的功能至关重要。
例如,假设您想添加重试通知和事务通知。
您可能希望先放置重试通知,然后放置事务通知。
因此,每次重试都在新的事务中执行。
另一方面,如果您希望所有尝试以及任何恢复操作(在重试 RecoveryCallback 中)都限制在事务范围内,您可以将事务通知放在前面。
建议处理器属性
有时,从通知内部访问处理器属性非常有用。
例如,大多数处理器实现了 NamedComponent,以便您能够访问组件名称。
目标对象可以通过 target 参数(当子类化 AbstractRequestHandlerAdvice 时)或 invocation.getThis()(当实现 org.aopalliance.intercept.MethodInterceptor 时)进行访问。
当整个处理器被通知时(例如,当处理器不生成回复或通知实现了 HandleMessageAdvice),您可以将目标对象转换为接口,例如 NamedComponent,如下示例所示:
String componentName = ((NamedComponent) target).getComponentName();
当您直接实现 MethodInterceptor 时,可以按如下方式将目标对象进行类型转换:
String componentName = ((NamedComponent) invocation.getThis()).getComponentName();
当仅对 handleRequestMessage() 方法提供建议(在产生回复的处理程序中)时,您需要访问完整的处理程序,它是一个 AbstractReplyProducingMessageHandler。
以下示例展示了如何实现这一点:
AbstractReplyProducingMessageHandler handler =
((AbstractReplyProducingMessageHandler.RequestHandler) target).getAdvisedHandler();
String componentName = handler.getComponentName();
幂等接收者企业集成模式
从 4.1 版本开始,Spring Integration 提供了企业集成模式 幂等接收器 的实现。
这是一个功能型模式,整个幂等性逻辑应由应用程序实现。
然而,为了简化决策过程,提供了 IdempotentReceiverInterceptor 组件。
这是一个 AOP Advice,应用于 MessageHandler.handleMessage() 方法,并可根据其配置 filter 请求消息或将其标记为 duplicate。
以前,您可以通过在 <filter/> 中使用自定义 MessageSelector(例如参见 过滤器)来实现此模式。
然而,由于该模式实际上定义的是端点的行为而非端点本身,因此幂等接收器实现不提供端点组件。
相反,它应用于应用程序中声明的端点。
IdempotentReceiverInterceptor的逻辑基于提供的MessageSelector,如果消息未被该选择器接受,则使用设置为true的duplicateMessage标头对其进行丰富。
目标MessageHandler(或下游流)可以查阅此标头以实现正确的幂等性逻辑。
如果IdempotentReceiverInterceptor配置了discardChannel或throwExceptionOnRejection = true,重复消息将不会发送到目标MessageHandler.handleMessage()。
相反,它将被丢弃。
如果您想丢弃(不处理)重复消息,则应将discardChannel配置为NullChannel,例如默认的nullChannel bean。
为了在消息之间维护状态并提供用于幂等性比较消息的能力,我们提供了 MetadataStoreSelector。
它接受一个基于 Message 创建查找键的 MessageProcessor 实现,以及一个可选的 ConcurrentMetadataStore(元数据存储)。
有关更多信息,请参阅 MetadataStoreSelector Javadoc。
您还可以通过使用额外的 MessageProcessor 来为 ConcurrentMetadataStore 自定义 value。
默认情况下,MetadataStoreSelector 使用 timestamp 消息头。
通常情况下,选择器会在键没有现有值时选择一个消息进行接受。
在某些情况下,比较键的当前值和新生成的值对于确定是否应接受该消息非常有用。
从 5.3 版本开始,提供了 compareValues 属性,它引用了 BiPredicate<String, String>;第一个参数是旧值;返回 true 以接受该消息,并在 MetadataStore 中将旧值替换为新值。
这可用于减少键的数量;例如,在处理文件中的行时,您可以将文件名存储在键中,将当前行号存储在值中。
然后,在重启后,您可以跳过已经处理过的行。
有关示例,请参阅 幂等下游处理拆分文件。
为了方便,MetadataStoreSelector 选项可直接在 <idempotent-receiver> 组件上进行配置。
以下列表展示了所有可能的属性:
<idempotent-receiver
id="" (1)
endpoint="" (2)
selector="" (3)
discard-channel="" (4)
metadata-store="" (5)
key-strategy="" (6)
key-expression="" (7)
value-strategy="" (8)
value-expression="" (9)
compare-values="" (10)
throw-exception-on-rejection="" /> (11)
| 1 | IdempotentReceiverInterceptor Bean 的 ID。
可选。 |
| 2 | 此拦截器应用的消费者端点名称或模式。
使用逗号分隔名称(模式)(,),例如 endpoint="aaa, bbb*, ccc, *ddd, eee*fff"。
匹配这些模式的端点 Bean 名称将用于检索目标端点的 MessageHandler Bean(使用其 .handler 后缀),并将 IdempotentReceiverInterceptor 应用于这些 Bean。
必填。 |
| 3 | 一个 MessageSelector Bean 引用。
与 metadata-store 和 key-strategy (key-expression) 互斥。
当未提供 selector 时,必须提供 key-strategy 或 key-strategy-expression 中的一个。 |
| 4 | 标识当 IdempotentReceiverInterceptor 不接受消息时,应将消息发送至哪个通道。
如果省略此属性,重复的消息将被转发至带有 duplicateMessage 头的处理器。
可选。 |
| 5 | 一个 ConcurrentMetadataStore 引用。
由底层的 MetadataStoreSelector 使用。
与 selector 互斥。
可选。
默认的 MetadataStoreSelector 使用一个内部 SimpleMetadataStore,该组件不会在应用程序执行之间维护状态。 |
| 6 | 一个 MessageProcessor 引用。
由底层的 MetadataStoreSelector 使用。
从请求消息中评估一个 idempotentKey。
与 selector 和 key-expression 互斥。
当未提供 selector 时,必须指定 key-strategy 或 key-strategy-expression 中的一个。 |
| 7 | 用于填充 ExpressionEvaluatingMessageProcessor 的 SpEL 表达式。
由底层的 MetadataStoreSelector 使用。
使用请求消息作为求值上下文根对象来求值 idempotentKey。
与 selector 和 key-strategy 互斥。
当未提供 selector 时,必须指定 key-strategy 或 key-strategy-expression 中的一个。 |
| 8 | 一个 MessageProcessor 引用。
由底层的 MetadataStoreSelector 使用。
评估来自请求消息的 idempotentKey 的 value。
与 selector 和 value-expression 互斥。
默认情况下,'MetadataStoreSelector' 使用 'timestamp' 消息头作为 Metadata 的'value'。 |
| 9 | 一个用于填充ExpressionEvaluatingMessageProcessor的 SpEL 表达式。
由底层的MetadataStoreSelector使用。
通过使用请求消息作为评估上下文根对象,为idempotentKey评估value。
与selector和value-strategy互斥。
默认情况下,'MetadataStoreSelector'使用'timestamp'消息头作为元数据的'value'。 |
| 10 | 对 BiPredicate<String, String> bean 的引用,允许您通过比较键的旧值和新值来可选地选择消息;默认值为 null。 |
| 11 | 如果 IdempotentReceiverInterceptor 拒绝消息,是否抛出异常。
默认为 false。
无论是否提供 discard-channel,此设置均适用。 |
对于 Java 配置,Spring Integration 提供了方法级别的 @IdempotentReceiver 注解。
它用于标记具有消息注解的 method(@ServiceActivator、@Router, and others) to specify which `IdempotentReceiverInterceptor 对象应用于此端点)。
以下示例展示了如何使用 @IdempotentReceiver 注解:
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
m.getHeaders().get(INVOICE_NBR_HEADER)));
}
@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
....
}
当您使用 Java DSL 时,可以将拦截器添加到端点的建议链中,如下示例所示:
@Bean
public IntegrationFlow flow() {
...
.handle("someBean", "someMethod",
e -> e.advice(idempotentReceiverInterceptor()))
...
}
IdempotentReceiverInterceptor 仅设计用于 MessageHandler.handleMessage(Message<?>) 方法。
从 4.3.1 版本开始,它实现了 HandleMessageAdvice,并以 AbstractHandleMessageAdvice 作为基类,以实现更好的解耦。
有关更多信息,请参阅 处理消息建议。 |
日志通道适配器
<logging-channel-adapter> 通常与线束监听器(wire tap)结合使用,如 线束监听器 中所讨论的。
然而,它也可以用作任何流程的最终消费者。
例如,考虑一个以返回结果的 <service-activator> 结尾的流程,但您希望丢弃该结果。
为此,您可以将结果发送到 NullChannel。
或者,您可以将其路由到 INFO 级 <logging-channel-adapter>。
这样,当在 INFO 级别进行日志记录时,您可以看到被丢弃的消息,但在(例如)WARN 级别进行日志记录时则看不到。
使用 NullChannel 时,只有在 DEBUG 级别进行日志记录时才能看到被丢弃的消息。
以下代码列出了 logging-channel-adapter 元素的所有可能属性:
<int:logging-channel-adapter
channel="" (1)
level="INFO" (2)
expression="" (3)
log-full-message="false" (4)
logger-name="" /> (5)
| 1 | 连接日志适配器与上游组件的通道。 |
| 2 | 发送到此适配器的消息的日志级别。
默认值:INFO。 |
| 3 | 一个 SpEL 表达式,精确表示消息中哪些部分被记录。
默认值:payload — 仅记录负载内容。
如果指定了log-full-message,则不能指定此属性。 |
| 4 | 当 true 时,将记录整条消息(包括头部信息)。
默认值:false — 仅记录有效载荷。
如果指定了 expression,则不能指定此属性。 |
| 5 | 指定日志记录器的name(在log4j中称为category)。
用于标识由该适配器创建的日志消息。
这允许为各个适配器设置日志名称(在日志子系统内)。
默认情况下,所有适配器均在以下名称下记录日志:org.springframework.integration.handler.LoggingHandler。 |
使用 Java 配置
以下 Spring Boot 应用程序展示了如何使用 Java 配置来配置 LoggingHandler 的示例:
@SpringBootApplication
public class LoggingJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(LoggingJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToLogger("foo");
}
@Bean
@ServiceActivator(inputChannel = "logChannel")
public LoggingHandler logging() {
LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.DEBUG);
adapter.setLoggerName("TEST_LOGGER");
adapter.setLogExpressionString("headers.id + ': ' + payload");
return adapter;
}
@MessagingGateway(defaultRequestChannel = "logChannel")
public interface MyGateway {
void sendToLogger(String data);
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用展示了如何使用 Java DSL 配置日志通道适配器的示例:
@SpringBootApplication
public class LoggingJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(LoggingJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToLogger("foo");
}
@Bean
public IntegrationFlow loggingFlow() {
return IntegrationFlow.from(MyGateway.class)
.log(LoggingHandler.Level.DEBUG, "TEST_LOGGER",
m -> m.getHeaders().getId() + ": " + m.getPayload());
}
@MessagingGateway
public interface MyGateway {
void sendToLogger(String data);
}
}
java.util.function接口支持
从 5.1 版本开始,Spring Integration 提供了对 java.util.function 包中接口的直接支持。
所有消息端点(Service Activator、Transformer、Filter 等)现在都可以引用 Function(或 Consumer)bean。
消息注解可以直接应用于这些 bean,类似于常规的 MessageHandler 定义。
例如,如果您有以下 Function bean 定义:
@Configuration
public class FunctionConfiguration {
@Bean
public Function<String, String> functionAsService() {
return String::toUpperCase;
}
}
您可以在 XML 配置文件中将其用作简单参考:
<service-activator input-channel="processorViaFunctionChannel" ref="functionAsService"/>
当我们使用消息注解配置我们的流程时,代码非常直观:
@Bean
@Transformer(inputChannel = "functionServiceChannel")
public Function<String, String> functionAsService() {
return String::toUpperCase;
}
当函数返回数组时,Collection(本质上,任何 Iterable)、Stream 或 Reactor Flux、@Splitter 可用于此类 Bean 以对结果内容执行迭代。
java.util.function.Consumer 接口可用于 <int:outbound-channel-adapter>,或结合 @ServiceActivator 注解执行流程的最后一步:
@Bean
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
public Consumer<Message<?>> messageConsumerAsService() {
// Has to be an anonymous class for proper type inference
return new Consumer<Message<?>>() {
@Override
public void accept(Message<?> e) {
collector().add(e);
}
};
}
此外,请注意上面代码片段中的注释:如果您希望在 Function/Consumer 中处理整个消息,则不能使用 lambda 表达式定义。
由于 Java 类型擦除,我们无法确定 apply()/accept() 方法调用的目标类型。
java.util.function.Supplier 接口可以简单地与 @InboundChannelAdapter 注解一起使用,或者作为 ref 中的 <int:inbound-channel-adapter>:
@Bean
@InboundChannelAdapter(value = "inputChannel", poller = @Poller(fixedDelay = "1000"))
public Supplier<String> pojoSupplier() {
return () -> "foo";
}
使用 Java DSL,我们只需在端点定义中引用函数 Bean。
同时,Supplier 接口的实现可作为常规的 MessageSource 定义使用:
@Bean
public Function<String, String> toUpperCaseFunction() {
return String::toUpperCase;
}
@Bean
public Supplier<String> stringSupplier() {
return () -> "foo";
}
@Bean
public IntegrationFlow supplierFlow() {
return IntegrationFlow.from(stringSupplier())
.transform(toUpperCaseFunction())
.channel("suppliedChannel")
.get();
}
此功能支持在与 Spring Cloud Function 框架配合使用时非常有用,在该框架中我们拥有一个函数目录,并可以从集成流定义中引用其成员函数。
Kotlin 支持
该框架也已得到改进,以支持用于函数的 Kotlin Lambda,因此现在您可以结合使用 Kotlin 语言和 Spring Integration 流定义:
@Bean
@Transformer(inputChannel = "functionServiceChannel")
fun kotlinFunction(): (String) -> String {
return { it.toUpperCase() }
}
@Bean
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
fun kotlinConsumer(): (Message<Any>) -> Unit {
return { print(it) }
}
@Bean
@InboundChannelAdapter(value = "counterChannel",
poller = Poller(fixedRate = "10", maxMessagesPerPoll = "1"))
fun kotlinSupplier(): () -> String {
return { "baz" }
}
Kotlin 协程
从 6.0 版本开始,Spring Integration 支持 Kotlin 协程。
现在可以在服务方法中使用 suspend 函数以及 kotlinx.coroutines.Deferred & kotlinx.coroutines.flow.Flow 返回类型:
@ServiceActivator(inputChannel = "suspendServiceChannel", outputChannel = "resultChannel")
suspend fun suspendServiceFunction(payload: String) = payload.uppercase()
@ServiceActivator(inputChannel = "flowServiceChannel", outputChannel = "resultChannel", async = "true")
fun flowServiceFunction(payload: String) =
flow {
for (i in 1..3) {
emit("$payload #$i")
}
}
该框架将它们视为响应式流(Reactive Streams)交互,并使用 ReactiveAdapterRegistry 转换为相应的 Mono 和 Flux 反应类型。
此类函数回复随后在回复通道中进行处理:如果它是 ReactiveStreamsSubscribableChannel,则作为 CompletableFuture 的结果在各自的回调中处理。
在 @ServiceActivator 上,返回值为 Flow 的函数默认不是 async,因此会生成 Flow 实例作为回复消息的有效载荷。
目标应用程序有责任将该对象作为协程进行处理,或将其分别转换为 Flux。 |
The @MessagingGateway 接口方法在 Kotlin 中声明时也可以使用 suspend 修饰符进行标记。
框架内部利用 Mono 通过下游流执行请求 - 回复操作。
此类 Mono 结果由 MonoKt.awaitSingleOrNull() API 内部处理,以满足网关调用的 suspend 函数的 kotlin.coroutines.Continuation 参数要求:
@MessagingGateway(defaultRequestChannel = "suspendRequestChannel")
interface SuspendFunGateway {
suspend fun suspendGateway(payload: String): String
}
根据 Kotlin 语言要求,此方法必须作为协程调用:
@Autowired
private lateinit var suspendFunGateway: SuspendFunGateway
fun someServiceMethod() {
runBlocking {
val reply = suspendFunGateway.suspendGateway("test suspend gateway")
}
}