消息传递端点
消息传递端点
消息端点
本章的第一部分涵盖了一些背景理论,并揭示了驱动 Spring Integration 的各种消息传递组件的底层 API 的相当多的信息。 如果您想真正了解幕后发生的事情,此信息会很有帮助。 但是,如果您想启动并运行各种元素的简化的基于命名空间的配置,请随时跳到端点命名空间支持。
如概述中所述,消息端点负责将各种消息传递组件连接到通道。 在接下来的几章中,我们将介绍使用消息的许多不同组件。 其中一些还能够发送回复消息。 发送消息非常简单。 如前面的消息通道所示,您可以向消息通道发送消息。 但是,接收要复杂一些。 主要原因是有两种类型的消费者:轮询消费者和事件驱动消费者。
在两者中,事件驱动的消费者要简单得多。
无需管理和调度单独的轮询器线程,它们本质上是具有回调方法的侦听器。
当连接到 Spring Integration 的可订阅消息通道之一时,这个简单的选项效果很好。
但是,当连接到缓冲、可轮询的消息通道时,某些组件必须调度和管理轮询线程。
Spring Integration提供了两种不同的端点实现来适应这两种类型的消费者。
因此,消费者自己只需要实现回调接口。
当需要轮询时,端点充当使用者实例的容器。
其好处类似于使用容器托管消息驱动的 bean,但是,由于这些消费者是在ApplicationContext
,它更类似于 Spring 自己的MessageListener
器皿。
消息处理程序
Spring Integration 的MessageHandler
接口由框架内的许多组件实现。
换句话说,这不是公共 API 的一部分,您通常不会实现MessageHandler
径直。
尽管如此,消息使用者使用它来实际处理消耗的消息,因此了解此策略接口确实有助于理解使用者的整体角色。
接口定义如下:
public interface MessageHandler {
void handleMessage(Message<?> message);
}
尽管它很简单,但该接口为以下章节中介绍的大多数组件(路由器、转换器、分离器、聚合器、服务激活器等)提供了基础。 这些组件各自通过它们处理的消息执行非常不同的功能,但实际接收消息的要求是相同的,轮询和事件驱动行为之间的选择也是相同的。 Spring Integration提供了两个端点实现,它们托管这些基于回调的处理程序,并允许它们连接到消息通道。
事件驱动的消费者
因为它是两者中更简单的一个,所以我们首先介绍事件驱动的消费者端点。
您可能还记得SubscribableChannel
接口提供了一个subscribe()
方法,并且该方法接受MessageHandler
参数(如SubscribableChannel
).
以下列表显示了subscribe
方法:
subscribableChannel.subscribe(messageHandler);
由于订阅通道的处理程序不必主动轮询该通道,因此这是一个事件驱动的消费者,并且 Spring Integration 提供的实现接受SubscribableChannel
和MessageHandler
,如以下示例所示:
SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class);
EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler);
轮询消费者
Spring Integration 还提供了一个PollingConsumer
,并且可以用相同的方式实例化,只是通道必须实现PollableChannel
,如以下示例所示:
PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class);
PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);
轮询使用者还有许多其他配置选项。 例如,触发器是必需属性。 以下示例演示如何设置触发器:
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setTrigger(new PeriodicTrigger(Duration.ofSeconds(30)));
这PeriodicTrigger
通常用简单的间隔 (Duration
),但也支持initialDelay
属性和布尔值fixedRate
属性(默认值为false
——也就是说,没有固定的延迟)。
以下示例设置这两个属性:
PeriodicTrigger trigger = new PeriodicTrigger(Duration.ofSeconds(1));
trigger.setInitialDelay(Duration.ofSeconds(5));
trigger.setFixedRate(true);
前面示例中三个设置的结果是一个触发器,它等待五秒钟,然后每秒触发一次。
这CronTrigger
需要有效的 cron 表达式。
有关详细信息,请参阅 Javadoc。
以下示例将新的CronTrigger
:
CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");
上一个示例中定义的触发器的结果是,在周一到周五每十秒触发一次触发器。
除了触发器之外,还可以指定其他两个与轮询相关的配置属性:maxMessagesPerPoll
和receiveTimeout
. 以下示例演示如何设置这两个属性:
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setMaxMessagesPerPoll(10);
consumer.setReceiveTimeout(5000);
这maxMessagesPerPoll
属性指定在给定轮询作中要接收的最大消息数。这意味着轮询器继续调用receive()
无需等待,直到null
或达到最大值。例如,如果轮询器具有十秒间隔触发器和maxMessagesPerPoll
设置25
,并且它正在轮询队列中有 100 条消息的通道,则可以在 40 秒内检索所有 100 条消息。它抓取 25 条,等待 10 秒,抓取接下来的 25 条,依此类推。 如果maxMessagesPerPoll
配置为负值,则MessageSource.receive()
在单个轮询周期内调用,直到返回null
. 从 5.5 版开始,一个0
value 具有特殊含义 - 跳过MessageSource.receive()
调用,这可能被视为暂停此轮询端点,直到maxMessagesPerPoll
稍后更改为 n 个非零值,例如通过控制总线。
这receiveTimeout
属性指定轮询器在调用接收作时没有可用消息时应等待的时间量。例如,考虑两个选项,它们表面上看起来相似,但实际上却大不相同:第一个选项的间隔触发为 5 秒,接收超时为 50 毫秒,而第二个的间隔触发器为 50 毫秒,接收超时为 5 秒。第一个可能比到达通道晚 4950 毫秒(如果该消息在其一个轮询调用返回后立即到达)。另一方面,第二种配置不会错过超过 50 毫秒的消息。不同之处在于第二个选项需要线程等待。但是,因此,它可以更快地响应到达的消息。这种技术称为“长轮询”,可用于模拟轮询源上的事件驱动行为。
轮询使用者还可以委托给 SpringTaskExecutor
,如以下示例所示:
PollingConsumer consumer = new PollingConsumer(channel, handler);
TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class);
consumer.setTaskExecutor(taskExecutor);
此外,一个PollingConsumer
有一个名为adviceChain
. 此属性允许您指定List
的 AOP 建议,用于处理包括交易在内的其他跨领域问题。这些建议适用于doPoll()
方法。 有关更深入的信息,请参阅端点命名空间支持下有关 AOP 通知链和事务支持的部分。
前面的示例显示了依赖项查找。但是,请记住,这些消费者通常配置为 Spring bean 定义。事实上,Spring Integration 还提供了一个FactoryBean
叫ConsumerEndpointFactoryBean
根据通道类型创建适当的消费者类型。此外,Spring Integration 具有完整的 XML 命名空间支持,以进一步隐藏这些详细信息。本指南中介绍了每种组件类型,因此介绍了基于命名空间的配置。
许多MessageHandler 实现可以生成回复消息。如前所述,与接收消息相比,发送消息是微不足道的。然而,发送回复消息的时间和数量取决于处理程序类型。例如,聚合器等待大量消息到达,并且通常被配置为拆分器的下游使用者,它可以为它处理的每条消息生成多个回复。使用命名空间配置时,您不需要严格了解所有详细信息。但是,仍然值得知道,其中几个组件共享一个公共基类,即AbstractReplyProducingMessageHandler ,并且它提供了一个setOutputChannel(..) 方法。 |
端点命名空间支持
在本参考手册中,您可以找到端点元素的特定配置示例,例如路由器、转换器、服务激活器等。其中大多数都支持input-channel
属性,并且许多支持output-channel
属性。 解析后,这些端点元素会生成PollingConsumer
或EventDrivenConsumer
,具体取决于input-channel
引用的:PollableChannel
或SubscribableChannel
分别。 当通道可轮询时,轮询行为基于端点元素的poller
子元素及其属性。
以下列表列出了poller
:
<int:poller cron="" (1)
default="false" (2)
error-channel="" (3)
fixed-delay="" (4)
fixed-rate="" (5)
id="" (6)
max-messages-per-poll="" (7)
receive-timeout="" (8)
ref="" (9)
task-executor="" (10)
time-unit="MILLISECONDS" (11)
trigger=""> (12)
<int:advice-chain /> (13)
<int:transactional /> (14)
</int:poller>
1 | 提供使用 Cron 表达式配置轮询器的功能。底层实现使用org.springframework.scheduling.support.CronTrigger . 如果设置了此属性,则不得指定以下属性:fixed-delay ,trigger ,fixed-rate 和ref . |
2 | 通过将此属性设置为true ,则只能定义一个全局默认轮询器。如果在应用程序上下文中定义了多个默认轮询器,则会引发异常。连接到PollableChannel (PollingConsumer ) 或任何SourcePollingChannelAdapter 没有显式配置的轮询器,则使用全局默认轮询器。它默认为false . 自选。 |
3 | 标识在此轮询器的调用中发生故障时将错误消息发送到的通道。要完全抑制异常,您可以提供对nullChannel . 自选。 |
4 | 固定延迟触发器使用PeriodicTrigger 在被子下。如果您不使用time-unit 属性,则指定的值以毫秒为单位表示。如果设置了此属性,则不得指定以下属性:fixed-rate ,trigger ,cron 和ref . |
5 | 固定利率触发器使用PeriodicTrigger 在被子下。如果您不使用time-unit 属性,则指定的值以毫秒为单位表示。如果设置了此属性,则不得指定以下属性:fixed-delay ,trigger ,cron 和ref . |
6 | 引用轮询器的基础 bean 定义的 ID,其类型为org.springframework.integration.scheduling.PollerMetadata . 这id 属性是顶级轮询器元素所必需的,除非它是默认轮询器(default="true" ). |
7 | 请参阅配置入站通道适配器以了解更多信息。如果未指定,则默认值取决于上下文。如果使用PollingConsumer ,则此属性默认为-1 . 但是,如果您使用SourcePollingChannelAdapter 这max-messages-per-poll 属性默认为1 . 自选。 |
8 | 在基础类上设置值PollerMetadata . 如果未指定,则默认为 1000(毫秒)。 自选。 |
9 | Bean 对另一个顶级轮询器的引用。 这ref 属性不得存在于顶级poller 元素。 但是,如果设置了此属性,则不得指定以下属性:fixed-rate ,trigger ,cron 和fixed-delay . |
10 | 提供引用自定义任务执行器的功能。有关详细信息,请参阅 TaskExecutor 支持。 自选。 |
11 | 此属性指定java.util.concurrent.TimeUnit 枚举值org.springframework.scheduling.support.PeriodicTrigger . 因此,此属性只能与fixed-delay 或fixed-rate 属性。 如果与任一cron 或trigger reference 属性,则会导致失败。支持的最小粒度PeriodicTrigger 是毫秒。因此,唯一可用的选项是毫秒和秒。如果未提供此值,则任何fixed-delay 或fixed-rate 值被解释为毫秒。基本上,此枚举为基于秒的间隔触发器值提供了便利。对于每小时、每天和每月的设置,我们建议使用cron 触发器。 |
12 | 引用任何 Spring 配置的 bean 实现org.springframework.scheduling.Trigger 接口。 但是,如果设置了此属性,则不得指定以下属性:fixed-delay ,fixed-rate ,cron 和ref . 自选。 |
13 | 允许指定额外的 AOP 通知来处理其他跨领域问题。有关更多信息,请参阅事务支持 。 自选。 |
14 | 轮询器可以成为事务性的。有关更多信息,请参阅 AOP 通知链。 自选。 |
例子
可以按如下方式配置具有 1 秒间隔的简单基于间隔的轮询器:
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller fixed-rate="1000"/>
</int:transformer>
作为使用fixed-rate
属性,也可以使用fixed-delay
属性。
对于基于 Cron 表达式的轮询器,请使用cron
属性,如以下示例所示:
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller cron="*/10 * * * * MON-FRI"/>
</int:transformer>
如果输入通道是PollableChannel
,则需要轮询器配置。
具体来说,如前所述,trigger
是PollingConsumer
类。
因此,如果您省略poller
sub元素,则可能会抛出异常。
如果您尝试在连接到不可轮询通道的元素上配置轮询器,也可能会抛出异常。
也可以创建顶级轮询器,在这种情况下,只有一个ref
属性是必需的,如以下示例所示:
<int:poller id="weekdayPoller" cron="*/10 * * * * MON-FRI"/>
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller ref="weekdayPoller"/>
</int:transformer>
这ref 属性仅允许用于内部轮询器定义。
在顶级轮询器上定义此属性会导致在应用程序上下文初始化期间抛出配置异常。 |
全局默认轮询器
为了进一步简化配置,您可以定义全局默认轮询器。
XML DSL 中的单个顶级轮询器组件可能具有default
属性设置为true
.
对于 Java 配置PollerMetadata
bean 与PollerMetadata.DEFAULT_POLLER
在这种情况下,必须声明名称。
在这种情况下,任何具有PollableChannel
对于其输入通道,该通道在同一ApplicationContext
,并且没有显式配置poller
使用该默认值。
以下示例显示了这样的轮询器和使用它的转换器:
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
// No 'poller' attribute because there is a default global poller
@Bean
public IntegrationFlow transformFlow(MyTransformer transformer) {
return IntegrationFlow.from(MessageChannels.queue("pollable"))
.transform(transformer) // No 'poller' attribute because there is a default global poller
.channel("output")
.get();
}
@Bean(PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
@Bean
public QueueChannel pollable() {
return new QueueChannel();
}
// No 'poller' attribute because there is a default global poller
@Transformer(inputChannel = "pollable", outputChannel = "output")
public Object transform(Object payload) {
...
}
@Bean(PollerMetadata.DEFAULT_POLLER)
fun defaultPoller() =
PollerMetadata()
.also {
it.maxMessagesPerPoll = 5
it.trigger = PeriodicTrigger(3000)
}
@Bean
fun convertFlow() =
integrationFlow(MessageChannels.queue("pollable")) {
transform(transformer) // No 'poller' attribute because there is a default global poller
channel("output")
}
<int:poller id="defaultPoller" default="true" max-messages-per-poll="5" fixed-delay="3000"/>
<!-- No <poller/> sub-element is necessary, because there is a default -->
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output"/>
交易支持
Spring Integration还为轮询器提供了事务支持,以便每个接收和转发作都可以作为原子工作单元执行。
要为轮询器配置事务,请将<transactional/>
子元素。
以下示例显示了可用的属性:
<int:poller fixed-delay="1000">
<int:transactional transaction-manager="txManager"
propagation="REQUIRED"
isolation="REPEATABLE_READ"
timeout="10000"
read-only="false"/>
</int:poller>
有关详细信息,请参阅轮询器事务支持。
AOP 建议链
由于 Spring 事务支持依赖于TransactionInterceptor
(AOP 通知)处理轮询程序启动的消息流的事务行为时,有时必须提供额外的建议来处理与轮询程序关联的其他横切行为。
为此,poller
定义一个advice-chain
元素,允许您在实现MethodInterceptor
接口。
以下示例显示了如何定义advice-chain
对于一个poller
:
<int:service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
method="good" output-channel="output">
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<int:advice-chain>
<ref bean="adviceA" />
<beans:bean class="org.something.SampleAdvice" />
<ref bean="txAdvice" />
</int:advice-chain>
</int:poller>
</int:service-activator>
有关如何实现MethodInterceptor
接口,请参阅 Spring Framework 参考指南的 AOP 部分。
通知链也可以应用于没有任何事务配置的轮询器,从而增强轮询器启动的消息流的行为。
使用通知链时,<transactional/> 不能指定子元素。
相反,请声明<tx:advice/> bean 并将其添加到<advice-chain/> .
有关完整的配置详细信息,请参阅轮询器事务支持。 |
TaskExecutor 支持
轮询线程可以由 Spring 的任何实例执行TaskExecutor
抽象化。
这为终结点或终结点组启用并发性。
从 Spring 3.0 开始,核心 Spring Framework 有一个task
命名空间及其<executor/>
元素支持创建简单的线程池执行器。
该元素接受常见并发设置的属性,例如 pool-size 和 queue-capacity。
配置线程池执行器可以对端点在负载下的执行方式产生重大影响。
这些设置可用于每个端点,因为端点的性能是要考虑的主要因素之一(另一个主要因素是端点订阅的通道上的预期卷)。
要为配置了 XML 命名空间支持的轮询端点启用并发性,请提供task-executor
在其上的引用<poller/>
元素,然后提供以下示例中显示的一个或多个属性:
<int:poller task-executor="pool" fixed-rate="1000"/>
<task:executor id="pool"
pool-size="5-25"
queue-capacity="20"
keep-alive="120"/>
如果未提供任务执行器,则在调用方的线程中调用使用者的处理程序。
请注意,调用方通常是默认的TaskScheduler
(请参阅配置任务计划程序)。
您还应该记住,task-executor
属性可以提供对 Spring 的TaskExecutor
接口,通过指定 bean 名称。
这executor
为方便起见,提供了前面显示的元素。
如前文轮询使用者的后台部分所述,您还可以以模拟事件驱动行为的方式配置轮询使用者。
通过较长的接收超时和较短的触发器间隔,即使在轮询的消息源上,您也可以确保对到达的消息做出非常及时的反应。
请注意,这仅适用于具有超时的阻塞等待调用的源。
例如,文件轮询器不会阻塞。
每receive()
调用立即返回,并且包含新文件或不包含新文件。
因此,即使轮询器包含 longreceive-timeout
,则该值永远不会在这种情况下使用。
另一方面,当使用 Spring Integration 自己的基于队列的通道时,超时值确实有机会参与。
以下示例显示轮询使用者如何几乎立即接收消息:
<int:service-activator input-channel="someQueueChannel"
output-channel="output">
<int:poller receive-timeout="30000" fixed-rate="10"/>
</int:service-activator>
使用这种方法不会产生太多开销,因为在内部,它只不过是一个定时等待线程,它不需要像(例如)抖动、无限 while 循环那样多的 CPU 资源使用。
在运行时更改轮询速率
使用fixed-delay
或fixed-rate
属性时,默认实现使用PeriodicTrigger
实例。
这PeriodicTrigger
是核心 Spring 框架的一部分。
它仅接受间隔作为构造函数参数。
因此,它不能在运行时更改。
但是,您可以定义自己的org.springframework.scheduling.Trigger
接口。
您甚至可以使用PeriodicTrigger
作为起点。
然后,您可以为间隔(周期)添加一个 setter,或者您甚至可以在触发器本身中嵌入自己的限制逻辑。
这period
属性用于每次调用nextExecutionTime
以安排下一次投票。
要在轮询器中使用此自定义触发器,请在应用程序上下文中声明自定义触发器的 bean 定义,并使用trigger
属性,该属性引用自定义触发器 Bean 实例。您现在可以获取对触发器 Bean 的引用并更改轮询之间的轮询间隔。
有关示例,请参阅 Spring Integration Samples 项目。
它包含一个名为dynamic-poller
,它使用自定义触发器并演示在运行时更改轮询间隔的能力。
该示例提供了一个自定义触发器,用于实现org.springframework.scheduling.Trigger
接口。
示例的触发器基于 Spring 的PeriodicTrigger
实现。
但是,自定义触发器的字段不是最终的,并且属性具有显式的 getter 和 setter,允许您在运行时动态更改轮询周期。
但需要注意的是,由于 Trigger 方法是nextExecutionTime() ,对动态触发器的任何更改都不会生效,直到下一次轮询时才会生效,具体取决于现有配置。
无法强制触发器在其当前配置的下一次执行时间之前触发。 |
有效负载类型转换
在本参考手册中,您还可以看到接受消息或任何任意消息的各种端点的特定配置和实现示例Object
作为输入参数。
在Object
,此类参数将映射到消息有效负载或有效负载或标头的一部分(使用 Spring 表达式语言时)。
但是,端点方法的输入参数类型有时与有效负载或其部分的类型不匹配。
在这种情况下,我们需要进行类型转换。
Spring Integration 提供了一种注册类型转换器的便捷方法(通过使用 SpringConversionService
) 在其自己的转换服务 Bean 实例中,名为integrationConversionService
.
一旦使用 Spring Integration 基础设施定义了第一个转换器,就会自动创建该 bean。
要注册转换器,您可以实现org.springframework.core.convert.converter.Converter
,org.springframework.core.convert.converter.GenericConverter
或org.springframework.core.convert.converter.ConverterFactory
.
这Converter
实现是最简单的,从单一类型转换为另一种类型。
为了更复杂,例如转换为类层次结构,您可以实现GenericConverter
并且可能是一个ConditionalConverter
.
这些使您可以完全访问from
和to
类型描述符,支持复杂的转换。
例如,如果您有一个名为Something
即转化的目标(参数类型、通道数据类型等),您有两个名为Thing1
和Thing
,并且您希望根据输入类型转换为其中一种,则GenericConverter
会很合适。
有关更多信息,请参阅以下接口的 Javadoc:
实现转换器后,可以使用方便的命名空间支持对其进行注册,如以下示例所示:
<int:converter ref="sampleConverter"/>
<bean id="sampleConverter" class="foo.bar.TestConverter"/>
或者,您可以使用内部 Bean,如以下示例所示:
<int:converter>
<bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>
从 Spring Integration 4.0 开始,您可以使用注释来创建前面的配置,如以下示例所示:
@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {
public Number convert(Boolean source) {
return source ? 1 : 0;
}
}
或者,您可以使用@Configuration
注释,如以下示例所示:
@Configuration
@EnableIntegration
public class ContextConfiguration {
@Bean
@IntegrationConverter
public SerializingConverter serializingConverter() {
return new SerializingConverter();
}
}
配置应用程序上下文时,Spring Framework 允许您将 相比之下, 但是,如果您确实想使用 Spring
在这种情况下,由 |
内容类型转换
从 5.0 版本开始,默认情况下,方法调用机制基于org.springframework.messaging.handler.invocation.InvocableHandlerMethod
基础设施。
其HandlerMethodArgumentResolver
实现(例如PayloadArgumentResolver
和MessageMethodArgumentResolver
) 可以使用MessageConverter
抽象以转换传入的payload
设置为目标方法参数类型。
转换可以基于contentType
消息头。
为此,Spring Integration 提供了ConfigurableCompositeMessageConverter
,它委托给要调用的已注册转换器列表,直到其中一个转换器返回非空结果。
默认情况下,此转换器提供(按严格顺序):
有关其用途和适当性的更多信息,请参阅 Javadoc(在前面列表中链接)contentType
转换值。
这ConfigurableCompositeMessageConverter
之所以使用,是因为它可以与任何其他MessageConverter
实现,包括或排除前面提到的默认转换器。
它也可以在应用程序上下文中注册为适当的 bean,覆盖默认转换器,如以下示例所示:
@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public ConfigurableCompositeMessageConverter compositeMessageConverter() {
List<MessageConverter> converters =
Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()),
new JavaSerializationMessageConverter());
return new ConfigurableCompositeMessageConverter(converters);
}
这两个新转换器在默认值之前在复合中注册。
您也不能使用ConfigurableCompositeMessageConverter
但提供您自己的MessageConverter
通过注册一个名称为 bean,integrationArgumentResolverMessageConverter
(通过将IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME
属性)。
这MessageConverter -基于(包括contentType header) 转换在使用 SpEL 方法调用时不可用。
在这种情况下,只有上面在有效负载类型转换中提到的常规类到类转换可用。 |
异步轮询
如果您希望轮询是异步的,轮询器可以选择指定task-executor
属性,该属性指向任何TaskExecutor
bean(Spring 3.0 通过task
命名空间)。
但是,在配置轮询器时,您必须了解某些事项TaskExecutor
.
问题是有两种配置,轮询器和TaskExecutor
.
他们必须彼此协调一致。
否则,您最终可能会造成人为内存泄漏。
请考虑以下配置:
<int:channel id="publishChannel">
<int:queue />
</int:channel>
<int:service-activator input-channel="publishChannel" ref="myService">
<int:poller receive-timeout="5000" task-executor="taskExecutor" fixed-rate="50" />
</int:service-activator>
<task:executor id="taskExecutor" pool-size="20" />
上述配置演示了失调配置。
默认情况下,任务执行器具有无界任务队列。 轮询器会继续调度新任务,即使所有线程都被阻塞,等待新消息到达或超时到期。 鉴于有 20 个线程执行任务,超时时间为 5 秒,它们以每秒 4 个的速度执行。 但是,新任务以每秒 20 个的速度调度,因此任务执行器中的内部队列以每秒 16 个的速度增长(当进程空闲时),因此我们存在内存泄漏。
处理此问题的方法之一是将queue-capacity
任务执行器的属性。
即使 0 也是一个合理的值。
您还可以通过指定如何处理无法排队的消息来管理它,方法是将rejection-policy
属性(例如,设置为DISCARD
).
换句话说,在配置时必须了解某些细节TaskExecutor
.
有关该主题的更多详细信息,请参阅 Spring 参考手册中的“任务执行和调度”。
端点内部 Bean
许多端点都是复合 Bean。
这包括所有使用者和所有轮询的入站通道适配器。
使用者(轮询或事件驱动)委托给MessageHandler
.
轮询适配器通过委托给MessageSource
.
通常,获取对委托 Bean 的引用很有用,也许是在运行时更改配置或用于测试。
这些豆子可以从ApplicationContext
有众所周知的名字。MessageHandler
实例在应用程序上下文中注册,其 bean ID 类似于someConsumer.handler
(其中 'consumer' 是端点的id
属性)。MessageSource
实例使用 Bean ID 注册,类似于somePolledAdapter.source
,其中 'somePolledAdapter' 是适配器的 ID。
上述内容仅适用于框架组件本身。 您可以改用内部 Bean 定义,如以下示例所示:
<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
output-channel = "outChannel" method="foo">
<beans:bean class="org.foo.ExampleServiceActivator"/>
</int:service-activator>
Bean 被视为声明的任何内部 Bean,并且不会在应用程序上下文中注册。
如果您希望以其他方式访问此 bean,请在顶层使用id
并使用ref
属性。
有关更多信息,请参阅 Spring 文档。
端点角色
从 4.2 版开始,可以将终结点分配给角色。
角色允许终结点作为一个组启动和停止。
这在使用领导选举时特别有用,其中可以在授予或撤销领导权时分别启动或停止一组端点。
为此,框架注册一个SmartLifecycleRoleController
bean 在应用程序上下文中,名称为IntegrationContextUtils.INTEGRATION_LIFECYCLE_ROLE_CONTROLLER
.
每当需要控制生命周期时,可以注入此 bean 或@Autowired
:
<bean class="com.some.project.SomeLifecycleControl">
<property name="roleController" ref="integrationLifecycleRoleController"/>
</bean>
您可以使用 XML、Java 配置或以编程方式将端点分配给角色。 以下示例演示如何使用 XML 配置端点角色:
<int:inbound-channel-adapter id="ica" channel="someChannel" expression="'foo'" role="cluster"
auto-startup="false">
<int:poller fixed-rate="60000" />
</int:inbound-channel-adapter>
以下示例显示了如何为在 Java 中创建的 Bean 配置端点角色:
@Bean
@ServiceActivator(inputChannel = "sendAsyncChannel", autoStartup="false")
@Role("cluster")
public MessageHandler sendAsyncHandler() {
return // some MessageHandler
}
以下示例显示了如何在 Java 中的方法上配置端点角色:
@Payload("#args[0].toLowerCase()")
@Role("cluster")
public String handle(String payload) {
return payload.toUpperCase();
}
以下示例演示如何使用SmartLifecycleRoleController
在 Java 中:
@Autowired
private SmartLifecycleRoleController roleController;
...
this.roleController.addSmartLifeCycleToRole("cluster", someEndpoint);
...
以下示例演示如何使用IntegrationFlow
在 Java 中:
IntegrationFlow flow -> flow
.handle(..., e -> e.role("cluster"));
其中每一个都会将端点添加到cluster
角色。
调用roleController.startLifecyclesInRole("cluster")
和相应的stop…
方法启动和停止端点。
实现SmartLifecycle 可以通过编程方式添加,而不仅仅是端点。 |
这SmartLifecycleRoleController
实现ApplicationListener<AbstractLeaderEvent>
它会自动启动和停止其配置的SmartLifecycle
授予或撤销领导权时(当某些 Bean 发布OnGrantedEvent
或OnRevokedEvent
,分别)。
使用领导选举启动和停止组件时,请务必将auto-startup XML 属性 (autoStartup bean 属性) 设置为false 以便应用程序上下文在上下文初始化期间不会启动组件。 |
从 4.3.8 版本开始,SmartLifecycleRoleController
提供了几种状态方法:
public Collection<String> getRoles() (1)
public boolean allEndpointsRunning(String role) (2)
public boolean noEndpointsRunning(String role) (3)
public Map<String, Boolean> getEndpointsRunningStatus(String role) (4)
1 | 返回正在管理的角色的列表。 |
2 | 返回true 如果角色中的所有终结点都在运行。 |
3 | 返回true 如果角色中没有一个终结点正在运行。 |
4 | 返回component name : running status .
组件名称通常是 bean 名称。 |
领导活动处理
可以分别根据授予或撤销的领导权启动和停止终结点组。 这在集群方案中非常有用,其中共享资源必须仅由单个实例使用。 这方面的一个示例是轮询共享目录的文件入站通道适配器。 (请参阅读取文件)。
为了参与领导者选举,并在当选为领导者、撤销领导层或未能获得成为领导者的资源时收到通知,应用程序会在应用程序上下文中创建一个称为“领导者发起方”的组件。
通常,领导者Starters是SmartLifecycle
,因此它会在上下文启动时启动(可选),然后在领导层发生变化时发布通知。
您还可以通过设置publishFailedEvents
自true
(从 5.0 版开始),适用于您希望在发生故障时采取特定作的情况。
按照惯例,您应该提供一个Candidate
接收回调。
您还可以通过Context
框架提供的对象。
您的代码还可以监听o.s.i.leader.event.AbstractLeaderEvent
实例(OnGrantedEvent
和OnRevokedEvent
)并做出相应的响应(例如,通过使用SmartLifecycleRoleController
).
这些事件包含对Context
对象。
以下列表显示了Context
接口:
public interface Context {
boolean isLeader();
void yield();
String getRole();
}
从版本 5.0.6 开始,上下文提供对候选人角色的引用。
Spring Integration 提供了基于LockRegistry
抽象化。
要使用它,您需要创建一个实例作为 bean,如以下示例所示:
@Bean
public LockRegistryLeaderInitiator leaderInitiator(LockRegistry locks) {
return new LockRegistryLeaderInitiator(locks);
}
如果锁注册表实现正确,则最多只有一个领导者。如果锁注册表还提供引发异常的锁(理想情况下,InterruptedException
) 当它们过期或被破坏时,无领导期的持续时间可以尽可能短,只要锁实现中的固有延迟允许。默认情况下,busyWaitMillis
属性会添加一些额外的延迟,以防止在(更常见的)情况下出现 CPU 匮乏,即锁不完美,并且只有在尝试再次获取锁时才知道它们已过期。
有关领导选举和使用 Zookeeper 的事件的更多信息,请参阅 Zookeeper 领导事件处理。有关领导选举和使用 Hazelcast 的事件的更多信息,请参阅 Hazelcast 领导事件处理。
消息传递网关
网关隐藏了 Spring Integration 提供的消息传递 API。它允许应用程序的业务逻辑不知道 Spring Integration API。通过使用通用网关,您的代码仅与一个简单的接互。
输入GatewayProxyFactoryBean
如前所述,如果不依赖 Spring Integration API(包括网关类),那就太好了。
因此,Spring Integration 提供了GatewayProxyFactoryBean
,它为任何接口生成代理,并在内部调用如下所示的网关方法。
通过使用依赖注入,您可以将接口公开给您的业务方法。
以下示例显示了一个可用于与 Spring Integration 交互的接口:
package org.cafeteria;
public interface Cafe {
void placeOrder(Order order);
}
网关 XML 命名空间支持
还提供命名空间支持。 它允许您将接口配置为服务,如以下示例所示:
<int:gateway id="cafeService"
service-interface="org.cafeteria.Cafe"
default-request-channel="requestChannel"
default-reply-timeout="10000"
default-reply-channel="replyChannel"/>
定义此配置后,cafeService
现在可以注入到其他 bean,并且调用该代理实例上的方法的代码Cafe
接口不知道 Spring Integration API。
请参阅“示例”附录,了解使用gateway
元素(在 Cafe 演示中)。
上述配置中的默认值将应用于网关接口上的所有方法。 如果未指定应答超时,则调用线程将等待应答 30 秒。 请参阅未到达响应时的网关行为。
可以覆盖单个方法的默认值。 请参阅使用注释和 XML 的网关配置。
设置默认回复通道
通常,您不需要指定default-reply-channel
,因为网关会自动创建一个临时的匿名应答通道,并在其中侦听应答。
但是,在某些情况下,可能会提示您定义default-reply-channel
(或reply-channel
与适配器网关(例如 HTTP、JMS 等)一起使用)。
对于一些背景信息,我们简要讨论网关的一些内部工作原理。
网关创建临时点对点应答通道。
它是匿名的,并以名称replyChannel
.
当提供显式default-reply-channel
(reply-channel
使用远程适配器网关),您可以指向发布-订阅通道,该通道之所以如此命名,是因为您可以向其添加多个订阅者。
在内部,Spring Integration 在临时replyChannel
和显式定义的default-reply-channel
.
假设您希望您的回复不仅发送到网关,还发送到其他一些消费者。 在这种情况下,您需要两件事:
-
您可以订阅的命名频道
-
该频道将成为发布-订阅-频道
网关使用的默认策略不能满足这些需求,因为添加到标头的应答通道是匿名的和点对点的。
这意味着没有其他订阅者可以获取它的句柄,即使可以,通道也具有点对点行为,因此只有一个订阅者可以获取消息。
通过定义default-reply-channel
您可以指向您选择的频道。
在这种情况下,这是一个publish-subscribe-channel
.
网关从它创建一个桥接器,连接到存储在标头中的临时匿名应答通道。
您可能还希望显式提供一个应答通道,以便通过拦截器(例如,窃听)进行监视或审计。 要配置通道拦截器,您需要一个命名通道。
从 5.4 版开始,当网关方法返回类型为void ,则框架填充replyChannel header 作为nullChannel bean 引用,如果未显式提供此类标头。
这允许丢弃来自下游流的任何可能的回复,从而满足单向网关合同。 |
使用注释和 XML 的网关配置
请考虑以下示例,该示例扩展了前面的Cafe
接口示例,通过添加@Gateway
注解:
public interface Cafe {
@Gateway(requestChannel="orders")
void placeOrder(Order order);
}
这@Header
批注允许您添加被解释为消息标头的值,如以下示例所示:
public interface FileWriter {
@Gateway(requestChannel="filesOut")
void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);
}
如果您更喜欢使用 XML 方法来配置网关方法,则可以将method
元素添加到网关配置中,如以下示例所示:
<int:gateway id="myGateway" service-interface="org.foo.bar.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB"/>
<int:method name="echoViaDefault"/>
</int:gateway>
您还可以使用 XML 为每个方法调用提供单独的标头。
如果要设置的标头本质上是静态的,并且您不想使用@Header
附注。
例如,在贷款经纪人示例中,我们希望根据发起的请求类型(单引号或所有报价)影响贷款报价的聚合方式。
通过评估调用的网关方法来确定请求的类型,尽管可能,但会违反关注点分离范式(该方法是 Java 工件)。
但是,在消息传递架构中,在消息头中表达您的意图(元信息)是很自然的。
以下示例演示如何为两种方法中的每一种添加不同的邮件头:
<int:gateway id="loanBrokerGateway"
service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway">
<int:method name="getLoanQuote" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="BEST"/>
</int:method>
<int:method name="getAllLoanQuotes" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="ALL"/>
</int:method>
</int:gateway>
在前面的示例中,根据网关的方法,为“RESPONSE_TYPE”标头设置了不同的值。
例如,如果指定requestChannel 在<int:method/> 以及在@Gateway 注释,则注释值优先。 |
如果在 XML 中指定了无参数网关,并且接口方法同时具有@Payload 和@Gateway 注释(带有payloadExpression 或payload-expression 在<int:method/> 元素)、@Payload 值被忽略。 |
表达式和“全局”标头
这<header/>
元素支持expression
作为替代方案value
.
计算 SpEL 表达式以确定标头的值。
从 5.2 版开始,#root
对象是MethodArgsHolder
跟getMethod()
和getArgs()
访问。
例如,如果您希望在简单方法名称上进行路由,则可以添加具有以下表达式的标头:method.name
.
这java.reflect.Method 不可序列化。
表达式为method 如果稍后序列化邮件,则会丢失。
因此,您可能希望使用method.name 或method.toString() 在这些情况下。
这toString() 方法提供了一个String 方法的表示形式,包括参数和返回类型。 |
从 3.0 版本开始,<default-header/>
可以定义元素以将标头添加到网关生成的所有消息中,而不管调用的方法如何。
为方法定义的特定标头优先于默认标头。
此处为方法定义的特定标头会覆盖任何@Header
服务接口中的注释。
但是,默认标头不会覆盖任何@Header
服务接口中的注释。
网关现在还支持default-payload-expression
,适用于所有方法(除非被覆盖)。
将方法参数映射到消息
使用上一节中的配置技术可以控制方法参数如何映射到消息元素(有效负载和标头)。 如果未使用显式配置,则使用某些约定来执行映射。 在某些情况下,这些约定无法确定哪个参数是有效负载,哪个参数应映射到标头。 请考虑以下示例:
public String send1(Object thing1, Map thing2);
public String send2(Map thing1, Map thing2);
在第一种情况下,约定是将第一个参数映射到有效负载(只要它不是Map
),第二个参数的内容成为标头。
在第二种情况下(或第一种情况下,当参数的参数thing1
是一个Map
),框架无法确定哪个参数应该是有效负载。
因此,映射失败。
这通常可以使用payload-expression
一个@Payload
注释,或@Headers
注解。
或者(每当约定失效时),您可以承担将方法调用映射到消息的全部责任。
为此,请实现MethodArgsMessageMapper
并将其提供给<gateway/>
通过使用mapper
属性。
映射器映射一个MethodArgsHolder
,这是一个简单的类,它包装了java.reflect.Method
实例和Object[]
包含参数。
提供自定义映射器时,default-payload-expression
属性和<default-header/>
网关上不允许元素。
同样,payload-expression
属性和<header/>
元素不允许在任何<method/>
元素。
映射方法参数
以下示例显示了如何将方法参数映射到消息,并显示了一些无效配置的示例:
public interface MyGateway {
void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);
void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);
void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);
void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added
void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);
@Payload("args[0] + args[1] + '!'")
void payloadAnnotationAtMethodLevel(String a, String b);
@Payload("@someBean.exclaim(args[0])")
void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);
void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);
void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); // (1)
// invalid
void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);
// invalid
void twoPayloads(@Payload String s1, @Payload String s2);
// invalid
void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);
// invalid
void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);
}
1 | 请注意,在本例中,SpEL 变量#this ,引用参数 — 在本例中,的值s . |
XML 等效项看起来有点不同,因为没有#this
method 参数的上下文。
但是,表达式可以通过使用args
属性的MethodArgsHolder
root 对象(有关更多信息,请参阅表达式和“全局”标头),如以下示例所示:
<int:gateway id="myGateway" service-interface="org.something.MyGateway">
<int:method name="send1" payload-expression="args[0] + 'thing2'"/>
<int:method name="send2" payload-expression="@someBean.sum(args[0])"/>
<int:method name="send3" payload-expression="method"/>
<int:method name="send4">
<int:header name="thing1" expression="args[2].toUpperCase()"/>
</int:method>
</int:gateway>
@MessagingGateway
注解
从 4.0 版开始,网关服务接口可以标记@MessagingGateway
注释,而不是要求定义<gateway />
xml 元素进行配置。
以下一对示例比较了配置同一网关的两种方法:
<int:gateway id="myGateway" service-interface="org.something.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB">
<int:header name="thing1" value="thing2"/>
</int:method>
<int:method name="echoViaDefault"/>
</int:gateway>
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
defaultHeaders = @GatewayHeader(name = "calledMethod",
expression="#gatewayMethod.name"))
public interface TestGateway {
@Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
String echo(String payload);
@Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
String echoUpperCase(String payload);
String echoViaDefault(String payload);
}
与 XML 版本类似,当 Spring Integration 在组件扫描期间发现这些注释时,它会创建proxy 实施及其消息传递基础设施。
要执行此扫描并注册BeanDefinition 在应用程序上下文中,将@IntegrationComponentScan 注释到@Configuration 类。
标准@ComponentScan 基础设施不处理接口。
因此,我们引入了@IntegrationComponentScan 逻辑来查找@MessagingGateway 接口和寄存器上的注释GatewayProxyFactoryBean 实例。
另请参阅注释支持。 |
与@MessagingGateway
注释,您可以使用@Profile
注释以避免创建 bean(如果此类配置文件未处于活动状态)。
从 6.0 版开始,带有@MessagingGateway
也可以用@Primary
注释,因为它可以在任何 Spring 中使用@Component
定义。
从 6.0 版本开始,@MessagingGateway
接口可以在标准 Spring 中使用@Import
配置。
这可以用作@IntegrationComponentScan
或手动AnnotationGatewayProxyFactoryBean
bean 定义。
这@MessagingGateway
用@MessageEndpoint
自版本6.0
和name()
属性本质上是别名为@Compnent.value()
.
这样,网关代理的 bean 名称生成策略将与扫描和导入组件的标准 Spring 注释配置重新对齐。
默认值AnnotationBeanNameGenerator
可以通过AnnotationConfigUtils.CONFIGURATION_BEAN_NAME_GENERATOR
或作为@IntegrationComponentScan.nameGenerator()
属性。
如果您没有 XML 配置,则@EnableIntegration 至少一个@Configuration 类。
看配置和@EnableIntegration 了解更多信息。 |
调用无参数方法
在没有任何参数的网关接口上调用方法时,默认行为是接收Message
从PollableChannel
.
但是,有时您可能希望触发无参数方法,以便可以与不需要用户提供参数的下游其他组件进行交互,例如触发无参数 SQL 调用或存储过程。
若要实现发送和接收语义,必须提供有效负载。
要生成有效负载,不需要接口上的方法参数。
您可以使用@Payload
注释或payload-expression
属性method
元素。 以下列表包括有效负载可能是什么的几个示例:
-
文字字符串
-
#gatewayMethod.name
-
新的 java.util.Date()
-
@someBean.someMethod() 的返回值
以下示例演示如何使用@Payload
注解:
public interface Cafe {
@Payload("new java.util.Date()")
List<Order> retrieveOpenOrders();
}
您还可以使用@Gateway
注解。
public interface Cafe {
@Gateway(payloadExpression = "new java.util.Date()")
List<Order> retrieveOpenOrders();
}
如果两个注释都存在(并且payloadExpression 提供),@Gateway 获胜。 |
另请参阅使用注释和 XML 的网关配置。
如果方法没有参数和返回值,但包含有效负载表达式,则将其视为仅发送作。
调用default
方法
网关代理的接口可能具有default
方法,从 5.3 版本开始,框架注入了DefaultMethodInvokingMethodInterceptor
转换为用于调用的代理default
使用java.lang.invoke.MethodHandle
方法而不是代理。
来自 JDK 的接口,例如java.util.function.Function
,仍然可以用于网关代理,但它们的default
由于内部 Java 安全原因,无法调用MethodHandles.Lookup
针对 JDK 类的实例化。
这些方法也可以使用显式@Gateway
注释,或proxyDefaultMethods
在@MessagingGateway
注释或<gateway>
XML 组件。
错误处理
网关调用可能会导致错误。 默认情况下,在网关的方法调用时,下游发生的任何错误都会“按原样”重新抛出。 例如,考虑以下简单流:
gateway -> service-activator
如果服务激活器调用的服务抛出MyException
(例如),框架将其包装在MessagingException
并将传递给服务激活器的消息附加到failedMessage
财产。
因此,框架执行的任何日志记录都具有完整的故障上下文。
默认情况下,当网关捕获异常时,MyException
被解包并抛给调用方。
您可以配置throws
子句,以匹配原因链中的特定异常类型。
例如,如果你想捕捉一个整体MessagingException
有了下游错误原因的所有消息传递信息,您应该有一个类似于以下内容的网关方法:
public interface MyGateway {
void performProcess() throws MessagingException;
}
由于我们鼓励 POJO 编程,您可能不想将调用者暴露给消息传递基础设施。
如果您的网关方法没有throws
子句中,网关遍历原因树,寻找RuntimeException
这不是MessagingException
.
如果未找到任何内容,则框架会抛出MessagingException
.
如果MyException
在前面的讨论中,有一个原因SomeOtherException
和你的方法throws SomeOtherException
,网关进一步解包并将其抛给调用方。
当网关声明时没有service-interface
,一个内部框架接口RequestReplyExchanger
被使用。
请考虑以下示例:
public interface RequestReplyExchanger {
Message<?> exchange(Message<?> request) throws MessagingException;
}
在 5.0 版本之前,这个exchange
方法没有throws
子句,结果,异常被解包。
如果使用此接口并想要恢复以前的展开行为,请使用自定义service-interface
而不是访问cause
的MessagingException
你自己。
但是,您可能希望记录错误而不是传播错误,或者您可能希望将异常视为有效回复(通过将其映射到符合调用方理解的某些“错误消息”协定的消息)。
为此,网关通过包含对error-channel
属性。
在以下示例中,“transformer”创建回复Message
从Exception
:
<int:gateway id="sampleGateway"
default-request-channel="gatewayChannel"
service-interface="foo.bar.SimpleGateway"
error-channel="exceptionTransformationChannel"/>
<int:transformer input-channel="exceptionTransformationChannel"
ref="exceptionTransformer" method="createErrorResponse"/>
这exceptionTransformer
可以是一个简单的 POJO,它知道如何创建预期的错误响应对象。
这将成为发送回调用方的有效负载。
如有必要,您可以在这样的“错误流”中做更多更复杂的事情。
它可能涉及路由器(包括 Spring Integration 的ErrorMessageExceptionTypeRouter
)、过滤器等。
然而,大多数时候,一个简单的“转换器”就足够了。
或者,您可能只想记录异常(或异步将其发送到某个地方)。
如果提供单向流,则不会将任何内容发送回调用方。
如果要完全禁止异常,可以提供对全局nullChannel
(本质上是一个/dev/null
方法)。
最后,如上所述,如果没有error-channel
定义,则异常将照常传播。
当您使用@MessagingGateway
注释(请参阅
),您可以使用@MessagingGateway
AnnotationerrorChannel
属性。
从版本 5.0 开始,当您使用带有void
返回类型(单向流),则error-channel
参考(如果提供)填充在标准中errorChannel
每个已发送消息的标头。
此功能允许基于标准的下游异步流ExecutorChannel
配置(或QueueChannel
),以覆盖默认全局errorChannel
异常发送行为。
以前,您必须手动指定errorChannel
标头与@GatewayHeader
注释或<header>
元素。
这error-channel
属性被忽略void
具有异步流的方法。
相反,错误消息被发送到默认的errorChannel
.
通过简单的 POJI 网关公开消息传递系统会带来好处,但“隐藏”底层消息传递系统的现实确实是有代价的,因此您应该考虑某些事项。
我们希望我们的 Java 方法能够尽快返回,并且在调用者等待它返回时不会无限期地挂起(无论是 void、返回值还是抛出的异常)。
当常规方法用作消息传递系统前面的代理时,我们必须考虑底层消息传递的潜在异步性质。
这意味着,由网关发起的消息可能会被过滤器丢弃,并且永远无法到达负责生成回复的组件。
某些服务激活器方法可能会导致异常,从而不提供任何回复(因为我们不生成空消息)。
换句话说,多种情况可能会导致回复消息永远不会到来。
这在消息传递系统中是非常自然的。
但是,请考虑对网关方法的影响。
网关的方法输入参数被合并到消息中并发送到下游。
回复消息将转换为网关方法的返回值。
因此,您可能希望确保对于每个网关呼叫,始终有一条回复消息。
否则,如果出现以下情况,网关方法可能永远不会返回并无限期挂起reply-timeout 设置为负值。
处理这种情况的一种方法是使用异步网关(本节后面将介绍)。
另一种处理方法是依赖默认值reply-timeout 作为30 秒。
这样,网关挂起的时间不会超过reply-timeout 如果超时确实过去,则返回“null”。
最后,您可能需要考虑在服务激活器上设置下游标志,例如“requires-reply”或在过滤器上设置“throw-exceptions-on-rejection”。
本章的最后一节将更详细地讨论这些选项。 |
如果下游流返回ErrorMessage 其payload (一个Throwable )被视为常规下游错误。
如果有error-channel 已配置,则将其发送到错误流。
否则,有效负载将抛出给网关的调用方。
同样,如果错误流在error-channel 返回一个ErrorMessage ,则其有效负载将抛出给调用方。
这同样适用于任何带有Throwable 有效载荷。
这在异步情况下非常有用,当您需要传播Exception 直接给来电者。
为此,您可以返回Exception (作为reply 来自某些服务)或扔掉它。
通常,即使使用异步流,框架也会负责将下游流抛出的异常传播回网关。
TCP 客户端-服务器多路复用示例演示了将异常返回给调用方的两种技术。
它通过使用aggregator 跟group-timeout (请参阅聚合器和组超时)和MessagingTimeoutException 回复丢弃流。 |
网关超时
网关有两个超时属性:requestTimeout
和replyTimeout
.
仅当通道可以阻塞(例如,有界QueueChannel
那是满的)。
这replyTimeout
value 是网关等待回复或返回的时间null
. 它默认为无大。
超时可以设置为网关上所有方法的默认值(defaultRequestTimeout
和defaultReplyTimeout
) 或在MessagingGateway
接口注释。单个方法可以覆盖这些默认值(在<method/>
child 元素)或@Gateway
注解。
从 5.0 版开始,超时可以定义为表达式,如以下示例所示:
@Gateway(payloadExpression = "args[0]", requestChannel = "someChannel",
requestTimeoutExpression = "args[1]", replyTimeoutExpression = "args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout);
评估上下文具有BeanResolver
(使用@someBean
引用其他 bean),以及args
array 属性,来自#root
对象可用。有关此根对象的更多信息,请参阅表达式和“全局”标头。使用 XML 配置时,超时属性可以是长值或 SpEL 表达式,如以下示例所示:
<method name="someMethod" request-channel="someRequestChannel"
payload-expression="args[0]"
request-timeout="1000"
reply-timeout="args[1]">
</method>
异步网关
作为一种模式,消息传递网关提供了一种隐藏特定于消息传递的代码的好方法,同时仍然公开消息传递系统的全部功能。
如前所述,GatewayProxyFactoryBean
提供了一种通过服务接口公开代理的便捷方法,使您可以基于 POJO 访问消息传递系统(基于您自己的域中的对象、基元/字符串或其他对象)。
但是,当网关通过返回值的简单 POJO 方法公开时,这意味着对于每个请求消息(在调用方法时生成),必须有一个回复消息(在方法返回时生成)。
由于消息传递系统天生是异步的,您可能并不总是能够保证“对于每个请求,总会有一个回复”的合同。Spring Integration 2.0 引入了对异步网关的支持,当您可能不知道是否需要回复或回复到达需要多长时间时,它提供了一种启动流的便捷方法。
为了处理这些类型的场景,Spring Integration 使用java.util.concurrent.Future
实例以支持异步网关。
从 XML 配置中,没有任何更改,并且您仍然以与定义常规网关相同的方式定义异步网关,如以下示例所示:
<int:gateway id="mathService"
service-interface="org.springframework.integration.sample.gateway.futures.MathServiceGateway"
default-request-channel="requestChannel"/>
但是,网关接口(服务接口)略有不同,如下所示:
public interface MathServiceGateway {
Future<Integer> multiplyByTwo(int i);
}
如前面的示例所示,网关方法的返回类型是Future
.
什么时候GatewayProxyFactoryBean
看到网关方法的返回类型是Future
,它会立即使用AsyncTaskExecutor
.
这就是差异的程度。
对此类方法的调用始终立即返回Future
实例。
然后,您可以与Future
按照自己的节奏获取结果、取消等。
此外,与任何其他用途一样Future
实例, 调用get()
可能会显示超时、执行异常等。
以下示例显示如何使用Future
从异步网关返回:
MathServiceGateway mathService = ac.getBean("mathService", MathServiceGateway.class);
Future<Integer> result = mathService.multiplyByTwo(number);
// do something else here since the reply might take a moment
int finalResult = result.get(1000, TimeUnit.SECONDS);
有关更详细的示例,请参阅 Spring Integration 示例中的 async-gateway 示例。
AsyncTaskExecutor
默认情况下,GatewayProxyFactoryBean
使用org.springframework.core.task.SimpleAsyncTaskExecutor
提交内部时AsyncInvocationTask
返回类型为Future
.
但是,async-executor
属性中的<gateway/>
元素的配置允许您提供对任何实现的引用java.util.concurrent.Executor
在 Spring 应用程序上下文中可用。
(默认)SimpleAsyncTaskExecutor
支持两者Future
和CompletableFuture
返回类型。
看CompletableFuture
.
即使有一个默认的执行器,提供一个外部执行器通常也很有用,以便您可以在日志中识别其线程(使用 XML 时,线程名称基于执行器的 bean 名称),如以下示例所示:
@Bean
public AsyncTaskExecutor exec() {
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
simpleAsyncTaskExecutor.setThreadNamePrefix("exec-");
return simpleAsyncTaskExecutor;
}
@MessagingGateway(asyncExecutor = "exec")
public interface ExecGateway {
@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);
}
如果您想退回其他Future
实现时,你可以提供自定义执行器或完全禁用执行器并返回Future
在下游流的回复消息有效负载中。
要禁用执行器,请将其设置为null
在GatewayProxyFactoryBean
(通过使用setAsyncTaskExecutor(null)
).
使用 XML 配置网关时,请使用async-executor=""
.
使用@MessagingGateway
注释,请使用类似于以下内容的代码:
@MessagingGateway(asyncExecutor = AnnotationConstants.NULL)
public interface NoExecGateway {
@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);
}
如果返回类型是特定的具体Future 实现或配置的执行程序不支持的其他一些子接口,则流在调用方的线程上运行,并且流必须在回复消息有效负载中返回所需的类型。 |
CompletableFuture
从 4.2 版开始,网关方法现在可以返回CompletableFuture<?>
.
返回此类型时有两种作模式:
-
当提供异步执行器并且返回类型正好是
CompletableFuture
(不是子类),框架在执行器上运行任务并立即返回CompletableFuture
给来电者。CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)
用来创造未来。 -
当异步执行器显式设置为
null
返回类型为CompletableFuture
或者返回类型是CompletableFuture
,则在调用方的线程上调用流。在此方案中,下游流应返回CompletableFuture
适当类型的。
这org.springframework.util.concurrent.ListenableFuture 从 Spring Framework 开始已被弃用6.0 . 建议现在迁移到CompletableFuture 它提供了类似的处理功能。 |
使用场景
在以下方案中,调用方线程立即返回CompletableFuture<Invoice>
,当下游流回复网关时完成(使用Invoice
对象)。
CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="something.Service" default-request-channel="orders" />
在以下方案中,调用方线程返回CompletableFuture<Invoice>
当下游流将其作为对网关的回复的有效负载提供时。当发票准备就绪时,其他一些流程必须在未来完成。
CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders"
async-executor="" />
在以下方案中,调用方线程返回CompletableFuture<Invoice>
当下游流将其作为对网关的回复的有效负载提供时。当发票准备就绪时,其他一些流程必须在未来完成。 如果DEBUG
启用日志记录时,将发出日志条目,指示异步执行器不能用于此方案。
MyCompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders" />
CompletableFuture
实例可用于对回复执行其他作,如以下示例所示:
CompletableFuture<String> process(String data);
...
CompletableFuture result = process("foo")
.thenApply(t -> t.toUpperCase());
...
String out = result.get(10, TimeUnit.SECONDS);
反应器Mono
从 5.0 版开始,GatewayProxyFactoryBean
允许将 Project Reactor 与网关接口方法一起使用,使用Mono<T>
返回类型。内部AsyncInvocationTask
被包裹在一个Mono.fromCallable()
.
一个Mono
可用于稍后检索结果(类似于Future<?>
),或者您可以通过调用Consumer
当结果返回到网关时。
这Mono 不会立即被框架刷新。
因此,在网关方法返回之前,不会启动基础消息流(因为它使用Future<?> Executor 任务)。
当Mono 被订阅。
或者,Mono (作为“可组合项”)可能是 Reactor 流的一部分,当subscribe() 关系到整个Flux .
以下示例显示如何使用 Project Reactor 创建网关: |
@MessagingGateway
public interface TestGateway {
@Gateway(requestChannel = "multiplyChannel")
Mono<Integer> multiply(Integer value);
}
@ServiceActivator(inputChannel = "multiplyChannel")
public Integer multiply(Integer value) {
return value * 2;
}
其中这样的网关可以在某些服务中使用,该服务处理Flux
数据数量:
@Autowired
TestGateway testGateway;
public void hadnleFlux() {
Flux.just("1", "2", "3", "4", "5")
.map(Integer::parseInt)
.flatMap(this.testGateway::multiply)
.collectList()
.subscribe(System.out::println);
}
另一个使用 Project Reactor 的示例是一个简单的回调场景,如以下示例所示:
Mono<Invoice> mono = service.process(myOrder);
mono.subscribe(invoice -> handleInvoice(invoice));
调用线程继续,并handleInvoice()
在流完成时调用。
另请参阅 Kotlin 协程了解更多信息。
返回异步类型的下游流
如AsyncTaskExecutor
部分,如果您希望某些下游组件返回带有异步有效负载 (Future
,Mono
等),您必须将异步执行器显式设置为null
(或使用 XML 配置时)。
然后在调用方线程上调用流,稍后可以检索结果。""
异步void
返回类型
消息传递网关方法可以这样声明:
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "sendAsyncChannel")
@Async
void sendAsync(String payload);
}
但是下游异常不会传播回调用方。
为了确保下游流调用和异常传播到调用方的异步行为,从 6.0 版开始,框架提供了对Future<Void>
和Mono<Void>
返回类型。
该用例类似于前面描述的普通void
返回类型,但不同之处在于流执行异步发生并返回Future
(或Mono
) 以null
或根据send
作结果。
如果Future<Void> 是精确的下游流回复,然后asyncExecutor 网关的选项必须设置为 null (AnnotationConstants.NULL 对于一个@MessagingGateway 配置)和send 部分在生产者线程上执行。
回复取决于下游流配置。
这样,目标应用程序就可以生成Future<Void> 正确回复。
这Mono use-case 已经超出了框架线程控制,因此将asyncExecutor 设置为 null 没有意义。
那里Mono<Void> 由于请求-回复网关作必须配置为Mono<?> gateway 方法的返回类型。 |
未到达响应时的网关行为
如前所述,网关提供了一种通过 POJO 方法调用与消息传递系统交互的便捷方式。 但是,通常期望始终返回(即使有 Exception)的典型方法调用可能并不总是一对一地映射到消息交换(例如,回复消息可能不会到达 - 相当于方法未返回)。
本部分的其余部分介绍各种方案以及如何使网关的行为更可预测。
可以配置某些属性以使同步网关行为更可预测,但其中一些属性可能并不总是按预期工作。
其中之一是reply-timeout
(在方法级别或default-reply-timeout
在网关级别)。
我们检查reply-timeout
属性,以查看它如何影响同步网关在各种场景中的行为。
我们研究了单线程方案(下游的所有组件都通过直接通道连接)和多线程方案(例如,在下游的某个地方,您可能有一个打破单线程边界的可轮询或执行器通道)。
下游长时间运行的进程
- 同步网关,单线程
-
如果下游组件仍在运行(可能是因为无限循环或服务速度慢),则将
reply-timeout
没有效果,并且网关方法调用在下游服务退出(通过返回或抛出异常)之前不会返回。 - 同步网关,多线程
-
如果下游组件仍在多线程消息流中运行(可能是因为无限循环或服务速度慢),则将
reply-timeout
通过允许网关方法调用在达到超时后返回来产生影响,因为GatewayProxyFactoryBean
轮询,等待消息,直到超时到期。 但是,如果在生成实际回复之前已达到超时,则可能会导致网关方法返回“null”。 您应该了解,回复消息(如果生成)是在网关方法调用可能返回后发送到回复通道的,因此您必须意识到这一点,并在设计流时牢记这一点。
下游组件返回“null”
- 同步网关 — 单线程
-
如果下游组件返回“null”,并且
reply-timeout
已配置为负值,则网关方法调用无限期挂起,除非requires-reply
属性已在下游组件(例如,服务激活器)上设置,该组件可能会返回“null”。 在这种情况下,将抛出异常并将其传播到网关。 - 同步网关 — 多线程
-
行为与前一种情况相同。
下游组件返回签名为“void”,而网关方法签名为非 void
- 同步网关 — 单线程
-
如果下游组件返回“void”,并且
reply-timeout
已配置为负值,则网关方法调用无限期挂起。 - 同步网关 — 多线程
-
行为与前一种情况相同。
下游组件导致运行时异常
- 同步网关 — 单线程
-
如果下游组件抛出运行时异常,则该异常将通过错误消息传播回网关并重新抛出。
- 同步网关 — 多线程
-
行为与前一种情况相同。
您应该了解,默认情况下,reply-timeout 是无限的。
因此,如果将reply-timeout 设置为负值,则网关方法调用可能会无限期挂起。
因此,为了确保分析流,并且如果存在发生其中一种情况的可能性很小,您应该将reply-timeout 属性设置为“'safe'”值。
是的30 秒。
更好的是,您可以将requires-reply 属性设置为“true”,以确保及时响应,这是在下游组件内部返回 null 时立即抛出异常产生的。
但是,您还应该意识到,在某些情况下(请参阅第一个场景),其中reply-timeout 无济于事。
这意味着分析消息流并决定何时使用同步网关而不是异步网关也很重要。
如前所述,后一种情况是定义返回Future 实例。
然后,可以保证您收到该返回值,并且您可以更精细地控制调用结果。
此外,在处理路由器时,您应该记住将resolution-required 属性设置为“true”会导致路由器在无法解析特定通道时引发异常。
同样,在处理过滤器时,您可以将throw-exception-on-rejection 属性。
在这两种情况下,生成的流的行为就像它包含一个具有“requires-reply”属性的服务激活器。
换句话说,它有助于确保网关方法调用的及时响应。 |
您应该了解,计时器在线程返回网关时启动,即当流完成或消息传递给另一个线程时。 此时,调用线程开始等待回复。 如果流完全同步,则应答立即可用。 对于异步流,线程会等待到这个时间。 |
看IntegrationFlow
作为 Gateway在 Java DSL 章节中,了解通过IntegrationFlow
.
服务激活器
服务激活器是将任何 Spring 管理的对象连接到输入通道的端点类型,以便它可以扮演服务的角色。如果服务产生输出,它也可以连接到输出通道。或者,输出生成服务可以位于处理管道或消息流的末尾,在这种情况下,入站消息的replyChannel
标头。如果未定义输出通道,则这是默认行为。与此处描述的大多数配置选项一样,相同的行为实际上适用于大多数其他组件。
服务激活器本质上是一个通用端点,用于使用输入消息(有效负载和标头)对某些对象调用方法。
它的内部逻辑基于一个MessageHandler
它可以是特定用例的任何可能实现,例如DefaultMessageSplitter
,AggregatingMessageHandler
,SftpMessageHandler
,JpaOutboundGateway
等。
因此,本参考手册中提到的任何出站网关和出站通道适配器都应被视为此服务激活器终结点的特定扩展;最后,它们都调用了某个对象的方法。
配置服务激活器
对于 Java 和 Annotation 配置,使用@ServiceActivator
注释 - 当从输入通道使用消息时,框架会调用它:
public class SomeService {
@ServiceActivator(inputChannel = "exampleChannel")
public void exampleHandler(SomeData payload) {
...
}
}
请参阅注释支持中的更多信息。
对于 Java、Groovy 或 Kotlin DSL,则.handle()
运算符IntegrationFlow
表示服务激活器:
@Bean
public IntegrationFlow someFlow() {
return IntegrationFlow
.from("exampleChannel")
.handle(someService, "exampleHandler")
.get();
}
@Bean
fun someFlow() =
integrationFlow("exampleChannel") {
handle(someService, "exampleHandler")
}
@Bean
someFlow() {
integrationFlow 'exampleChannel',
{
handle someService, 'exampleHandler'
}
}
有关 DSL 的更多信息,请参阅相应章节:
要在使用 XML 配置时创建服务激活器,请将“service-activator”元素与“input-channel”和“ref”属性一起使用,如以下示例所示:
<int:service-activator input-channel="exampleChannel" ref="exampleHandler"/>
前面的配置从exampleHandler
满足消息传递要求之一,如下所示:
-
注释为
@ServiceActivator
-
是
public
-
不回来
void
如果requiresReply == true
运行时调用的目标方法由其payload
类型或作为Message<?>
如果目标类上存在此类方法,则为类型。
从 5.0 版开始,一种服务方法可以标记为@org.springframework.integration.annotation.Default
作为所有不匹配情况的回退。当使用内容类型转换并在转换后调用目标方法时,这可能很有用。
要委托给任何对象的显式定义方法,您可以添加method
属性,如以下示例所示:
<int:service-activator input-channel="exampleChannel" ref="somePojo" method="someMethod"/>
无论哪种情况,当服务方法返回非空值时,终结点都会尝试将回复消息发送到适当的回复通道。为了确定回复通道,它首先检查output-channel
在端点配置中提供了,如以下示例所示:
<int:service-activator input-channel="exampleChannel" output-channel="replyChannel"
ref="somePojo" method="someMethod"/>
如果该方法返回结果并且没有output-channel
定义时,框架会检查请求消息的replyChannel
header 值。如果该值可用,则检查其类型。如果它是MessageChannel
,则回复消息将发送到该通道。如果它是String
,端点会尝试将通道名称解析为通道实例。如果无法解析通道,则DestinationResolutionException
被抛出。如果可以解析,则将消息发送到那里。如果请求消息没有replyChannel
header 和reply
对象是一个Message
其replyChannel
标头被查询目标目的地。这是在 Spring Integration 中用于请求-回复消息传递的技术,也是返回地址模式的一个示例。
如果您的方法返回一个结果,并且您想要丢弃它并结束流,则应配置output-channel
发送到NullChannel
.
为方便起见,框架注册了一个名称为nullChannel
.
有关更多信息,请参阅特殊频道。
服务激活器是生成回复消息不需要的组件之一。
如果您的方法返回null
或者有一个void
返回类型时,服务激活器在方法调用后退出,没有任何信号。
此行为可以通过AbstractReplyProducingMessageHandler.requiresReply
选项,也公开为requires-reply
使用 XML 命名空间进行配置时。
如果标志设置为true
并且该方法返回 null,则ReplyRequiredException
被抛出。
服务方法中的参数可以是消息或任意类型。
如果是后者,则假定它是消息有效负载,从消息中提取并注入到服务方法中。
我们通常推荐这种方法,因为它在使用 Spring Integration 时遵循并推广了 POJO 模型。
参数也可能有@Header
或@Headers
注释,如注释支持中所述。
服务方法不需要有任何参数,这意味着您可以实现事件样式的服务激活器(您只关心对服务方法的调用),而不必担心消息的内容。 将其视为空 JMS 消息。 这种实现的一个示例用例是输入通道上存放的消息的简单计数器或监视器。 |
从 4.1 版开始,框架正确转换消息属性 (payload
和headers
) 到 Java 8Optional
POJO 方法参数,如以下示例所示:
public class MyBean {
public String computeValue(Optional<String> payload,
@Header(value="foo", required=false) String foo1,
@Header(value="foo") Optional<String> foo2) {
if (payload.isPresent()) {
String value = payload.get();
...
}
else {
...
}
}
}
我们通常建议使用ref
属性,如果自定义服务激活器处理程序实现可以在其他<service-activator>
定义。
但是,如果自定义服务激活器处理程序实现仅在<service-activator>
,您可以提供内部 Bean 定义,如以下示例所示:
<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
output-channel = "outChannel" method="someMethod">
<beans:bean class="org.something.ExampleServiceActivator"/>
</int:service-activator>
同时使用ref 属性和内部处理程序定义<service-activator> 不允许配置,因为它会创建不明确的条件并导致抛出异常。 |
如果ref 属性引用扩展的 beanAbstractMessageProducingHandler (例如框架本身提供的处理程序),通过将输出通道直接注入处理程序来优化配置。
在这种情况下,每个ref 必须是单独的 bean 实例(或prototype -scoped bean)或使用<bean/> 配置类型。
如果您无意中从多个 Bean 引用了相同的消息处理程序,则会收到配置异常。 |
服务激活器和 Spring 表达式语言 (SpEL)
从 Spring Integration 2.0 开始,服务激活器也可以从 SpEL 中受益。
例如,您可以调用任何 bean 方法,而无需指向ref
属性或将其作为内部 Bean 定义包含在内,如下所示:
<int:service-activator input-channel="in" output-channel="out"
expression="@accountService.processAccount(payload, headers.accountId)"/>
<bean id="accountService" class="thing1.thing2.Account"/>
在前面的配置中,而不是使用ref
或者作为内部 bean,我们使用 SpEL 的@beanId
表示法,并调用一个方法,该方法采用与消息有效负载兼容的类型。
我们还传递一个标头值。
任何有效的 SpEL 表达式都可以根据消息中的任何内容进行评估。
对于简单的场景,如果所有逻辑都可以封装在这样的表达式中,则服务激活器不需要引用 bean,如以下示例所示:
<int:service-activator input-channel="in" output-channel="out" expression="payload * 2"/>
在前面的配置中,我们的服务逻辑是将有效负载值乘以 2。 SpEL 让我们相对容易地处理它。
看服务激活器和.handle()
方法有关配置服务激活器的详细信息,请参阅 Java DSL 章节。
异步服务激活器
服务激活器由调用线程调用。
如果输入通道是SubscribableChannel
或轮询器线程PollableChannel
.
如果服务返回CompletableFuture<?>
,默认作是将其作为发送到输出(或回复)通道的消息的有效负载发送。
从 4.3 版开始,您现在可以将async
属性设置为true
(通过使用setAsync(true)
使用 Java 配置时)。
如果服务返回CompletableFuture<?>
当这个async
属性设置为true
,则调用线程将立即释放,并在完成未来的线程(从服务内部)发送回复消息。
这对于使用PollableChannel
,因为轮询器线程被释放以在框架内执行其他服务。
如果服务以Exception
,则发生正常错误处理。
一ErrorMessage
被发送到errorChannel
消息标头(如果存在)。
否则,一个ErrorMessage
被发送到 defaulterrorChannel
(如果有)。
从 6.1 版开始,如果AbstractMessageProducingHandler
配置为ReactiveStreamsSubscribableChannel
,则默认情况下异步模式处于打开状态。
如果处理程序结果不是响应式类型或CompletableFuture<?>
,则无论输出通道类型如何,都会发生常规回复生成过程。
另请参阅响应式流支持以了解更多信息。
服务激活器和方法返回类型
服务方法可以返回任何成为回复消息有效负载的类型。
在本例中,新的Message<?>
创建对象,并复制请求消息中的所有标头。
对于大多数 Spring Integration,这的工作方式相同MessageHandler
实现,当交互基于 POJO 方法调用时。
一个完整的Message<?>
对象也可以从该方法返回。
但是,请记住,与转换器不同,对于服务激活器,如果返回的消息中还不存在标头,则将通过从请求消息中复制标头来修改此消息。
因此,如果您的方法参数是Message<?>
在服务方法中复制一些(但不是全部)现有标头,它们将重新出现在回复消息中。从回复消息中删除标头不是服务激活器的责任,并且遵循松散耦合原则,最好添加一个HeaderFilter
在集成流程中。或者,可以使用 Transformer 代替 Service Activator,但在这种情况下,当返回完整的Message<?>
该方法完全负责消息,包括复制请求消息头(如果需要)。您必须确保重要的框架头(例如replyChannel
,errorChannel
),如果存在,则必须保留。
剥离
中间器是一个简单的端点,它允许消息流延迟一定的时间间隔。当消息延迟时,原始发送方不会阻塞。相反,延迟消息使用org.springframework.scheduling.TaskScheduler
在延迟过去后发送到输出通道。这种方法即使在相当长的延迟下也是可扩展的,因为它不会导致大量阻塞的发送方线程。相反,在典型情况下,线程池用于实际执行释放消息。本节包含配置 delayer 的几个示例。
配置 Delayer
这<delayer>
元素用于延迟两个消息通道之间的消息流。与其他端点一样,您可以提供 'input-channel' 和 'output-channel' 属性,但 delayer 还具有 'default-delay' 和 'expression' 属性(以及 'expression' 元素),用于确定每条消息应延迟的毫秒数。以下示例将所有消息延迟三秒:
<int:delayer id="delayer" input-channel="input"
default-delay="3000" output-channel="output"/>
如果需要确定每条消息的延迟,还可以使用 'expression' 属性提供 SpEL 表达式,如以下表达式所示:
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from("input")
.delay("delayer.messageGroupId", d -> d
.defaultDelay(3_000L)
.delayExpression("headers['delay']"))
.channel("output")
.get();
}
@Bean
fun flow() =
integrationFlow("input") {
delay("delayer.messageGroupId") {
defaultDelay(3000L)
delayExpression("headers['delay']")
}
channel("output")
}
@ServiceActivator(inputChannel = "input")
@Bean
public DelayHandler delayer() {
DelayHandler handler = new DelayHandler("delayer.messageGroupId");
handler.setDefaultDelay(3_000L);
handler.setDelayExpressionString("headers['delay']");
handler.setOutputChannelName("output");
return handler;
}
<int:delayer id="delayer" input-channel="input" output-channel="output"
default-delay="3000" expression="headers['delay']"/>
在前面的示例中,仅当给定入站消息的表达式计算结果为 null 时,才适用三秒延迟。如果只想将延迟应用于具有表达式评估有效结果的消息,则可以使用0
(默认值)。对于延迟为0
(或更少),消息会立即在调用线程上发送。
XML 解析器使用消息组 ID<beanName>.messageGroupId . |
延迟处理程序支持表达式评估结果,这些结果表示以毫秒为单位的间隔(任何Object 谁的toString() 方法生成一个可以解析为一个Long )以及java.util.Date 表示绝对时间的实例。在第一种情况下,毫秒从当前时间开始计数(例如,值5000 将从中间器收到消息之日起至少延迟五秒钟)。使用Date 实例,则消息直到该Date 对象。 等同于非正延迟或过去的日期的值不会导致延迟。相反,它会直接发送到原始发送方线程上的输出通道。如果表达式评估结果不是Date 并且不能解析为Long ,默认延迟(如果有 — 默认值为0 ) 被应用。 |
表达式求值可能会因各种原因(包括无效表达式或其他条件)而引发求值异常。默认情况下,此类异常将被忽略(尽管在 DEBUG 级别记录),并且 delayer 回退到默认延迟(如果有)。您可以通过将ignore-expression-failures 属性。 默认情况下,此属性设置为true 并且 delayer 行为如前所述。但是,如果您不希望忽略表达式评估异常并将它们抛给 delayer的调用者,请将ignore-expression-failures 属性设置为false . |
在前面的示例中,延迟表达式指定为
因此,如果有可能省略标头,并且想要回退到默认延迟,则通常使用索引器语法而不是点属性访问器语法更有效(并建议使用),因为检测 null 比捕获异常更快。 |
delayer 委托给 Spring 的TaskScheduler
抽象化。
delayer 使用的默认调度程序是ThreadPoolTaskScheduler
启动时由 Spring Integration 提供的实例。
请参阅配置任务计划程序。
如果要委托给不同的调度器,可以通过 delayer 元素的 'scheduler' 属性提供引用,如以下示例所示:
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
scheduler="exampleTaskScheduler"/>
<task:scheduler id="exampleTaskScheduler" pool-size="3"/>
如果将外部ThreadPoolTaskScheduler ,您可以设置waitForTasksToCompleteOnShutdown = true 在这个财产上。
它允许在应用程序关闭时成功完成已经处于执行状态(释放消息)的“延迟”任务。
在 Spring Integration 2.2 之前,此属性在<delayer> 元素,因为DelayHandler 可以在后台创建自己的调度程序。
从 2.2 开始,delayer 需要一个外部调度器实例,并且waitForTasksToCompleteOnShutdown 被删除了。
您应该使用调度程序自己的配置。 |
ThreadPoolTaskScheduler 有一个属性errorHandler ,可以注入一些org.springframework.util.ErrorHandler .
此处理程序允许处理Exception 从发送延迟消息的计划任务的线程。
默认情况下,它使用org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler ,您可以在日志中看到堆栈跟踪。
您可能需要考虑使用org.springframework.integration.channel.MessagePublishingErrorHandler ,它发送一个ErrorMessage 变成一个error-channel ,从失败邮件的标头或默认的error-channel .
此错误处理在事务回滚(如果存在)后执行。
请参阅发布失败。 |
Delayer 和消息存储
这DelayHandler
将延迟消息保存到提供的消息组中MessageStore
.
(“groupId”基于<delayer>
元素。
延迟消息将从MessageStore
在DelayHandler
将消息发送到output-channel
.
如果提供的MessageStore
是持久的(例如JdbcMessageStore
),它提供了在应用程序关闭时不会丢失消息的能力。
应用程序启动后,DelayHandler
从其消息组中读取消息MessageStore
并根据消息的原始到达时间(如果延迟为数字)以延迟重新安排它们。
对于延迟标头为Date
那Date
在重新安排时使用。
如果延迟消息保留在MessageStore
除了它的“延迟”之外,它还在启动后立即发送。
这<delayer>
可以使用两个互斥元素之一进行丰富:<transactional>
和<advice-chain>
. 这List
这些 AOP 建议应用于代理的内部DelayHandler.ReleaseMessageHandler
,它负责在延迟后在Thread
计划任务的。
例如,当下游消息流抛出异常并且ReleaseMessageHandler
回滚。
在这种情况下,延迟的消息将保留在持久性MessageStore
.
您可以使用任何自定义org.aopalliance.aop.Advice
在<advice-chain>
. 这<transactional>
元素定义了一个简单的通知链,该链只有事务性通知。
以下示例显示了advice-chain
在<delayer>
:
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
message-store="jdbcMessageStore">
<int:advice-chain>
<beans:ref bean="customAdviceBean"/>
<tx:advice>
<tx:attributes>
<tx:method name="*" read-only="true"/>
</tx:attributes>
</tx:advice>
</int:advice-chain>
</int:delayer>
这DelayHandler
可以导出为 JMXMBean
使用托管作 (getDelayedMessageCount
和reschedulePersistedMessages
),它允许在运行时重新安排延迟持久化消息——例如,如果TaskScheduler
之前已被停止。
这些作可以通过Control Bus
命令,如以下示例所示:
Message<String> delayerReschedulingMessage =
MessageBuilder.withPayload("@'delayer.handler'.reschedulePersistedMessages()").build();
controlBusChannel.send(delayerReschedulingMessage);
有关消息存储、JMX 和控制总线的更多信息,请参阅 系统管理。 |
从版本 5.3.7 开始,如果当消息存储到MessageStore
,则发布任务计划在TransactionSynchronization.afterCommit()
回调。 这是防止竞争条件所必需的,在竞争条件下,计划的释放可能会在事务提交之前运行,并且找不到消息。在这种情况下,消息将在延迟之后或事务提交之后释放,以较晚者为准。
发布失败
从 5.0.8 版本开始,delayer 上有两个新属性:
-
maxAttempts
(默认 5) -
retryDelay
(默认 1 秒)
发布消息时,如果下游流失败,则将在retryDelay
.
如果maxAttempts
到达时,消息将被丢弃(除非发布是事务性的,在这种情况下,消息将保留在存储中,但将不再计划发布,直到应用程序重新启动,或者reschedulePersistedMessages()
方法被调用,如上所述)。
此外,您可以配置delayedMessageErrorChannel
; 当发布失败时,一个ErrorMessage
被发送到该通道,例外作为有效负载,并且具有originalMessage
财产。 这ErrorMessage
包含标头IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT
包含当前计数。
如果错误流使用错误消息并正常退出,则不会采取进一步的作;如果发布是事务性的,则事务将提交并从存储中删除消息。如果错误流引发异常,则将重试发布,直到maxAttempts
如上所述。
脚本支持
Spring Integration 2.1 添加了对 Java 版本 6 中引入的 JSR223 Scripting for Java 规范的支持。它允许您使用以任何受支持的语言(包括 Ruby、JRuby、Groovy 和 Kotlin)编写的脚本为各种集成组件提供逻辑,类似于在 Spring Integration 中使用 Spring 表达式语言 (SpEL) 的方式。有关 JSR223 的更多信息,请参阅文档。
您需要将此依赖项包含在您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-scripting</artifactId>
<version>6.1.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-scripting:6.1.9"
此外,还需要添加一个脚本引擎实现,例如 JRuby、Jython。
从 5.2 版开始,Spring Integration 提供了 Kotlin Jsr223 支持。您需要将此依赖项添加到您的项目中才能使其正常工作:
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-scripting-jsr223</artifactId>
<scope>runtime</scope>
</dependency>
runtime 'org.jetbrains.kotlin:kotlin-scripting-jsr223'
第三方已经开发了各种 JSR223 语言实现。特定实现与 Spring Integration 的兼容性取决于它符合规范的程度以及实现者对规范的解释。 |
如果您计划使用 Groovy 作为脚本语言,我们建议您使用 Spring-Integration 的 Groovy 支持,因为它提供了特定于 Groovy 的附加功能。但是,本节也是相关的。 |
脚本配置
根据集成需求的复杂性,脚本可以在 XML 配置中作为 CDATA 内联提供,或者作为对包含脚本的 Spring 资源的引用。为了启用脚本支持,Spring Integration 定义了一个ScriptExecutingMessageProcessor
,它将消息有效负载绑定到名为payload
并将消息头设置为headers
变量,两者都可以在脚本执行上下文中访问。
您需要做的就是编写一个使用这些变量的脚本。
以下一对示例显示了创建筛选器的示例配置:
@Bean
public IntegrationFlow scriptFilter() {
return f -> f.filter(Scripts.processor("some/path/to/ruby/script/RubyFilterTests.rb"));
}
...
@Bean
public Resource scriptResource() {
return new ByteArrayResource("headers.type == 'good'".getBytes());
}
@Bean
public IntegrationFlow scriptFilter() {
return f -> f.filter(Scripts.processor(scriptResource()).lang("groovy"));
}
<int:filter input-channel="referencedScriptInput">
<int-script:script location="some/path/to/ruby/script/RubyFilterTests.rb"/>
</int:filter>
<int:filter input-channel="inlineScriptInput">
<int-script:script lang="groovy">
<![CDATA[
return payload == 'good'
]]>
</int-script:script>
</int:filter>
如前面的示例所示,脚本可以内联包含在内,也可以通过引用资源位置来包含(通过使用location
属性)。
此外,lang
属性对应于语言名称(或其 JSR223 别名)。
其他支持脚本的 Spring Integration 端点元素包括router
,service-activator
,transformer
和splitter
.
每种情况下的脚本配置都与上述相同(除了端点元素)。
脚本支持的另一个有用功能是能够更新(重新加载)脚本,而无需重新启动应用程序上下文。
为此,请指定refresh-check-delay
属性script
元素,如以下示例所示:
Scripts.processor(...).refreshCheckDelay(5000)
}
<int-script:script location="..." refresh-check-delay="5000"/>
在前面的示例中,每 5 秒检查一次脚本位置的更新。 如果更新了脚本,则在更新后 5 秒内发生的任何调用都会导致运行新脚本。
请考虑以下示例:
Scripts.processor(...).refreshCheckDelay(0)
}
<int-script:script location="..." refresh-check-delay="0"/>
在前面的示例中,一旦发生任何脚本修改,上下文就会使用任何脚本修改进行更新,从而为“实时”配置提供简单的机制。 任何负值都表示在初始化应用程序上下文后不会重新加载脚本。 这是默认行为。 以下示例显示了一个永不更新的脚本:
Scripts.processor(...).refreshCheckDelay(-1)
}
<int-script:script location="..." refresh-check-delay="-1"/>
无法重新加载内联脚本。 |
脚本变量绑定
需要变量绑定才能使脚本引用外部提供给脚本执行上下文的变量。
默认情况下,payload
和headers
用作绑定变量。
您可以使用以下命令将其他变量绑定到脚本<variable>
元素(或ScriptSpec.variables()
选项),如以下示例所示:
Scripts.processor("foo/bar/MyScript.py")
.variables(Map.of("var1", "thing1", "var2", "thing2", "date", date))
}
<script:script lang="py" location="foo/bar/MyScript.py">
<script:variable name="var1" value="thing1"/>
<script:variable name="var2" value="thing2"/>
<script:variable name="date" ref="date"/>
</script:script>
如前面的示例所示,您可以将脚本变量绑定到标量值或 Spring bean 引用。
请注意payload
和headers
仍作为绑定变量包含在内。
在 Spring Integration 3.0 中,除了variable
元素,则variables
属性。
此属性和variable
元素并不相互排斥,您可以将它们组合到一个元素中script
元件。
但是,变量必须是唯一的,无论它们在何处定义。
此外,从 Spring Integration 3.0 开始,内联脚本也允许变量绑定,如以下示例所示:
<service-activator input-channel="input">
<script:script lang="ruby" variables="thing1=THING1, date-ref=dateBean">
<script:variable name="thing2" ref="thing2Bean"/>
<script:variable name="thing3" value="thing2"/>
<![CDATA[
payload.foo = thing1
payload.date = date
payload.bar = thing2
payload.baz = thing3
payload
]]>
</script:script>
</service-activator>
前面的示例显示了内联脚本的组合,即variable
元素和variables
属性。
这variables
属性包含一个逗号分隔的值,其中每个段包含变量及其值的“=”分隔对。
变量名称可以后缀-ref
,如date-ref
变量。
这意味着绑定变量的名称为date
,但该值是对dateBean
bean 来自应用程序上下文。
这在使用属性占位符配置或命令行参数时可能很有用。
如果您需要更好地控制变量的生成方式,您可以实现自己的 Java 类,该类使用ScriptVariableGenerator
策略,由以下接口定义:
public interface ScriptVariableGenerator {
Map<String, Object> generateScriptVariables(Message<?> message);
}
此接口要求您实现generateScriptVariables(Message)
方法。
message 参数允许您访问消息有效负载和标头中可用的任何数据,返回值为Map
的绑定变量。
每次为消息执行脚本时都会调用此方法。
以下示例演示如何提供ScriptVariableGenerator
并使用script-variable-generator
属性:
Scripts.processor("foo/bar/MyScript.groovy")
.variableGenerator(new foo.bar.MyScriptVariableGenerator())
}
<int-script:script location="foo/bar/MyScript.groovy"
script-variable-generator="variableGenerator"/>
<bean id="variableGenerator" class="foo.bar.MyScriptVariableGenerator"/>
如果script-variable-generator
,脚本组件使用DefaultScriptVariableGenerator
,它合并了任何提供的<variable>
元素与payload
和headers
变量Message
在其generateScriptVariables(Message)
方法。
您不能同时提供script-variable-generator 属性和<variable> 元素。
它们是相互排斥的。 |
GraalVM 多语言
从 6.0 版开始,该框架提供了一个PolyglotScriptExecutor
它基于 GraalVM Polyglot API。
JavaScript 的 JSR223 引擎实现本身从 Java 中删除,已被使用这个新的脚本执行器所取代。
请参阅有关在 GraalVM 中启用 JavaScript 支持以及可以通过脚本变量传播哪些配置选项的更多信息。
默认情况下,框架将allowAllAccess
自true
在共享的多语言上Context
它启用了与主机 JVM 的交互:
-
新线程的创建和使用。
-
对公共主机类的访问权限。
-
通过向类路径添加条目来加载新的主机类。
-
将新成员导出到多语言绑定中。
-
主机系统上的无限制 IO作。
-
通过实验选项。
-
新子流程的创建和使用。
-
对进程环境变量的访问。
这可以通过过载PolyglotScriptExecutor
构造函数,该构造函数接受org.graalvm.polyglot.Context.Builder
.
要启用此 JavaScript 支持,请将 GraalVM 与js
必须使用已安装的组件,或者,当使用常规 JVM 时,必须使用org.graalvm.sdk:graal-sdk
和org.graalvm.js:js
必须包含依赖项。
Groovy 支持
在 Spring Integration 2.0 中,我们添加了 Groovy 支持,允许您使用 Groovy 脚本语言为各种集成组件提供逻辑——类似于支持 Spring 表达式语言 (SpEL) 进行路由、转换和其他集成问题的方式。 有关 Groovy 的更多信息,请参阅 Groovy 文档,您可以在项目网站上找到该文档。
您需要将此依赖项包含在您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-groovy</artifactId>
<version>6.1.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-groovy:6.1.9"
此外,从 6.0 版开始,提供了用于集成流配置的 Groovy DSL。
Groovy 配置
在 Spring Integration 2.1 中,Groovy 支持的配置命名空间是 Spring Integration 脚本支持的扩展,并共享脚本支持部分中详细描述的核心配置和行为。尽管通用脚本支持很好地支持了 Groovy 脚本,但 Groovy 支持提供了Groovy
配置命名空间,该命名空间由 Spring Framework 的org.springframework.scripting.groovy.GroovyScriptFactory
和相关组件,为使用 Groovy 提供了扩展功能。以下列表显示了两个示例配置:
<int:filter input-channel="referencedScriptInput">
<int-groovy:script location="some/path/to/groovy/file/GroovyFilterTests.groovy"/>
</int:filter>
<int:filter input-channel="inlineScriptInput">
<int-groovy:script><![CDATA[
return payload == 'good'
]]></int-groovy:script>
</int:filter>
如前面的示例所示,该配置看起来与常规脚本支持配置相同。唯一的区别是使用 Groovy 命名空间,如int-groovy
命名空间前缀。另请注意,lang
属性<script>
标记在此命名空间中无效。
Groovy 对象定制
如果您需要自定义 Groovy 对象本身(除了设置变量之外),您可以引用实现GroovyObjectCustomizer
通过使用customizer
属性。 例如,如果您想通过修改MetaClass
以及注册脚本中可用的函数。以下示例显示了如何执行此作:
<int:service-activator input-channel="groovyChannel">
<int-groovy:script location="somewhere/SomeScript.groovy" customizer="groovyCustomizer"/>
</int:service-activator>
<beans:bean id="groovyCustomizer" class="org.something.MyGroovyObjectCustomizer"/>
设置自定义GroovyObjectCustomizer
不互斥<variable>
元素或script-variable-generator
属性。 也可以在定义内联脚本时提供它。
Spring Integration 3.0 引入了variables
属性,它与variable
元素。 此外,groovy 脚本能够将变量解析为BeanFactory
,如果未提供绑定变量的名称。以下示例演示如何使用变量 (entityManager
):
<int-groovy:script>
<![CDATA[
entityManager.persist(payload)
payload
]]>
</int-groovy:script>
entityManager
必须是应用程序上下文中的适当 bean。
有关<variable>
元素,则variables
属性,而script-variable-generator
属性,请参阅脚本变量绑定。
Groovy 脚本编译器自定义
这@CompileStatic
hint 是最流行的 Groovy 编译器自定义选项。
它可以在类或方法级别使用。
有关更多信息,请参阅 Groovy 参考手册,特别是 @CompileStatic。
为了将此功能用于短脚本(在集成场景中),我们被迫将简单的脚本更改为更像 Java 的代码。
考虑以下几点<filter>
脚本:
headers.type == 'good'
前面的脚本在 Spring Integration 中变成了以下方法:
@groovy.transform.CompileStatic
String filter(Map headers) {
headers.type == 'good'
}
filter(headers)
这样,filter()
方法被转换并编译为静态 Java 代码,绕过 Groovy
调用的动态阶段,例如getProperty()
工厂和CallSite
代理。
从版本 4.3 开始,您可以使用compile-static
boolean
选项,指定ASTTransformationCustomizer
为@CompileStatic
应该添加到内部CompilerConfiguration
.
有了这个,你可以省略使用@CompileStatic
,并且仍然获得编译的纯 Java 代码。
在这种情况下,前面的脚本可以很短,但仍需要比解释脚本更详细一些,如以下示例所示:
binding.variables.headers.type == 'good'
您必须访问headers
和payload
(或任何其他)变量groovy.lang.Script
binding
属性,因为,使用@CompileStatic
,我们没有动态GroovyObject.getProperty()
能力。
此外,我们还引入了compiler-configuration
bean 引用。
使用此属性,您可以提供任何其他必需的 Groovy 编译器自定义,例如ImportCustomizer
.
有关此功能的更多信息,请参阅高级编译器配置的 Groovy 文档。
用compilerConfiguration 不会自动添加ASTTransformationCustomizer 对于@CompileStatic 注释,它覆盖了compileStatic 选择。
如果您仍然需要CompileStatic ,您应该手动添加一个new ASTTransformationCustomizer(CompileStatic.class) 进入CompilationCustomizers 那个习俗compilerConfiguration . |
Groovy 编译器自定义对refresh-check-delay 选项,并且也可以静态编译可重新加载的脚本。 |
控制总线
如 (Enterprise Integration Patterns) 中所述,控制总线背后的思想是,您可以使用与用于“应用程序级”消息传递相同的消息传递系统来监视和管理框架内的组件。 在 Spring Integration 中,我们基于前面描述的适配器进行构建,以便您可以发送消息作为调用公开作的一种方式。 这些作的一个选项是 Groovy 脚本。 以下示例为控制总线配置 Groovy 脚本:
<int-groovy:control-bus input-channel="operationChannel"/>
控制总线有一个输入通道,可以访问该通道以调用应用程序上下文中对 Bean 的作。
Groovy 控制总线在输入通道上作为 Groovy 脚本运行消息。
它接受一条消息,将正文编译为脚本,并使用GroovyObjectCustomizer
,并运行它。
控制总线'MessageProcessor
在应用程序上下文中公开所有被@ManagedResource
并实现 Spring 的Lifecycle
接口或扩展 Spring 的CustomizableThreadCreator
基类(例如,多个TaskExecutor
和TaskScheduler
实现)。
在 Control Bus' 命令脚本中使用具有自定义作用域(例如 'request')的托管 Bean时要小心,尤其是在异步消息流中。
如果MessageProcessor 的控制总线无法从应用程序上下文中公开 bean,您最终可能会遇到一些BeansException 在命令脚本运行期间。
例如,如果未建立自定义作用域的上下文,则尝试获取该作用域内的 Bean 会触发BeanCreationException . |
如果您需要进一步自定义 Groovy 对象,您还可以提供对实现GroovyObjectCustomizer
通过customizer
属性,如以下示例所示:
<int-groovy:control-bus input-channel="input"
output-channel="output"
customizer="groovyCustomizer"/>
<beans:bean id="groovyCustomizer" class="org.foo.MyGroovyObjectCustomizer"/>
向端点添加行为
在 Spring Integration 2.2 之前,您可以通过将 AOP Advice 添加到轮询器的<advice-chain/>
元素。
但是,假设您只想重试 REST Web 服务调用,而不是任何下游端点。
例如,考虑以程:
inbound-adapter->poller->http-gateway1->http-gateway2->jdbc-outbound-adapter
如果您在轮询器上的通知链中配置了一些重试逻辑,并且调用http-gateway2
由于网络故障而失败,重试会导致http-gateway1
和http-gateway2
第二次被叫。
同样,在 jdbc-outbound-adapter 中发生暂时性故障后,在再次调用jdbc-outbound-adapter
.
Spring Integration 2.2 添加了向单个端点添加行为的功能。
这是通过添加<request-handler-advice-chain/>
元素到许多端点。
以下示例演示了如何将<request-handler-advice-chain/>
元素outbound-gateway
:
<int-http:outbound-gateway id="withAdvice"
url-expression="'http://localhost/test1'"
request-channel="requests"
reply-channel="nextChannel">
<int-http:request-handler-advice-chain>
<ref bean="myRetryAdvice" />
</int-http:request-handler-advice-chain>
</int-http:outbound-gateway>
在这种情况下,myRetryAdvice
仅在本地应用于此网关,不适用于在将回复发送到nextChannel
.
建议的范围仅限于端点本身。
目前,您无法建议整个 但是,一个 |
提供的咨询课程
除了提供应用AOP通知类的通用机制外,Spring Integration还提供了以下开箱即用的通知实现:
-
RequestHandlerRetryAdvice
(在重试建议中描述) -
RequestHandlerCircuitBreakerAdvice
(在断路器建议中描述) -
ExpressionEvaluatingRequestHandlerAdvice
(在表达式评估建议中描述) -
RateLimiterRequestHandlerAdvice
(在速率限制器建议中描述) -
CacheRequestHandlerAdvice
(在缓存建议中描述) -
ReactiveRequestHandlerAdvice
(在反应性建议中描述) -
ContextHolderRequestHandlerAdvice
(在上下文持有人建议中描述)
重试建议
重试建议 (o.s.i.handler.advice.RequestHandlerRetryAdvice
)利用了 Spring Retry 项目提供的丰富重试机制。的核心组件spring-retry
是RetryTemplate
,它允许配置复杂的重试方案,包括RetryPolicy
和BackoffPolicy
策略(具有许多实现)以及RecoveryCallback
策略,以确定重试用尽时要采取的作。
- 无状态重试
-
无状态重试是指重试活动完全在通知中处理的情况。线程暂停(如果配置为这样做)并重试作。
- 有状态重试
-
有状态重试是指重试状态在通知中进行管理,但抛出异常并且调用方重新提交请求的情况。有状态重试的一个示例是,当我们希望消息发起者(例如,JMS)负责重新提交,而不是在当前线程上执行它时。有状态重试需要某种机制来检测重试的提交。
有关spring-retry
,请参阅项目的 Javadoc 和 Spring Batch 的参考文档,其中spring-retry
起源。
默认的退后行为是不退后退。 立即尝试重试。 使用导致线程在两次尝试之间暂停的回退策略可能会导致性能问题,包括内存使用过多和线程匮乏。 在高容量环境中,应谨慎使用退避策略。 |
配置重试建议
本节中的示例使用以下内容<service-activator>
总是抛出异常:
public class FailingService {
public void service(String message) {
throw new RuntimeException("error");
}
}
- 简单无状态重试
-
默认值
RetryTemplate
有一个SimpleRetryPolicy
尝试三次。 没有BackOffPolicy
,因此三次尝试是背靠背进行的,两次尝试之间没有延迟。 没有RecoveryCallback
,因此结果是在最后一次失败的重试发生后向调用方抛出异常。 在 Spring Integration 环境中,可以使用error-channel
在入站终结点上。 以下示例使用RetryTemplate
并显示其DEBUG
输出:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"/> </int:request-handler-advice-chain> </int:service-activator> DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...] DEBUG [task-scheduler-2]Retry: count=0 DEBUG [task-scheduler-2]Checking for rethrow: count=1 DEBUG [task-scheduler-2]Retry: count=1 DEBUG [task-scheduler-2]Checking for rethrow: count=2 DEBUG [task-scheduler-2]Retry: count=2 DEBUG [task-scheduler-2]Checking for rethrow: count=3 DEBUG [task-scheduler-2]Retry failed last attempt: count=3
- 带恢复的简单无状态重试
-
以下示例将
RecoveryCallback
添加到前面的示例中,并使用ErrorMessageSendingRecoverer
发送ErrorMessage
到频道:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> </bean> </int:request-handler-advice-chain> </int:service-activator> DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...] DEBUG [task-scheduler-2]Retry: count=0 DEBUG [task-scheduler-2]Checking for rethrow: count=1 DEBUG [task-scheduler-2]Retry: count=1 DEBUG [task-scheduler-2]Checking for rethrow: count=2 DEBUG [task-scheduler-2]Retry: count=2 DEBUG [task-scheduler-2]Checking for rethrow: count=3 DEBUG [task-scheduler-2]Retry failed last attempt: count=3 DEBUG [task-scheduler-2]Sending ErrorMessage :failedMessage:[Payload=...]
- 使用自定义策略的无状态重试和恢复
-
为了更复杂,我们可以提供定制的建议
RetryTemplate
. 此示例继续使用SimpleRetryPolicy
但将尝试次数增加到四次。 它还添加了一个ExponentialBackoffPolicy
其中第一次重试等待一秒,第二次等待五秒,第三次等待 25 秒(总共四次尝试)。 以下列表显示了该示例及其DEBUG
输出:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> <property name="retryTemplate" ref="retryTemplate" /> </bean> </int:request-handler-advice-chain> </int:service-activator> <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate"> <property name="retryPolicy"> <bean class="org.springframework.retry.policy.SimpleRetryPolicy"> <property name="maxAttempts" value="4" /> </bean> </property> <property name="backOffPolicy"> <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy"> <property name="initialInterval" value="1000" /> <property name="multiplier" value="5.0" /> <property name="maxInterval" value="60000" /> </bean> </property> </bean> 27.058 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...] 27.071 DEBUG [task-scheduler-1]Retry: count=0 27.080 DEBUG [task-scheduler-1]Sleeping for 1000 28.081 DEBUG [task-scheduler-1]Checking for rethrow: count=1 28.081 DEBUG [task-scheduler-1]Retry: count=1 28.081 DEBUG [task-scheduler-1]Sleeping for 5000 33.082 DEBUG [task-scheduler-1]Checking for rethrow: count=2 33.082 DEBUG [task-scheduler-1]Retry: count=2 33.083 DEBUG [task-scheduler-1]Sleeping for 25000 58.083 DEBUG [task-scheduler-1]Checking for rethrow: count=3 58.083 DEBUG [task-scheduler-1]Retry: count=3 58.084 DEBUG [task-scheduler-1]Checking for rethrow: count=4 58.084 DEBUG [task-scheduler-1]Retry failed last attempt: count=4 58.086 DEBUG [task-scheduler-1]Sending ErrorMessage :failedMessage:[Payload=...]
- 命名空间对无状态重试的支持
-
从 4.0 版开始,由于命名空间对重试建议的支持,可以大大简化上述配置,如以下示例所示:
<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <ref bean="retrier" /> </int:request-handler-advice-chain> </int:service-activator> <int:handler-retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel"> <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" /> </int:handler-retry-advice>
在前面的示例中,建议被定义为顶级 Bean,以便它可以在多个
request-handler-advice-chain
实例。 您还可以直接在链中定义通知,如以下示例所示:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <int:retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel"> <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" /> </int:retry-advice> </int:request-handler-advice-chain> </int:service-activator>
一个
<handler-retry-advice>
可以有一个<fixed-back-off>
或<exponential-back-off>
child 元素或没有子元素。 一个<handler-retry-advice>
with no 子元素使用 no back off。 如果没有recovery-channel
,则在重试用尽时引发异常。 命名空间只能用于无状态重试。对于更复杂的环境(自定义策略等),请使用
<bean>
定义。 - 带恢复的简单有状态重试
-
要使重试有状态,我们需要为建议提供
RetryStateGenerator
实现。 此类用于将消息标识为重新提交,以便RetryTemplate
可以确定此消息的重试当前状态。 该框架提供了一个SpelExpressionRetryStateGenerator
,它使用 SpEL 表达式确定消息标识符。 此示例再次使用默认策略(三次尝试,无回退)。 与无状态重试一样,可以自定义这些策略。 以下列表显示了该示例及其DEBUG
输出:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="retryStateGenerator"> <bean class="o.s.i.handler.advice.SpelExpressionRetryStateGenerator"> <constructor-arg value="headers['jms_messageId']" /> </bean> </property> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> </bean> </int:request-handler-advice-chain> </int:service-activator> 24.351 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 24.368 DEBUG [Container#0-1]Retry: count=0 24.387 DEBUG [Container#0-1]Checking for rethrow: count=1 24.387 DEBUG [Container#0-1]Rethrow in retry for policy: count=1 24.387 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 24.391 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 25.412 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 25.412 DEBUG [Container#0-1]Retry: count=1 25.413 DEBUG [Container#0-1]Checking for rethrow: count=2 25.413 DEBUG [Container#0-1]Rethrow in retry for policy: count=2 25.413 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 25.414 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 26.418 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 26.418 DEBUG [Container#0-1]Retry: count=2 26.419 DEBUG [Container#0-1]Checking for rethrow: count=3 26.419 DEBUG [Container#0-1]Rethrow in retry for policy: count=3 26.419 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 26.420 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 27.425 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 27.426 DEBUG [Container#0-1]Retry failed last attempt: count=3 27.426 DEBUG [Container#0-1]Sending ErrorMessage :failedMessage:[Payload=...]
如果将前面的示例与无状态示例进行比较,则可以看到,使用有状态重试时,每次失败时都会向调用方引发异常。
- 重试的异常分类
-
Spring Retry 在确定哪些异常可以调用重试方面具有很大的灵活性。默认配置对所有异常进行重试,异常分类器查看顶级异常。如果您将其配置为,例如,仅在
MyException
并且您的应用程序会抛出一个SomeOtherException
其中原因是MyException
,则不会发生重试。从 Spring Retry 1.0.3 开始,
BinaryExceptionClassifier
有一个名为traverseCauses
(默认值为false
). 什么时候true
,它遍历异常原因,直到找到匹配项或遍历原因用完。要使用此分类器进行重试,请使用
SimpleRetryPolicy
使用采用最大尝试次数的构造函数创建,则Map
之Exception
对象,以及traverseCauses
布尔。 然后,您可以将此策略注入RetryTemplate
.
traverseCauses 在这种情况下是必需的,因为用户异常可能包装在MessagingException . |
断路器建议
断路器模式的总体思想是,如果服务当前不可用,请不要浪费时间(和资源)尝试使用它。
这o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice
实现此模式。
当断路器处于关闭状态时,终结点会尝试调用服务。
如果一定次数的连续尝试失败,断路器将进入打开状态。
当它处于打开状态时,新请求会“快速失败”,并且在一段时间到期之前不会尝试调用服务。
当该时间到期时,断路器将设置为半开状态。 当处于这种状态时,即使一次尝试失败,断路器也会立即进入打开状态。 如果尝试成功,断路器将进入关闭状态,在这种情况下,它不会再次进入打开状态,直到再次发生配置的连续故障次数。 任何成功的尝试都会将状态重置为零失败,以确定断路器何时可以再次进入打开状态。
通常,此建议可用于外部服务,在这些服务中,可能需要一些时间才能失败(例如尝试建立网络连接的超时)。
这RequestHandlerCircuitBreakerAdvice
有两个属性:threshold
和halfOpenAfter
. 这threshold
属性表示断路器打开之前需要发生的连续故障数。
它默认为5
. 这halfOpenAfter
属性表示断路器在尝试另一个请求之前等待的最后一次失败后的时间。
默认值为 1000 毫秒。
以下示例配置断路器并显示其DEBUG
和ERROR
输出:
<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2" />
<property name="halfOpenAfter" value="12000" />
</bean>
</int:request-handler-advice-chain>
</int:service-activator>
05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
在前面的示例中,阈值设置为2
和halfOpenAfter
设置为12
秒。
每 5 秒到达一个新请求。
前两次尝试调用了该服务。
第三个和第四个失败,异常表明断路器已打开。
尝试了第五个请求,因为该请求是在上次失败后 15 秒。
第六次尝试立即失败,因为断路器立即打开。
表达式评估建议
最后提供的建议类是o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice
.
这个建议比其他两个建议更笼统。
它提供了一种机制来评估发送到终结点的原始入站消息的表达式。
在成功或失败后,可以评估单独的表达式。
可选地,可以将包含评估结果的消息与输入消息一起发送到消息通道。
此建议的典型用例可能是使用<ftp:outbound-channel-adapter/>
,如果传输成功,则可能将文件移动到一个目录,如果传输失败,则将文件移动到另一个目录:
通知具有用于设置成功时表达式、失败表达式以及每个表达式的相应通道的属性。
对于成功案例,发送到successChannel
是一个AdviceMessage
,有效负载是表达式评估的结果。
一个名为inputMessage
,包含发送到处理程序的原始消息。
发送到failureChannel
(当处理程序抛出异常时)是ErrorMessage
有效负载为MessageHandlingExpressionEvaluatingAdviceException
.
像所有人一样MessagingException
实例中,此有效负载具有failedMessage
和cause
属性,以及名为evaluationResult
,其中包含表达式计算的结果。
从 5.1.3 版开始,如果配置了通道,但未提供表达式,则默认表达式用于计算为payload 消息的。 |
在通知的作用域中抛出异常时,默认情况下,该异常会在任何之后抛出给调用者failureExpression
被评估。
如果要禁止抛出异常,请将trapException
属性设置为true
.
以下建议显示如何配置advice
使用 Java DSL:
@SpringBootApplication
public class EerhaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
MessageChannel in = context.getBean("advised.input", MessageChannel.class);
in.send(new GenericMessage<>("good"));
in.send(new GenericMessage<>("bad"));
context.close();
}
@Bean
public IntegrationFlow advised() {
return f -> f.<String>handle((payload, headers) -> {
if (payload.equals("good")) {
return null;
}
else {
throw new RuntimeException("some failure");
}
}, c -> c.advice(expressionAdvice()));
}
@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannelName("success.input");
advice.setOnSuccessExpressionString("payload + ' was successful'");
advice.setFailureChannelName("failure.input");
advice.setOnFailureExpressionString(
"payload + ' was bad, with reason: ' + #exception.cause.message");
advice.setTrapException(true);
return advice;
}
@Bean
public IntegrationFlow success() {
return f -> f.handle(System.out::println);
}
@Bean
public IntegrationFlow failure() {
return f -> f.handle(System.out::println);
}
}
速率限制器建议
速率限制器建议 (RateLimiterRequestHandlerAdvice
) 允许确保端点不会因请求而过载。
当违反速率限制时,请求将处于阻止状态。
此建议的一个典型用例可能是外部服务提供商不允许超过n
每分钟的请求数。
这RateLimiterRequestHandlerAdvice
实现完全基于 Resilience4j 项目,并且需要RateLimiter
或RateLimiterConfig
注射。
也可以配置默认值和/或自定义名称。
以下示例配置了每 1 秒一个请求的速率限制器通知:
@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofSeconds(1))
.limitForPeriod(1)
.build());
}
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
...
}
缓存建议
从 5.2 版开始,CacheRequestHandlerAdvice
已被引入。
它基于 Spring Framework 中的缓存抽象,并与@Caching
注释族。
内部逻辑基于CacheAspectSupport
扩展,其中缓存作的代理是围绕AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage
方法与请求Message<?>
作为论据。
可以使用 SpEL 表达式或Function
以评估缓存键。
请求Message<?>
可用作 SpEL 评估上下文的根对象,或作为Function
input 参数。
默认情况下,payload
的请求消息用于缓存键。
这CacheRequestHandlerAdvice
必须配置为cacheNames
,当默认缓存作为CacheableOperation
,或使用任何任意的集合CacheOperation
s.
每CacheOperation
可以单独配置或具有共享选项,例如CacheManager
,CacheResolver
和CacheErrorHandler
,可以从CacheRequestHandlerAdvice
配置。
此配置功能类似于 Spring Framework 的@CacheConfig
和@Caching
注释组合。
如果CacheManager
,则默认情况下从BeanFactory
在CacheAspectSupport
.
以下示例配置了两个具有不同缓存作集的通知:
@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
return cacheRequestHandlerAdvice;
}
@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
...
}
@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
cachePutBuilder.setCacheName(TEST_PUT_CACHE);
CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
cacheEvictBuilder.setCacheName(TEST_CACHE);
cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
return cacheRequestHandlerAdvice;
}
@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
...
}
反应性建议
从 5.3 版开始,一个ReactiveRequestHandlerAdvice
可用于请求消息处理程序,生成Mono
答复。
一个BiFunction<Message<?>, Mono<?>, Publisher<?>>
必须为此通知提供,并且它是从Mono.transform()
操作员对被截获的回复产生的回复handleRequestMessage()
方法实现。
通常,这样的Mono
当我们想通过以下方式控制网络波动时,定制是必要的timeout()
,retry()
以及类似的支持运营商。
例如,当我们可以通过 WebFlux 客户端发出 HTTP 请求时,我们可以使用以下配置来等待响应的时间不超过 5 秒:
.handle(WebFlux.outboundGateway("https://somehost/"),
e -> e.customizeMonoReply((message, mono) -> mono.timeout(Duration.ofSeconds(5))));
这message
argument 是消息处理程序的请求消息,可用于确定请求范围属性。
这mono
参数是此消息处理程序的handleRequestMessage()
方法实现。
嵌套的Mono.transform()
也可以从此函数调用,以应用,例如,无功断路器。
上下文持有者建议
从 6.1 版开始,ContextHolderRequestHandlerAdvice
已被引入。
此通知从请求消息中获取一些值,并将其存储在上下文持有者中。
当在目标上完成执行时,该值从上下文中清晰可见MessageHandler
.
思考此建议的最佳方式类似于编程流程,我们将一些值存储到ThreadLocal
,从目标调用获取对它的访问权限,然后清理ThreadLocal
执行后。
这ContextHolderRequestHandlerAdvice
需要以下构造函数参数:Function<Message<?>, Object>
作为价值提供者,Consumer<Object>
作为上下文集回调和Runnable
作为上下文清理钩子。
下面是一个示例,如何ContextHolderRequestHandlerAdvice
可与o.s.i.file.remote.session.DelegatingSessionFactory
:
@Bean
DelegatingSessionFactory<?> dsf(SessionFactory<?> one, SessionFactory<?> two) {
return new DelegatingSessionFactory<>(Map.of("one", one, "two", two), null);
}
@Bean
ContextHolderRequestHandlerAdvice contextHolderRequestHandlerAdvice(DelegatingSessionFactory<String> dsf) {
return new ContextHolderRequestHandlerAdvice(message -> message.getHeaders().get("FACTORY_KEY"),
dsf::setThreadKey, dsf::clearThreadKey);
}
@ServiceActivator(inputChannel = "in", adviceChain = "contextHolderRequestHandlerAdvice")
FtpOutboundGateway ftpOutboundGateway(DelegatingSessionFactory<?> sessionFactory) {
return new FtpOutboundGateway(sessionFactory, "ls", "payload");
}
向in
带有FACTORY_KEY
header 设置为one
或two
. 这ContextHolderRequestHandlerAdvice
将该标头中的值设置为DelegatingSessionFactory
通过其setThreadKey
.
然后当FtpOutboundGateway
执行ls
命令适当的委托SessionFactory
从DelegatingSessionFactory
根据其ThreadLocal
.
当结果从FtpOutboundGateway
一个ThreadLocal
值DelegatingSessionFactory
根据clearThreadKey()
调用ContextHolderRequestHandlerAdvice
.
有关详细信息,请参阅委托会话工厂。
定制建议课程
除了前面描述的提供的建议类之外,您还可以实现自己的建议类。虽然您可以提供任何org.aopalliance.aop.Advice
(通常org.aopalliance.intercept.MethodInterceptor
),我们通常建议您将子类o.s.i.handler.advice.AbstractRequestHandlerAdvice
. 这样做的好处是避免编写面向方面的低级编程代码,并提供了一个专门为此环境使用而定制的起点。
子类需要实现doInvoke()
方法,其定义如下:
/**
* Subclasses implement this method to apply behavior to the {@link MessageHandler} callback.execute()
* invokes the handler method and returns its result, or null).
* @param callback Subclasses invoke the execute() method on this interface to invoke the handler method.
* @param target The target handler.
* @param message The message that will be sent to the handler.
* @return the result after invoking the {@link MessageHandler}.
* @throws Exception
*/
protected abstract Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception;
回调参数可以方便地避免直接处理 AOP 的子类。调用callback.execute()
方法调用消息处理程序。
这target
参数是为那些需要维护特定处理程序的状态的子类提供的,也许是通过在Map
由目标键控。此功能允许将相同的建议应用于多个处理程序。 这RequestHandlerCircuitBreakerAdvice
使用建议来保持每个处理程序的断路器状态。
这message
参数是发送到处理程序的消息。虽然通知无法在调用处理程序之前修改消息,但它可以修改有效负载(如果它具有可变属性)。通常,通知会使用消息进行日志记录或在调用处理程序之前或之后的某个位置发送消息的副本。
返回值通常是由callback.execute()
. 但是,通知确实能够修改返回值。请注意,只有AbstractReplyProducingMessageHandler
实例返回值。以下示例显示了一个自定义通知类,该类扩展了AbstractRequestHandlerAdvice
:
public class MyAdvice extends AbstractRequestHandlerAdvice {
@Override
protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception {
// add code before the invocation
Object result = callback.execute();
// add code after the invocation
return result;
}
}
除了 有关更多信息,请参阅 ReflectiveMethodInvocation Javadoc。 |
处理消息通知
如本节简介中所述,请求处理程序通知链中的通知对象仅应用于当前端点,而不是下游流(如果有)。
为MessageHandler
生成回复的对象(例如扩展AbstractReplyProducingMessageHandler
),建议应用于内部方法:handleRequestMessage()
(从MessageHandler.handleMessage()
).
对于其他消息处理程序,建议应用于MessageHandler.handleMessage()
.
在某些情况下,即使消息处理程序是AbstractReplyProducingMessageHandler
,则该建议必须应用于handleMessage
方法。
例如,幂等接收器可能会返回null
,如果处理程序的replyRequired
属性设置为true
. 另一个例子是BoundRabbitChannelAdvice
— 请参阅严格消息排序。
从 4.3.1 版本开始,新的HandleMessageAdvice
接口及其基本实现 (AbstractHandleMessageAdvice
)已被引入。Advice
实现HandleMessageAdvice
始终应用于handleMessage()
方法,而不管处理程序类型如何。
重要的是要了解这一点HandleMessageAdvice
实现(例如幂等接收器)在应用于返回响应的处理程序时,与adviceChain
并正确应用于MessageHandler.handleMessage()
方法。
由于这种分离,建议链命令不被遵守。 |
请考虑以下配置:
<some-reply-producing-endpoint ... >
<int:request-handler-advice-chain>
<tx:advice ... />
<ref bean="myHandleMessageAdvice" />
</int:request-handler-advice-chain>
</some-reply-producing-endpoint>
在前面的示例中,<tx:advice>
应用于AbstractReplyProducingMessageHandler.handleRequestMessage()
.
然而myHandleMessageAdvice
申请MessageHandler.handleMessage()
.
因此,在<tx:advice>
.
要保留顺序,您应该遵循标准的 Spring AOP 配置方法并使用端点id
与.handler
后缀获取目标MessageHandler
豆。
请注意,在这种情况下,整个下游流都在事务范围内。
在MessageHandler
不返回响应的,则保留通知链顺序。
从 5.3 版开始,HandleMessageAdviceAdapter
提供以应用任何MethodInterceptor
对于MessageHandler.handleMessage()
方法,因此,整个子流。
例如,一个RetryOperationsInterceptor
可以应用于从某个端点开始的整个子流;默认情况下,这是不可能的,因为使用者端点仅将通知应用于AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage()
.
交易支持
从 5.0 版开始,新的TransactionHandleMessageAdvice
引入了使整个下游流事务化,这要归功于HandleMessageAdvice
实现。
当一个普通的TransactionInterceptor
用于<request-handler-advice-chain>
元素(例如,通过配置<tx:advice>
),已启动的事务仅适用于内部AbstractReplyProducingMessageHandler.handleRequestMessage()
并且不会传播到下游流。
要简化 XML 配置,以及<request-handler-advice-chain>
一个<transactional>
元素已添加到所有<outbound-gateway>
和<service-activator>
和相关组件。
以下示例显示<transactional>
使用中:
<int-jdbc:outbound-gateway query="select * from things where id=:headers[id]">
<int-jdbc:transactional/>
</int-jdbc:outbound-gateway>
<bean id="transactionManager" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="org.springframework.transaction.TransactionManager"/>
</bean>
Java 配置可以通过使用TransactionInterceptorBuilder
,并且结果 Bean 名称可以在消息传递注释中使用 adviceChain
属性,如以下示例所示:
@Bean
public ConcurrentMetadataStore store() {
return new SimpleMetadataStore(hazelcastInstance()
.getMap("idempotentReceiverMetadataStore"));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(
new MetadataStoreSelector(
message -> message.getPayload().toString(),
message -> message.getPayload().toString().toUpperCase(), store()));
}
@Bean
public TransactionInterceptor transactionInterceptor() {
return new TransactionInterceptorBuilder(true)
.transactionManager(this.transactionManager)
.isolation(Isolation.READ_COMMITTED)
.propagation(Propagation.REQUIRES_NEW)
.build();
}
@Bean
@org.springframework.integration.annotation.Transformer(inputChannel = "input",
outputChannel = "output",
adviceChain = { "idempotentReceiverInterceptor",
"transactionInterceptor" })
public Transformer transformer() {
return message -> message;
}
请注意true
参数TransactionInterceptorBuilder
构造 函数。
它会导致创建TransactionHandleMessageAdvice
,不是常规的TransactionInterceptor
.
Java DSL 支持Advice
通过.transactional()
选项,如以下示例所示:
@Bean
public IntegrationFlow updatingGatewayFlow() {
return f -> f
.handle(Jpa.updatingGateway(this.entityManagerFactory),
e -> e.transactional(true))
.channel(c -> c.queue("persistResults"));
}
Advising Filters
提供建议时还有一个额外的考虑因素Filter
建议。
默认情况下,任何丢弃作(当过滤器返回false
)在建议链的范围内执行。
这可能包括丢弃通道下游的所有流量。
因此,例如,如果丢弃通道下游的元素抛出异常并且存在重试建议,则重试该过程。
此外,如果throwExceptionOnRejection
设置为true
(在通知的范围内引发异常)。
设置discard-within-advice
自false
修改此行为,并在调用通知链后发生丢弃(或异常)。
使用注释为端点提供建议
使用注释 (@Filter
,@ServiceActivator
,@Splitter
和@Transformer
),您可以在adviceChain
属性。
此外,@Filter
注释还具有discardWithinAdvice
属性,可用于配置丢弃行为,如Advising Filters中所述。
以下示例导致在通知后执行丢弃:
@MessageEndpoint
public class MyAdvisedFilter {
@Filter(inputChannel="input", outputChannel="output",
adviceChain="adviceChain", discardWithinAdvice="false")
public boolean filter(String s) {
return s.contains("good");
}
}
在建议链中订购建议
建议类是“围绕”建议,以嵌套方式应用。 第一个建议是最外层的,而最后一个建议是最里面的(即最接近被建议的处理者)。 重要的是要按正确的顺序放置建议类,以实现所需的功能。
例如,假设您要添加重试通知和事务通知。您可能希望首先放置重试通知,然后放置事务通知。因此,每次重试都在新事务中执行。另一方面,如果想要所有尝试和任何恢复作(在重试中)RecoveryCallback
) 在事务中限定范围,则可以将事务通知放在首位。
建议的处理程序属性
有时,从通知中访问处理程序属性很有用。例如,大多数处理程序实现NamedComponent
以允许您访问组件名称。
可以通过target
参数(当子类化AbstractRequestHandlerAdvice
) 或invocation.getThis()
(实施时org.aopalliance.intercept.MethodInterceptor
).
当建议整个处理程序时(例如,当处理程序不产生回复或通知实现HandleMessageAdvice
),您可以将目标对象转换为接口,例如NamedComponent
,如以下示例所示:
String componentName = ((NamedComponent) target).getComponentName();
当您实现MethodInterceptor
直接,您可以按如下方式强制转换目标对象:
String componentName = ((NamedComponent) invocation.getThis()).getComponentName();
当只有handleRequestMessage()
方法建议(在生成回复的处理程序中),您需要访问完整的处理程序,这是一个AbstractReplyProducingMessageHandler
. 以下示例显示了如何执行此作:
AbstractReplyProducingMessageHandler handler =
((AbstractReplyProducingMessageHandler.RequestHandler) target).getAdvisedHandler();
String componentName = handler.getComponentName();
幂等接收方企业集成模式
从 4.1 版开始,Spring Integration 提供了幂等接收器企业集成模式的实现。它是一种功能模式,整个幂等逻辑应该在应用程序中实现。但是,为了简化决策,命令IdempotentReceiverInterceptor
组件。这是一个 AOPAdvice
应用于MessageHandler.handleMessage()
方法,可以filter
请求消息或将其标记为duplicate
,根据其配置。
以前,您可以使用自定义MessageSelector
在<filter/>
(请参阅 Filter)。但是,由于此模式实际上定义了端点的行为,而不是端点本身,因此幂等接收器实现不提供端点组件。相反,它应用于应用程序中声明的端点。
的逻辑IdempotentReceiverInterceptor
基于提供的MessageSelector
并且,如果该选择器未接受消息,则使用duplicateMessage
header 设置为true
.
目标MessageHandler
(或下游流)可以查阅此标头以实现正确的幂等逻辑。
如果IdempotentReceiverInterceptor
配置了discardChannel
或throwExceptionOnRejection = true
,则不会将重复消息发送到目标MessageHandler.handleMessage()
.
相反,它被丢弃了。
如果要丢弃(不执行任何作)重复的消息,则discardChannel
应配置NullChannel
,例如默认的nullChannel
豆。
为了维护消息之间的状态并提供比较消息幂等性的能力,我们提供了MetadataStoreSelector
.
它接受一个MessageProcessor
实现(它基于Message
) 和可选的ConcurrentMetadataStore
(元数据存储)。
请参阅MetadataStoreSelector
Javadoc了解更多信息。
您还可以自定义value
为ConcurrentMetadataStore
通过使用附加的MessageProcessor
.
默认情况下,MetadataStoreSelector
使用timestamp
消息头。
通常,如果键没有现有值,则选择器会选择一条消息进行接受。
在某些情况下,比较键的当前值和新值以确定是否应接受消息非常有用。
从 5.3 版开始,compareValues
属性,它引用了BiPredicate<String, String>
;第一个参数是旧值;返回true
接受消息并将旧值替换为MetadataStore
.
这对于减少键数很有用;例如,在处理文件中的行时,可以将文件名存储在键中,将当前行号存储在值中。
然后,在重新启动后,您可以跳过已处理的行。
有关示例,请参阅幂等下游处理拆分文件。
为方便起见,该MetadataStoreSelector
选项可直接在<idempotent-receiver>
元件。
以下列表显示了所有可能的属性:
<idempotent-receiver
id="" (1)
endpoint="" (2)
selector="" (3)
discard-channel="" (4)
metadata-store="" (5)
key-strategy="" (6)
key-expression="" (7)
value-strategy="" (8)
value-expression="" (9)
compare-values="" (10)
throw-exception-on-rejection="" /> (11)
1 | 的 IDIdempotentReceiverInterceptor 豆。
自选。 |
2 | 应用此拦截器的使用者端点名称或模式。
用逗号 (, ),例如endpoint="aaa, bbb*, ccc, *ddd, eee*fff" .
然后,使用与这些模式匹配的端点 Bean 名称来检索目标端点的MessageHandler bean(使用其.handler 后缀),和IdempotentReceiverInterceptor 应用于这些 bean。
必填。 |
3 | 一个MessageSelector bean 引用。
相互排斥metadata-store 和key-strategy (key-expression) .
什么时候selector 未提供,则其中之一key-strategy 或key-strategy-expression 是必需的。 |
4 | 标识在IdempotentReceiverInterceptor 不接受。
如果省略,则重复的消息将转发到处理程序,并带有duplicateMessage 页眉。
自选。 |
5 | 一个ConcurrentMetadataStore 参考。
由基础层使用MetadataStoreSelector .
相互排斥selector .
自选。
默认值MetadataStoreSelector 使用内部SimpleMetadataStore 不会在应用程序执行中保持状态。 |
6 | 一个MessageProcessor 参考。
由基础层使用MetadataStoreSelector .
评估idempotentKey 从请求消息中。
相互排斥selector 和key-expression .
当selector 未提供,则其中之一key-strategy 或key-strategy-expression 是必需的。 |
7 | 用于填充ExpressionEvaluatingMessageProcessor .
由基础层使用MetadataStoreSelector .
评估idempotentKey 通过使用请求消息作为评估上下文根对象。
相互排斥selector 和key-strategy .
当selector 未提供,则其中之一key-strategy 或key-strategy-expression 是必需的。 |
8 | 一个MessageProcessor 参考。
由基础层使用MetadataStoreSelector .
评估value 对于idempotentKey 从请求消息中。
相互排斥selector 和value-expression .
默认情况下,“MetadataStoreSelector”使用“timestamp”消息标头作为元数据“值”。 |
9 | 用于填充ExpressionEvaluatingMessageProcessor .
由基础层使用MetadataStoreSelector .
评估value 对于idempotentKey 通过使用请求消息作为评估上下文根对象。
相互排斥selector 和value-strategy .
默认情况下,“MetadataStoreSelector”使用“timestamp”消息标头作为元数据“值”。 |
10 | 对BiPredicate<String, String> bean,它允许您通过比较键的新旧值来选择消息;null 默认情况下。 |
11 | 如果IdempotentReceiverInterceptor 拒绝邮件。
默认为false .
无论是否discard-channel 被提供。 |
对于 Java 配置,Spring Integration 提供了方法级@IdempotentReceiver
注解。
它用于标记method
具有消息传递注释 (@ServiceActivator
,@Router, and others) to specify which `IdempotentReceiverInterceptor
对象将应用于此终结点。
以下示例演示如何使用@IdempotentReceiver
注解:
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
m.getHeaders().get(INVOICE_NBR_HEADER)));
}
@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
....
}
使用 Java DSL 时,可以将拦截器添加到端点的通知链中,如以下示例所示:
@Bean
public IntegrationFlow flow() {
...
.handle("someBean", "someMethod",
e -> e.advice(idempotentReceiverInterceptor()))
...
}
这IdempotentReceiverInterceptor 专为MessageHandler.handleMessage(Message<?>) 方法。
从 4.3.1 版开始,它实现了HandleMessageAdvice ,替换为AbstractHandleMessageAdvice 作为基类,以便更好地解离。
有关详细信息,请参阅处理消息通知。 |
日志记录通道适配器
这<logging-channel-adapter>
通常与窃听器结合使用,如盗线器中所述。
但是,它也可以用作任何流的最终使用者。
例如,考虑一个以<service-activator>
返回一个结果,但您希望丢弃该结果。
为此,您可以将结果发送到NullChannel
.
或者,您可以将其路由到INFO
水平<logging-channel-adapter>
.
这样,您可以在登录时看到丢弃的消息INFO
级别,但在(例如)记录时看不到它WARN
水平。
使用NullChannel
,则在DEBUG
水平。
以下列表显示了logging-channel-adapter
元素:
<int:logging-channel-adapter
channel="" (1)
level="INFO" (2)
expression="" (3)
log-full-message="false" (4)
logger-name="" /> (5)
1 | 将日志记录适配器连接到上游组件的通道。 |
2 | 将记录发送到此适配器的消息的日志记录级别。
违约:INFO . |
3 | 一个 SpEL 表达式,表示准确记录消息的哪些部分。
违约:payload — 仅记录有效负载。
如果log-full-message ,则无法指定此属性。 |
4 | 什么时候true ,则记录整个邮件(包括标头)。
违约:false — 仅记录有效负载。
如果出现以下情况,则无法指定此属性expression 被指定。 |
5 | 指定name 记录器(称为category 在log4j ).
用于标识此适配器创建的日志消息。
这样就可以为各个适配器设置日志名称(在日志记录子系统中)。
默认情况下,所有适配器都以以下名称进行记录:org.springframework.integration.handler.LoggingHandler . |
使用 Java 配置
以下 Spring Boot 应用程序显示了配置LoggingHandler
通过使用 Java 配置:
@SpringBootApplication
public class LoggingJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(LoggingJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToLogger("foo");
}
@Bean
@ServiceActivator(inputChannel = "logChannel")
public LoggingHandler logging() {
LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.DEBUG);
adapter.setLoggerName("TEST_LOGGER");
adapter.setLogExpressionString("headers.id + ': ' + payload");
return adapter;
}
@MessagingGateway(defaultRequestChannel = "logChannel")
public interface MyGateway {
void sendToLogger(String data);
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序显示了使用 Java DSL 配置日志记录通道适配器的示例:
@SpringBootApplication
public class LoggingJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(LoggingJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToLogger("foo");
}
@Bean
public IntegrationFlow loggingFlow() {
return IntegrationFlow.from(MyGateway.class)
.log(LoggingHandler.Level.DEBUG, "TEST_LOGGER",
m -> m.getHeaders().getId() + ": " + m.getPayload());
}
@MessagingGateway
public interface MyGateway {
void sendToLogger(String data);
}
}
java.util.function
接口支持
从版本 5.1 开始,Spring Integration 为java.util.function
包。
所有消息传递端点(Service Activator、Transformer、Filter 等)现在都可以引用Function
(或Consumer
)豆子。
消息传递注释可以直接应用于这些 bean,类似于常规MessageHandler
定义。
例如,如果你有这个Function
豆子定义:
@Configuration
public class FunctionConfiguration {
@Bean
public Function<String, String> functionAsService() {
return String::toUpperCase;
}
}
您可以将其用作 XML 配置文件中的简单引用:
<service-activator input-channel="processorViaFunctionChannel" ref="functionAsService"/>
当我们使用消息传递注释配置流时,代码很简单:
@Bean
@Transformer(inputChannel = "functionServiceChannel")
public Function<String, String> functionAsService() {
return String::toUpperCase;
}
当函数返回数组时,Collection
(本质上,任何Iterable
),Stream
或反应堆Flux
,@Splitter
可以在这样的 bean 上用于对结果内容执行迭代。
这java.util.function.Consumer
接口可用于<int:outbound-channel-adapter>
或者,与@ServiceActivator
注释,以执行流的最后一步:
@Bean
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
public Consumer<Message<?>> messageConsumerAsService() {
// Has to be an anonymous class for proper type inference
return new Consumer<Message<?>>() {
@Override
public void accept(Message<?> e) {
collector().add(e);
}
};
}
另外,请注意上面代码片段中的注释:如果您想处理整个消息Function
/Consumer
不能使用 lambda 定义。
由于 Java 类型擦除,我们无法确定apply()/accept()
方法调用。
这java.util.function.Supplier
接口可以简单地与@InboundChannelAdapter
注释,或作为ref
在<int:inbound-channel-adapter>
:
@Bean
@InboundChannelAdapter(value = "inputChannel", poller = @Poller(fixedDelay = "1000"))
public Supplier<String> pojoSupplier() {
return () -> "foo";
}
对于 Java DSL,我们只需要在端点定义中使用对函数 bean 的引用。
同时,实现Supplier
接口可以作为常规使用MessageSource
定义:
@Bean
public Function<String, String> toUpperCaseFunction() {
return String::toUpperCase;
}
@Bean
public Supplier<String> stringSupplier() {
return () -> "foo";
}
@Bean
public IntegrationFlow supplierFlow() {
return IntegrationFlow.from(stringSupplier())
.transform(toUpperCaseFunction())
.channel("suppliedChannel")
.get();
}
当与 Spring Cloud Function 框架一起使用时,此函数支持非常有用,其中我们有一个函数目录,可以从集成流定义中引用其成员函数。
Kotlin 支持
该框架还得到了改进,以支持 Kotlin lambda 的函数,因此现在您可以结合使用 Kotlin 语言和 Spring Integration 流定义:
@Bean
@Transformer(inputChannel = "functionServiceChannel")
fun kotlinFunction(): (String) -> String {
return { it.toUpperCase() }
}
@Bean
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
fun kotlinConsumer(): (Message<Any>) -> Unit {
return { print(it) }
}
@Bean
@InboundChannelAdapter(value = "counterChannel",
poller = Poller(fixedRate = "10", maxMessagesPerPoll = "1"))
fun kotlinSupplier(): () -> String {
return { "baz" }
}
Kotlin 协程
从 6.0 版本开始,Spring Integration 提供了对 Kotlin 协程的支持。
现在suspend
函数和kotlinx.coroutines.Deferred
& kotlinx.coroutines.flow.Flow
返回类型可用于服务方法:
@ServiceActivator(inputChannel = "suspendServiceChannel", outputChannel = "resultChannel")
suspend fun suspendServiceFunction(payload: String) = payload.uppercase()
@ServiceActivator(inputChannel = "flowServiceChannel", outputChannel = "resultChannel", async = "true")
fun flowServiceFunction(payload: String) =
flow {
for (i in 1..3) {
emit("$payload #$i")
}
}
该框架将它们视为响应式流交互并使用ReactiveAdapterRegistry
转换为相应的Mono
和Flux
反应器类型。
这样的函数回复将在回复通道中处理,如果它是ReactiveStreamsSubscribableChannel
,或由于CompletableFuture
在相应的回调中。
带有Flow 结果不是async 默认情况下,在@ServiceActivator 所以Flow 实例作为回复消息有效负载生成。
目标应用程序负责将此对象作为协程进行处理或将其转换为Flux 分别。 |
这@MessagingGateway
接口方法也可以用suspend
修饰符。
该框架利用Mono
内部使用下游流执行请求-回复。
这样的Mono
结果由MonoKt.awaitSingleOrNull()
API 内部实现kotlin.coroutines.Continuation
被调用的参数suspend
网关功能:
@MessagingGateway(defaultRequestChannel = "suspendRequestChannel")
interface SuspendFunGateway {
suspend fun suspendGateway(payload: String): String
}
根据 Kotlin 语言要求,此方法必须作为协程调用:
@Autowired
private lateinit var suspendFunGateway: SuspendFunGateway
fun someServiceMethod() {
runBlocking {
val reply = suspendFunGateway.suspendGateway("test suspend gateway")
}
}