消息传递端点
消息收发终端节点
消息端点
本章的第一部分涵盖了一些背景理论,并揭示了相当多的关于驱动 Spring Integration 的各种消息传递组件的底层 API。 如果您想真正了解幕后发生的事情,这些信息可能会有所帮助。 但是,如果您想启动并运行各种元素的简化基于命名空间的配置,请随时跳到 Endpoint Namespace Support (终端节点命名空间支持)。
如概述中所述,消息终端节点负责将各种消息收发组件连接到通道。 在接下来的几章中,我们将介绍许多使用消息的不同组件。 其中一些还能够发送回复消息。 发送消息非常简单。 如前面的 Message Channels 中所示,您可以向消息通道发送消息。 但是,接收稍微复杂一些。 主要原因是有两种类型的使用者:轮询使用者和事件驱动使用者。
在这两者中,事件驱动型消费者要简单得多。
无需管理和调度单独的 Poller 线程,它们本质上是具有回调方法的侦听器。
当连接到 Spring Integration 的可订阅消息通道之一时,这个简单的选项效果很好。
但是,当连接到缓冲的、可轮询的消息通道时,某些组件必须调度和管理轮询线程。
Spring 集成提供了两种不同的端点实现来容纳这两种类型的消费者。
因此,消费者自己只需要实现回调接口即可。
当需要轮询时,终端节点充当使用者实例的容器。
其好处类似于使用容器托管消息驱动的 bean,但是,由于这些使用者是在 中运行的 Spring 管理的对象,因此它更类似于 Spring 自己的容器。ApplicationContext
MessageListener
消息处理程序
Spring Integration 的接口由框架中的许多组件实现。
换句话说,这不是公共 API 的一部分,您通常不会直接实现。
尽管如此,消息使用者使用它来实际处理使用的消息,因此了解此策略接口确实有助于理解使用者的整体角色。
接口定义如下:MessageHandler
MessageHandler
public interface MessageHandler {
void handleMessage(Message<?> message);
}
尽管它很简单,但这个接口为以下章节中介绍的大多数组件(路由器、转换器、拆分器、聚合器、服务激活器等)提供了基础。 这些组件各自对它们处理的消息执行非常不同的功能,但实际接收消息的要求是相同的,轮询和事件驱动行为之间的选择也是相同的。 Spring 集成提供了两个端点实现,它们托管这些基于回调的处理程序,并让它们连接到消息通道。
事件驱动型消费者
因为它是两者中更简单的,所以我们首先介绍事件驱动的使用者终端节点。
你可能还记得,该接口提供了一个方法,并且该方法接受一个参数(如SubscribableChannel
所示)。
下面的清单显示了该方法的定义:SubscribableChannel
subscribe()
MessageHandler
subscribe
subscribableChannel.subscribe(messageHandler);
由于订阅通道的处理程序不必主动轮询该通道,因此这是一个事件驱动的使用者,并且 Spring Integration 提供的实现接受 a 和 a ,如下例所示:SubscribableChannel
MessageHandler
SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class);
EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler);
轮询消费者
Spring 集成还提供了一个 ,它可以以相同的方式实例化,只是通道必须实现 ,如下例所示:PollingConsumer
PollableChannel
PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class);
PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);
轮询使用者还有许多其他配置选项。 例如,trigger 是 required 属性。 以下示例显示如何设置触发器:
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setTrigger(new PeriodicTrigger(Duration.ofSeconds(30)));
通常使用简单的间隔 () 定义,但也支持属性和布尔属性(默认值为 — 即没有固定延迟)。
以下示例设置这两个属性:PeriodicTrigger
Duration
initialDelay
fixedRate
false
PeriodicTrigger trigger = new PeriodicTrigger(Duration.ofSeconds(1));
trigger.setInitialDelay(Duration.ofSeconds(5));
trigger.setFixedRate(true);
前面示例中的三个设置的结果是一个等待 5 秒,然后每秒触发一次的触发器。
这需要有效的 cron 表达式。
有关详细信息,请参阅 Javadoc。
以下示例将新的 :CronTrigger
CronTrigger
CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");
上一个示例中定义的触发器的结果是星期一到星期五每 10 秒触发一次的触发器。
除了触发器之外,您还可以指定另外两个与轮询相关的配置属性:和 。
以下示例说明如何设置这两个属性:maxMessagesPerPoll
receiveTimeout
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setMaxMessagesPerPoll(10);
consumer.setReceiveTimeout(5000);
该属性指定在给定轮询操作中要接收的最大消息数。
这意味着 poller 继续调用而不等待,直到返回或达到最大值。
例如,如果 Poller 具有 10 秒间隔触发器且设置为 ,并且它正在轮询队列中有 100 条消息的通道,则可以在 40 秒内检索所有 100 条消息。
它抓取 25 个,等待 10 秒,抓取下一个 25 个,依此类推。
如果 配置为负值,则在单个轮询周期内调用 ,直到返回 。
从版本 5.5 开始,一个值具有特殊的含义 - 完全跳过调用,这可能被视为暂停此轮询端点,直到稍后将 更改为 n 非零值,例如通过 Control Bus。maxMessagesPerPoll
receive()
null
maxMessagesPerPoll
25
maxMessagesPerPoll
MessageSource.receive()
null
0
MessageSource.receive()
maxMessagesPerPoll
该属性指定 Poller 在调用 receive 操作时如果没有可用的消息时应等待的时间。
例如,考虑两个表面上看起来相似但实际上完全不同的选项:第一个选项的间隔触发为 5 秒,接收超时为 50 毫秒,而第二个选项的间隔触发为 50 毫秒,接收超时为 5 秒。
第一个 API 可能会比它到达通道晚 4950 毫秒收到一条消息(如果该消息在其 poll 调用之一返回后立即到达)。
另一方面,第二个配置永远不会错过超过 50 毫秒的消息。
区别在于第二个选项需要线程等待。
但是,因此,它可以更快地响应到达的消息。
这种技术称为 “长轮询”,可用于在轮询源上模拟事件驱动的行为。receiveTimeout
轮询使用者还可以委托给 Spring ,如下例所示:TaskExecutor
PollingConsumer consumer = new PollingConsumer(channel, handler);
TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class);
consumer.setTaskExecutor(taskExecutor);
此外,a 具有一个名为 的属性。
此属性允许您指定 AOP 建议,以处理其他横切关注点,包括事务。
这些建议是围绕方法应用的。
有关更深入的信息,请参阅 Endpoint Namespace Support 下的有关 AOP 建议链和事务支持的部分。PollingConsumer
adviceChain
List
doPoll()
前面的示例显示了依赖项查找。
但是,请记住,这些使用者通常配置为 Spring bean 定义。
事实上, Spring 集成还提供了一个调用,它根据通道的类型创建适当的消费者类型。
此外,Spring 集成具有完整的 XML 名称空间支持,以进一步隐藏这些细节。
本指南介绍了基于命名空间的配置,因为介绍了每种组件类型。FactoryBean
ConsumerEndpointFactoryBean
许多实现可以生成回复消息。
如前所述,与接收消息相比,发送消息是微不足道的。
但是,何时发送回复消息以及发送多少回复消息取决于处理程序类型。
例如,聚合器等待大量消息到达,并且通常配置为拆分器的下游使用者,该拆分器可以为其处理的每条消息生成多个回复。
使用 namespace 配置时,您不需要严格了解所有详细信息。
但是,仍然值得知道,这些组件中的几个共享一个公共基类 ,并且它提供了一个方法。MessageHandler AbstractReplyProducingMessageHandler setOutputChannel(..) |
终端节点命名空间支持
在本参考手册中,您可以找到终端节点元素的特定配置示例,例如 router、transformer、service-activator 等。
其中大多数都支持属性,许多都支持属性。
解析后,这些 endpoint 元素会生成 the 或 the 的实例,具体取决于引用的类型: 或 。
当通道是可轮询的时,轮询行为基于 endpoint 元素的 sub-element 及其属性。input-channel
output-channel
PollingConsumer
EventDrivenConsumer
input-channel
PollableChannel
SubscribableChannel
poller
以下清单列出了 a 的所有可用配置选项:poller
<int:poller cron="" (1)
default="false" (2)
error-channel="" (3)
fixed-delay="" (4)
fixed-rate="" (5)
id="" (6)
max-messages-per-poll="" (7)
receive-timeout="" (8)
ref="" (9)
task-executor="" (10)
time-unit="MILLISECONDS" (11)
trigger=""> (12)
<int:advice-chain /> (13)
<int:transactional /> (14)
</int:poller>
1 | 提供使用 Cron 表达式配置 Poller 的功能。
底层实现使用 .
如果设置了此属性,则不必指定以下任何属性: 、 、 和 。org.springframework.scheduling.support.CronTrigger fixed-delay trigger fixed-rate ref |
2 | 通过将此属性设置为 ,你可以定义一个全局默认 Poller。
如果在应用程序上下文中定义了多个默认 Poller,则会引发异常。
任何连接到 () 的端点或任何没有显式配置的 Poller 的端点都使用全局默认 Poller。
它默认为 .
自选。true PollableChannel PollingConsumer SourcePollingChannelAdapter false |
3 | 标识如果此 Poller 的调用失败,则向其发送错误消息的通道。
要完全禁止显示异常,您可以提供对 .
自选。nullChannel |
4 | fixed delay trigger 在幕后使用 a。
如果不使用该属性,则指定的值以毫秒为单位表示。
如果设置了此属性,则不必指定以下任何属性: 、 、 和 。PeriodicTrigger time-unit fixed-rate trigger cron ref |
5 | 固定速率触发器在幕后使用 a。
如果不使用该属性,则指定的值以毫秒为单位表示。
如果设置了此属性,则不必指定以下任何属性: 、 、 和 。PeriodicTrigger time-unit fixed-delay trigger cron ref |
6 | 引用 poller 的基础 bean 定义的 ID,其类型为 。
该属性对于顶级 poller 元素是必需的,除非它是默认的 poller () 。org.springframework.integration.scheduling.PollerMetadata id default="true" |
7 | 有关更多信息,请参阅Configuring An Inbound Channel Adapter。
如果未指定,则默认值取决于上下文。
如果使用 ,则此属性默认为 。
但是,如果使用 ,则该属性默认为 。
自选。PollingConsumer -1 SourcePollingChannelAdapter max-messages-per-poll 1 |
8 | value 在底层 class 上设置。
如果未指定,则默认为 1000(毫秒)。
自选。PollerMetadata |
9 | Bean 引用另一个顶级 Poller 的 Pod 引用。
该属性不得出现在 top-level 元素上。
但是,如果设置了此属性,则不必指定以下任何属性:、、 、 和 。ref poller fixed-rate trigger cron fixed-delay |
10 | 提供引用自定义任务执行程序的功能。 有关更多信息,请参阅 TaskExecutor 支持。 自选。 |
11 | 此属性指定基础 的 enum 值。
因此,此属性只能与 or 属性结合使用。
如果与 或 reference 属性结合使用,则会导致失败。
a 支持的最小粒度为毫秒。
因此,唯一可用的选项是毫秒和秒。
如果未提供此值,则 any 或 value 将解释为毫秒。
基本上,此枚举为基于秒的 interval 触发器值提供了便利。
对于每小时、每天和每月设置,我们建议改用触发器。java.util.concurrent.TimeUnit org.springframework.scheduling.support.PeriodicTrigger fixed-delay fixed-rate cron trigger PeriodicTrigger fixed-delay fixed-rate cron |
12 | 对实现接口的任何 Spring 配置的 bean 的引用。
但是,如果设置了此属性,则不必指定以下任何属性:、、 、 和 。
自选。org.springframework.scheduling.Trigger fixed-delay fixed-rate cron ref |
13 | 允许指定额外的 AOP 建议来处理其他横切关注点。 有关更多信息,请参阅 Transaction Support 。 自选。 |
14 | Poller 可以成为事务性的。 有关更多信息,请参阅 AOP Advice 链。 自选。 |
例子
可以按如下方式配置具有 1 秒间隔的简单基于间隔的 Poller :
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller fixed-rate="1000"/>
</int:transformer>
作为使用 attribute 的替代方法,您还可以使用 attribute。fixed-rate
fixed-delay
对于基于 Cron 表达式的 Poller ,请改用该属性,如下例所示:cron
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller cron="*/10 * * * * MON-FRI"/>
</int:transformer>
如果 input 通道是 a ,则需要 poller 配置。
具体来说,如前所述,this 是类的 required 属性。
因此,如果省略轮询使用者终结点配置的子元素,则可能会引发异常。
如果你试图在连接到不可轮询通道的元素上配置 Poller,也可能引发异常。PollableChannel
trigger
PollingConsumer
poller
也可以创建顶级 Poller,在这种情况下,只需要一个属性,如下例所示:ref
<int:poller id="weekdayPoller" cron="*/10 * * * * MON-FRI"/>
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller ref="weekdayPoller"/>
</int:transformer>
该属性只允许在内部 Poller 定义上。
在顶级 Poller 上定义此属性会导致在应用程序上下文初始化期间引发配置异常。ref |
全局默认轮询器
为了进一步简化配置,你可以定义一个全局默认 Poller。
XML DSL 中的单个顶级 Poller 组件可以将属性设置为 。
对于 Java 配置,在这种情况下必须声明具有 name 的 bean。
在这种情况下,任何具有 a 的 input 通道的端点,如果在同一 个中定义,并且没有明确配置,则使用该默认值。
下面的示例展示了这样的 poller 和使用它的转换器:default
true
PollerMetadata
PollerMetadata.DEFAULT_POLLER
PollableChannel
ApplicationContext
poller
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
// No 'poller' attribute because there is a default global poller
@Bean
public IntegrationFlow transformFlow(MyTransformer transformer) {
return IntegrationFlow.from(MessageChannels.queue("pollable"))
.transform(transformer) // No 'poller' attribute because there is a default global poller
.channel("output")
.get();
}
@Bean(PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(5);
pollerMetadata.setTrigger(new PeriodicTrigger(3000));
return pollerMetadata;
}
@Bean
public QueueChannel pollable() {
return new QueueChannel();
}
// No 'poller' attribute because there is a default global poller
@Transformer(inputChannel = "pollable", outputChannel = "output")
public Object transform(Object payload) {
...
}
@Bean(PollerMetadata.DEFAULT_POLLER)
fun defaultPoller() =
PollerMetadata()
.also {
it.maxMessagesPerPoll = 5
it.trigger = PeriodicTrigger(3000)
}
@Bean
fun convertFlow() =
integrationFlow(MessageChannels.queue("pollable")) {
transform(transformer) // No 'poller' attribute because there is a default global poller
channel("output")
}
<int:poller id="defaultPoller" default="true" max-messages-per-poll="5" fixed-delay="3000"/>
<!-- No <poller/> sub-element is necessary, because there is a default -->
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output"/>
事务支持
Spring 集成还为 pollers 提供了事务支持,以便每个接收和转发操作都可以作为原子工作单元执行。
要为 Poller 配置事务,请添加 sub-element。
以下示例显示了可用属性:<transactional/>
<int:poller fixed-delay="1000">
<int:transactional transaction-manager="txManager"
propagation="REQUIRED"
isolation="REPEATABLE_READ"
timeout="10000"
read-only="false"/>
</int:poller>
有关更多信息,请参阅 轮询器事务支持。
AOP 建议链
由于 Spring 事务支持依赖于代理机制,其中(AOP 建议)处理由 Poller 发起的消息流的事务行为,因此有时必须提供额外的建议来处理与 Poller 关联的其他横切行为。
为此,它定义了一个元素,该元素允许您在实现该接口的类中添加更多建议。
以下示例说明如何定义 for a :TransactionInterceptor
poller
advice-chain
MethodInterceptor
advice-chain
poller
<int:service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
method="good" output-channel="output">
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<int:advice-chain>
<ref bean="adviceA" />
<beans:bean class="org.something.SampleAdvice" />
<ref bean="txAdvice" />
</int:advice-chain>
</int:poller>
</int:service-activator>
有关如何实现该接口的更多信息,请参见 Spring Framework Reference Guide 的 AOP 部分。
建议链也可以应用于没有任何事务配置的 Poller,从而增强 Poller 启动的消息流的行为。MethodInterceptor
使用通知链时,不能指定 child 元素。
相反,声明一个 bean 并将其添加到 .
有关完整的配置详细信息,请参阅 Poller Transaction Support 。<transactional/> <tx:advice/> <advice-chain/> |
TaskExecutor 支持
轮询线程可以由 Spring 抽象的任何实例执行。
这将为一个终端节点或一组终端节点启用并发。
从 Spring 3.0 开始,核心 Spring Framework 有一个命名空间,它的元素支持创建一个简单的线程池执行器。
该元素接受常见并发设置的属性,例如 pool-size 和 queue-capacity。
配置线程池执行程序可以对终端节点在负载下的性能产生重大影响。
这些设置适用于每个终端节点,因为终端节点的性能是要考虑的主要因素之一(另一个主要因素是终端节点订阅的通道上的预期卷)。
要为配置了 XML 命名空间支持的轮询终端节点启用并发,请提供对其元素的引用,然后提供以下示例中所示的一个或多个属性:TaskExecutor
task
<executor/>
task-executor
<poller/>
<int:poller task-executor="pool" fixed-rate="1000"/>
<task:executor id="pool"
pool-size="5-25"
queue-capacity="20"
keep-alive="120"/>
如果您不提供 task-executor,则在调用者的线程中调用使用者的处理程序。
请注意,调用方通常是默认调用方(请参阅配置 Task Scheduler)。
您还应该记住,该属性可以通过指定 bean 名称来提供对 Spring 接口的任何实现的引用。
为方便起见,提供了前面显示的元素。TaskScheduler
task-executor
TaskExecutor
executor
如前面在轮询使用者的 background 部分中提到的,您还可以以模拟事件驱动行为的方式配置轮询使用者。
在触发器中使用较长的接收超时和较短的间隔,您可以确保对到达的消息做出非常及时的反应,即使在轮询的消息源上也是如此。
请注意,这仅适用于具有超时的阻塞 wait 调用的源。
例如,文件 poller 不会阻塞。
每个调用都会立即返回,并且要么包含新文件,要么不包含新文件。
因此,即使 poller 包含 long ,该值也永远不会在这种情况下使用。
另一方面,当使用 Spring Integration 自己的基于队列的通道时,超时值确实有机会参与。
以下示例显示了轮询使用者如何几乎即时地接收消息:receive()
receive-timeout
<int:service-activator input-channel="someQueueChannel"
output-channel="output">
<int:poller receive-timeout="30000" fixed-rate="10"/>
</int:service-activator>
使用这种方法不会带来太多开销,因为在内部,它只不过是一个定时等待线程,它不需要像(例如)颠簸、无限 while 循环那样多的 CPU 资源使用。
在运行时更改轮询率
当使用 a 或 a 属性配置 Poller 时,默认实现使用实例。
它是核心 Spring 框架的一部分。
它仅接受 interval 作为构造函数参数。
因此,无法在运行时更改它。fixed-delay
fixed-rate
PeriodicTrigger
PeriodicTrigger
但是,您可以定义自己的接口实现。
您甚至可以将 用作起点。
然后,您可以为间隔 (period) 添加 setter,甚至可以在触发器本身中嵌入自己的限制逻辑。
该属性与每次调用 to 一起使用,以安排下一次轮询。
要在 Poller 中使用此自定义触发器,请在应用程序上下文中声明自定义触发器的 bean 定义,并使用引用自定义触发器 bean 实例的属性将依赖项注入到 poller 配置中。
现在,您可以获取对触发器 Bean 的引用并更改轮询之间的轮询间隔。org.springframework.scheduling.Trigger
PeriodicTrigger
period
nextExecutionTime
trigger
有关示例,请参阅 Spring Integration Samples 项目。
它包含一个名为 的示例,该示例使用自定义触发器,并演示了在运行时更改轮询间隔的能力。dynamic-poller
该示例提供了一个实现org.springframework.scheduling.Trigger
接口的自定义触发器。
该示例的触发器基于 Spring 的PeriodicTrigger
实现。
但是,自定义触发器的字段不是最终的,并且属性具有显式 getter 和 setter,允许您在运行时动态更改轮询周期。
但是,请务必注意,由于 Trigger 方法是 ,因此根据现有配置,对动态触发器的任何更改在下一次轮询之前都不会生效。
无法强制触发器在当前配置的下一次执行时间之前触发。nextExecutionTime() |
负载类型转换
在本参考手册中,您还可以看到接受 message 或 any arbitrary 作为输入参数的各种终端节点的特定配置和实现示例。
在 的情况下 ,这样的参数被映射到消息有效负载或有效负载或 Headers 的一部分(使用 Spring 表达式语言时)。
但是,端点方法的 input 参数类型有时与有效负载或其部分的类型不匹配。
在此方案中,我们需要执行类型转换。
Spring 集成提供了一种方便的方法,用于在其自己的名为 .
一旦使用 Spring 集成基础结构定义了第一个转换器,就会自动创建该 bean。
要注册转换器,您可以实现 、 、 或 。Object
Object
ConversionService
integrationConversionService
org.springframework.core.convert.converter.Converter
org.springframework.core.convert.converter.GenericConverter
org.springframework.core.convert.converter.ConverterFactory
实现是最简单的,可以从单个类型转换为另一种类型。
对于更复杂的操作,例如转换为类层次结构,您可以实现 a 和可能的 .
这些提供了对 和 类型描述符的完全访问权限,从而实现复杂的转换。
例如,如果您有一个名为 abstract class 的 abstract class,它是转换的目标(参数类型、通道数据类型等),则您有两个名为 和 的具体实现,并且您希望根据输入类型转换为其中一个或另一个,那么 this 将是一个不错的选择。
有关更多信息,请参阅以下接口的 Javadoc:Converter
GenericConverter
ConditionalConverter
from
to
Something
Thing1
Thing
GenericConverter
实现转换器后,可以使用方便的命名空间支持注册它,如下例所示:
<int:converter ref="sampleConverter"/>
<bean id="sampleConverter" class="foo.bar.TestConverter"/>
或者,你可以使用内部 Bean,如下例所示:
<int:converter>
<bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>
从 Spring 集成 4.0 开始,你可以使用 Comments 来创建前面的配置,如下例所示:
@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {
public Number convert(Boolean source) {
return source ? 1 : 0;
}
}
或者,您可以使用注释,如下例所示:@Configuration
@Configuration
@EnableIntegration
public class ContextConfiguration {
@Bean
@IntegrationConverter
public SerializingConverter serializingConverter() {
return new SerializingConverter();
}
}
在配置应用程序上下文时, Spring 框架允许您添加 bean (请参见配置 ConversionService 章节)。
如果需要,此服务用于在 bean 创建和配置期间执行适当的转换。 相反,the 用于运行时转换。
这些用途完全不同。
如果在运行时用于对数据类型通道、有效负载类型转换器等中的消息进行 Spring 集成表达式评估,则用于连接 bean 构造函数参数和属性时使用的转换器可能会产生意外的结果。 但是,如果您确实希望将 Spring 用作 Spring 集成 ,则可以在应用程序上下文中配置别名,如下例所示:
在这种情况下,提供的转换器可用于 Spring 集成运行时转换。 |
内容类型转换
从版本 5.0 开始,默认情况下,方法调用机制基于基础结构。
它的实现(例如 和 )可以使用抽象将 incoming 转换为目标方法参数类型。
转换可以基于消息标头。
为此, Spring 集成提供了 ,它委托给要调用的已注册转换器列表,直到其中一个转换器返回非空结果。
默认情况下,此转换器提供(按严格顺序):org.springframework.messaging.handler.invocation.InvocableHandlerMethod
HandlerMethodArgumentResolver
PayloadArgumentResolver
MessageMethodArgumentResolver
MessageConverter
payload
contentType
ConfigurableCompositeMessageConverter
-
MappingJackson2MessageConverter
(如果 Jackson 处理器存在于 Classpath 上)
请参阅 Javadoc(在前面的列表中链接),以了解有关其用途和适当的转换值的更多信息。
之所以使用 The ,是因为它可以与任何其他 implementations 一起提供,包括或不包括前面提到的 default converters。
它也可以在应用程序上下文中注册为适当的 bean,覆盖默认转换器,如下例所示:contentType
ConfigurableCompositeMessageConverter
MessageConverter
@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public ConfigurableCompositeMessageConverter compositeMessageConverter() {
List<MessageConverter> converters =
Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()),
new JavaSerializationMessageConverter());
return new ConfigurableCompositeMessageConverter(converters);
}
这两个新转换器在 defaults 之前注册在 composite 中。
您也可以不使用 a 但通过注册一个名称为 (通过设置属性) 的 bean 来提供自己的 bean。ConfigurableCompositeMessageConverter
MessageConverter
integrationArgumentResolverMessageConverter
IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME
使用 SPEL 方法调用时,基于 -(包括 header)的转换不可用。
在这种情况下,只有上面 Payload Type Conversion 中提到的常规类到类转换可用。MessageConverter contentType |
异步轮询
如果你希望轮询是异步的,那么 Poller 可以选择指定一个指向任何 bean 的现有实例的属性(Spring 3.0 通过名称空间提供了一个方便的命名空间配置)。
但是,在使用 .task-executor
TaskExecutor
task
TaskExecutor
问题在于有两种配置,即 poller 和 .
他们必须彼此协调一致。
否则,您最终可能会造成人为的内存泄漏。TaskExecutor
请考虑以下配置:
<int:channel id="publishChannel">
<int:queue />
</int:channel>
<int:service-activator input-channel="publishChannel" ref="myService">
<int:poller receive-timeout="5000" task-executor="taskExecutor" fixed-rate="50" />
</int:service-activator>
<task:executor id="taskExecutor" pool-size="20" />
前面的配置演示了 Out-of-tune 配置。
默认情况下,任务执行程序具有无界任务队列。 即使所有线程都被阻塞,Poller 也会继续调度新任务,等待新消息到达或超时过期。 假设有 20 个线程执行任务,超时时间为 5 秒,因此它们以每秒 4 个的速率执行。 但是,新任务以每秒 20 个的速率调度,因此任务执行程序中的内部队列以每秒 16 个的速度增长(当进程空闲时),因此我们存在内存泄漏。
处理此问题的方法之一是设置 task executor 的属性。
即使 0 也是一个合理的值。
您还可以通过设置 Task Executor 的属性(例如,to )来指定如何处理无法排队的消息,从而对其进行管理。
换句话说,在配置 时,您必须了解某些详细信息。
有关该主题的更多详细信息,请参阅 Spring 参考手册中的“任务执行和调度”。queue-capacity
rejection-policy
DISCARD
TaskExecutor
端点内部 Bean
许多端点是复合 bean。
这包括所有使用者和所有轮询的入站通道适配器。
使用者(轮询或事件驱动型)委托给 .
轮询适配器通过委托 来获取消息。
通常,获取对委托 Bean 的引用很有用,这可能是为了在运行时更改配置或用于测试。
这些豆子可以从众所周知的名字中获得。 实例使用类似于 bean ID 的 bean ID 在应用程序上下文中注册(其中 'consumer' 是端点属性的值)。 实例使用类似于 的 bean ID 注册,其中 'somePolledAdapter' 是适配器的 ID。MessageHandler
MessageSource
ApplicationContext
MessageHandler
someConsumer.handler
id
MessageSource
somePolledAdapter.source
上述内容仅适用于框架组件本身。 您可以改用内部 Bean 定义,如下例所示:
<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
output-channel = "outChannel" method="foo">
<beans:bean class="org.foo.ExampleServiceActivator"/>
</int:service-activator>
该 bean 的处理方式与声明的任何内部 bean 相同,并且未在应用程序上下文中注册。
如果您希望以其他方式访问此 bean,请在顶层使用 an 声明它,并改用该属性。
有关更多信息,请参阅 Spring 文档。id
ref
终端节点角色
从版本 4.2 开始,可以将终端节点分配给角色。
角色允许将终端节点作为一个组来启动和停止。
这在使用 leadership election 时特别有用,其中可以分别在授予或撤销 leadership 时启动或停止一组端点。
为此,框架在应用程序上下文中注册了一个名为 .
每当需要控制生命周期时,都可以注入这个 bean 或:SmartLifecycleRoleController
IntegrationContextUtils.INTEGRATION_LIFECYCLE_ROLE_CONTROLLER
@Autowired
<bean class="com.some.project.SomeLifecycleControl">
<property name="roleController" ref="integrationLifecycleRoleController"/>
</bean>
您可以使用 XML、Java 配置或以编程方式将终端节点分配给角色。 以下示例演示如何使用 XML 配置终结点角色:
<int:inbound-channel-adapter id="ica" channel="someChannel" expression="'foo'" role="cluster"
auto-startup="false">
<int:poller fixed-rate="60000" />
</int:inbound-channel-adapter>
以下示例说明如何为在 Java 中创建的 bean 配置端点角色:
@Bean
@ServiceActivator(inputChannel = "sendAsyncChannel", autoStartup="false")
@Role("cluster")
public MessageHandler sendAsyncHandler() {
return // some MessageHandler
}
以下示例演示如何在 Java 中的方法上配置终结点角色:
@Payload("#args[0].toLowerCase()")
@Role("cluster")
public String handle(String payload) {
return payload.toUpperCase();
}
以下示例演示如何在 Java 中使用 配置终结点角色:SmartLifecycleRoleController
@Autowired
private SmartLifecycleRoleController roleController;
...
this.roleController.addSmartLifeCycleToRole("cluster", someEndpoint);
...
以下示例显示了如何使用 Java 中的端点角色:IntegrationFlow
IntegrationFlow flow -> flow
.handle(..., e -> e.role("cluster"));
其中每个选项都会将终端节点添加到角色中。cluster
Invoking 和相应的方法将启动和停止端点。roleController.startLifecyclesInRole("cluster")
stop…
任何实现的对象都可以以编程方式添加 — 而不仅仅是端点。SmartLifecycle |
实现 和 当授予或撤销领导权时(当某些 bean 发布或 时分别)自动启动和停止其配置的对象。SmartLifecycleRoleController
ApplicationListener<AbstractLeaderEvent>
SmartLifecycle
OnGrantedEvent
OnRevokedEvent
当使用领导选举来启动和停止组件时,将 XML 属性( bean 属性)设置为 ,以便应用程序上下文在上下文初始化期间不会启动组件。auto-startup autoStartup false |
从版本 4.3.8 开始,提供了几种 status 方法:SmartLifecycleRoleController
public Collection<String> getRoles() (1)
public boolean allEndpointsRunning(String role) (2)
public boolean noEndpointsRunning(String role) (3)
public Map<String, Boolean> getEndpointsRunningStatus(String role) (4)
1 | 返回正在管理的角色的列表。 |
2 | 如果角色中的所有终端节点都在运行,则返回。true |
3 | 如果角色中的任何终端节点均未运行,则返回此结果。true |
4 | 返回 的映射。
组件名称通常是 bean 名称。component name : running status |
领导层活动处理
终端节点组可以根据授予或撤销的领导权分别启动和停止。 这在共享资源必须仅由单个实例使用的集群场景中非常有用。 这方面的一个例子是轮询共享目录的文件入站通道适配器。 (请参阅读取文件)。
为了参与领导者选举并在当选领导者、撤销领导者或未能获得成为领导者的资源时收到通知,应用程序会在应用程序上下文中创建一个称为“领导者发起方”的组件。
通常,领导发起方是 ,因此它在上下文启动时启动(可选),然后在领导层发生变化时发布通知。
您还可以通过将 设置为 (从版本 5.0 开始) 来接收失败通知,以便在发生故障时希望采取特定操作。
按照惯例,您应该提供接收回调的 a。
您还可以通过框架提供的对象撤销领导权。
您的代码还可以侦听实例(和 的超类)并相应地响应(例如,通过使用 a )。
事件包含对对象的引用。
下面的清单显示了接口的定义:SmartLifecycle
publishFailedEvents
true
Candidate
Context
o.s.i.leader.event.AbstractLeaderEvent
OnGrantedEvent
OnRevokedEvent
SmartLifecycleRoleController
Context
Context
public interface Context {
boolean isLeader();
void yield();
String getRole();
}
从版本 5.0.6 开始,上下文提供对应聘者角色的引用。
Spring 集成提供了基于抽象的 leader initiator 的基本实现。
要使用它,你需要创建一个实例作为 Bean,如下例所示:LockRegistry
@Bean
public LockRegistryLeaderInitiator leaderInitiator(LockRegistry locks) {
return new LockRegistryLeaderInitiator(locks);
}
如果 lock registry 正确实现,则最多只有一个 leader。
如果 lock registry 还提供在过期或损坏时引发异常(理想情况下)的锁,则无领导期的持续时间可以尽可能短,以锁定实现中的固有延迟所允许的最短。
默认情况下,该属性会添加一些额外的延迟,以防止在锁不完美(更常见)的情况下出现 CPU 匮乏,并且只有在尝试再次获取锁时才知道它们已过期。InterruptedException
busyWaitMillis
有关领导层选举和使用 Zookeeper 的事件的更多信息,请参阅 Zookeeper 领导层事件处理。 有关领导层选举和使用 Hazelcast 的事件的更多信息,请参阅 Hazelcast 领导层事件处理。
消息网关
网关隐藏了 Spring 集成提供的消息传递 API。 它允许你的应用程序的业务逻辑不知道 Spring 集成 API。 通过使用通用 Gateway,您的代码仅与一个简单的接互。
输入GatewayProxyFactoryBean
如前所述,不依赖于 Spring 集成 API (包括 gateway 类)就太好了。
出于这个原因, Spring 集成提供了 ,它为任何接口生成代理,并在内部调用如下所示的网关方法。
通过使用依赖关系注入,您可以向业务方法公开接口。GatewayProxyFactoryBean
下面的示例展示了一个可用于与 Spring 集成交互的接口:
package org.cafeteria;
public interface Cafe {
void placeOrder(Order order);
}
Gateway XML 命名空间支持
还提供了命名空间支持。 它允许您将接口配置为服务,如下例所示:
<int:gateway id="cafeService"
service-interface="org.cafeteria.Cafe"
default-request-channel="requestChannel"
default-reply-timeout="10000"
default-reply-channel="replyChannel"/>
定义了这个配置后,现在可以注入到其他 bean 中,并且在接口的代理实例上调用方法的代码不知道 Spring 集成 API。
请参阅 “Samples” 附录 以获取使用 元素的示例(在 Cafe 演示中)。cafeService
Cafe
gateway
上述配置中的默认值将应用于网关接口上的所有方法。 如果未指定回复超时,则调用线程将等待回复 30 秒。 请参阅没有响应到达时的网关行为。
可以覆盖单个方法的默认值。 请参阅带有注释和 XML 的网关配置。
设置默认回复通道
通常,您无需指定 ,因为 Gateway 会自动创建一个临时的匿名回复通道,并在其中侦听回复。
但是,在某些情况下,可能会提示您定义 (或使用适配器网关,例如 HTTP、JMS 等)。default-reply-channel
default-reply-channel
reply-channel
对于一些背景,我们简要讨论了 gateway 的一些内部工作原理。
网关创建一个临时的点对点回复通道。
它是匿名的,并添加到邮件报头中,名称为 。
当提供显式(使用远程适配器网关)时,你可以指向一个发布-订阅通道,之所以这样命名,是因为你可以向其添加多个订阅者。
在内部, Spring Integration 在临时和显式定义之间创建了一个桥梁。replyChannel
default-reply-channel
reply-channel
replyChannel
default-reply-channel
假设您希望您的回复不仅发送到网关,还发送到其他某个使用者。 在这种情况下,您需要两样东西:
-
您可以订阅的命名频道
-
该通道将成为 publish-subscribe-channel
网关使用的默认策略不能满足这些需求,因为添加到 Headers 的回复通道是匿名的和点对点的。
这意味着其他订户无法获得该消息的句柄,即使可以,该通道也具有点对点行为,因此只有一个订户可以获取该消息。
通过定义 a,您可以指向您选择的通道。
在本例中,这是一个 .
网关会创建一个桥,从它到存储在 header 中的临时匿名回复通道。default-reply-channel
publish-subscribe-channel
您可能还希望显式提供一个回复通道,以便通过侦听器(例如,wiretap)进行监视或审计。 要配置通道拦截器,您需要一个命名通道。
从版本 5.4 开始,当 gateway method return type 为 时,如果未明确提供此类 Headers,则框架会将 Headers 填充为 Bean 引用。
这允许丢弃来自下游流的任何可能的回复,从而满足单向网关协定。void replyChannel nullChannel |
带有注释和 XML 的网关配置
请考虑以下示例,该示例通过添加 annotation 对前面的接口示例进行了扩展:Cafe
@Gateway
public interface Cafe {
@Gateway(requestChannel="orders")
void placeOrder(Order order);
}
该注释允许您添加解释为消息标头的值,如下例所示:@Header
public interface FileWriter {
@Gateway(requestChannel="filesOut")
void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);
}
如果您更喜欢使用 XML 方法来配置网关方法,则可以将元素添加到网关配置中,如下例所示:method
<int:gateway id="myGateway" service-interface="org.foo.bar.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB"/>
<int:method name="echoViaDefault"/>
</int:gateway>
您还可以使用 XML 为每个方法调用提供单独的标头。
如果要设置的 Headers 本质上是静态的,并且您不想通过使用 annotation 将它们嵌入到网关的方法签名中,这可能很有用。
例如,在贷款经纪人示例中,我们希望根据发起的请求类型(单个报价或所有报价)来影响贷款报价的聚合方式。
通过评估调用的网关方法来确定请求的类型,尽管可能,但会违反关注点分离范式(该方法是一个 Java 工件)。
但是,在消息标头中表达您的意图(元信息)在消息传递架构中是很自然的。
以下示例演示如何为两种方法中的每一种方法添加不同的消息标头:@Header
<int:gateway id="loanBrokerGateway"
service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway">
<int:method name="getLoanQuote" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="BEST"/>
</int:method>
<int:method name="getAllLoanQuotes" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="ALL"/>
</int:method>
</int:gateway>
在前面的示例中,根据网关的方法为 'RESPONSE_TYPE' 标头设置了不同的值。
例如,如果指定 in 和 in 注释,则注释值优先。requestChannel <int:method/> @Gateway |
如果在 XML 中指定了无参数网关,并且接口方法同时具有 a 和 annotation(元素中带有 a 或 a),则忽略该值。@Payload @Gateway payloadExpression payload-expression <int:method/> @Payload |
表达式和 “global” 标头
元素 support 作为 .
计算 SPEL 表达式以确定 header 的值。
从版本 5.2 开始,评估上下文的对象是 with and 访问器。
例如,如果您希望对简单方法名称进行路由,则可以使用以下表达式添加标头:。<header/>
expression
value
#root
MethodArgsHolder
getMethod()
getArgs()
method.name
这是不可序列化的。
如果稍后序列化消息,则表达式为 的标头将丢失。
因此,您可能希望使用 OR 在这些情况下。
该方法提供方法的表示形式,包括参数和返回类型。java.reflect.Method method method.name method.toString() toString() String |
从版本 3.0 开始,可以定义元素以向网关生成的所有消息添加 Headers,而不管调用的方法如何。
为方法定义的特定标头优先于默认标头。
在此处为方法定义的特定标头将覆盖服务接口中的任何注释。
但是,默认标头不会覆盖服务接口中的任何注释。<default-header/>
@Header
@Header
网关现在还支持 ,该 API 适用于所有方法(除非被覆盖)。default-payload-expression
将方法参数映射到消息
使用上一节中的配置技术可以控制如何将方法参数映射到消息元素(有效负载和 Headers)。 如果未使用显式配置,则使用某些约定来执行映射。 在某些情况下,这些约定无法确定哪个参数是有效负载,哪些参数应该映射到 headers。 请考虑以下示例:
public String send1(Object thing1, Map thing2);
public String send2(Map thing1, Map thing2);
在第一种情况下,约定是将第一个参数映射到有效负载(只要它不是 a ),第二个参数的内容成为 headers。Map
在第二种情况下(或第一种情况,当 parameter 的参数为 a 时),框架无法确定哪个参数应该是有效负载。
因此,映射失败。
这通常可以使用 、 注释 或 注释 来解决。thing1
Map
payload-expression
@Payload
@Headers
或者(每当约定被打破时),您可以承担将方法调用映射到消息的全部责任。
为此,请实现 an 并使用 属性将其提供给 。
映射器映射 a ,这是一个包装实例的简单类,而 an 包含参数。
提供自定义映射器时,不允许在网关上使用属性和元素。
同样,任何元素上都不允许使用 attribute 和 elements。MethodArgsMessageMapper
<gateway/>
mapper
MethodArgsHolder
java.reflect.Method
Object[]
default-payload-expression
<default-header/>
payload-expression
<header/>
<method/>
映射方法参数
以下示例显示了如何将方法参数映射到消息,并显示了无效配置的一些示例:
public interface MyGateway {
void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);
void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);
void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);
void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added
void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);
@Payload("args[0] + args[1] + '!'")
void payloadAnnotationAtMethodLevel(String a, String b);
@Payload("@someBean.exclaim(args[0])")
void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);
void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);
void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); // (1)
// invalid
void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);
// invalid
void twoPayloads(@Payload String s1, @Payload String s2);
// invalid
void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);
// invalid
void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);
}
1 | 请注意,在此示例中,SPEL 变量 , 引用参数 — 在本例中为 的值。#this s |
XML 等效项看起来略有不同,因为 method 参数没有上下文。
但是,表达式可以通过使用根对象的属性来引用方法参数(有关更多信息,请参阅表达式和“全局”标头),如下例所示:#this
args
MethodArgsHolder
<int:gateway id="myGateway" service-interface="org.something.MyGateway">
<int:method name="send1" payload-expression="args[0] + 'thing2'"/>
<int:method name="send2" payload-expression="@someBean.sum(args[0])"/>
<int:method name="send3" payload-expression="method"/>
<int:method name="send4">
<int:header name="thing1" expression="args[2].toUpperCase()"/>
</int:method>
</int:gateway>
@MessagingGateway
注解
从版本 4.0 开始,网关服务接口可以使用注释进行标记,而不需要定义 xml 元素进行配置。
以下一对示例比较了配置同一网关的两种方法:@MessagingGateway
<gateway />
<int:gateway id="myGateway" service-interface="org.something.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB">
<int:header name="thing1" value="thing2"/>
</int:method>
<int:method name="echoViaDefault"/>
</int:gateway>
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
defaultHeaders = @GatewayHeader(name = "calledMethod",
expression="#gatewayMethod.name"))
public interface TestGateway {
@Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
String echo(String payload);
@Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
String echoUpperCase(String payload);
String echoViaDefault(String payload);
}
与 XML 版本类似,当 Spring 集成在组件扫描期间发现这些注释时,它会使用其消息传递基础结构创建实现。
要执行此扫描并在应用程序上下文中注册 ,请将注释添加到类中。
标准基础结构不处理接口。
因此,我们引入了自定义 logic 来查找接口上的 annotation 并为它们注册实例。
另请参阅 注释支持。proxy BeanDefinition @IntegrationComponentScan @Configuration @ComponentScan @IntegrationComponentScan @MessagingGateway GatewayProxyFactoryBean |
除了注释之外,您还可以使用注释标记服务接口,以避免创建 Bean(如果此类配置文件未处于活动状态)。@MessagingGateway
@Profile
从版本 6.0 开始,带有 the 的接口也可以用相应 configuration logic 的 Comments 来标记,就像任何 Spring 定义一样。@MessagingGateway
@Primary
@Component
从版本 6.0 开始,可以在标准 Spring 配置中使用接口。
这可以用作 or 手动 bean 定义的替代方法。@MessagingGateway
@Import
@IntegrationComponentScan
AnnotationGatewayProxyFactoryBean
该使用 since 版本进行元注释,并且该属性实质上是别名为 .
这样,网关代理的 bean 名称生成策略将与扫描和导入组件的标准 Spring 注释配置重新保持一致。
可以通过 an 或 as 属性全局覆盖 default。@MessagingGateway
@MessageEndpoint
6.0
name()
@Compnent.value()
AnnotationBeanNameGenerator
AnnotationConfigUtils.CONFIGURATION_BEAN_NAME_GENERATOR
@IntegrationComponentScan.nameGenerator()
如果您没有 XML 配置,那么至少一个类都需要注释。
有关更多信息,请参阅配置和@EnableIntegration 。@EnableIntegration @Configuration |
调用无参数方法
在没有任何参数的 Gateway 接口上调用方法时,默认行为是从 a 接收 a 。Message
PollableChannel
但是,有时您可能希望触发无参数方法,以便可以与下游不需要用户提供的参数的其他组件进行交互,例如触发无参数 SQL 调用或存储过程。
要实现发送和接收语义,您必须提供有效负载。
要生成有效负载,接口上的方法参数不是必需的。
您可以在元素上使用 XML 中的注释或属性。
以下列表包括有效负载的几个示例:@Payload
payload-expression
method
-
文本字符串
-
#gatewayMethod.name
-
新的 java.util.Date()
-
@someBean.someMethod() 的返回值
以下示例演示如何使用注释:@Payload
public interface Cafe {
@Payload("new java.util.Date()")
List<Order> retrieveOpenOrders();
}
您也可以使用注释。@Gateway
public interface Cafe {
@Gateway(payloadExpression = "new java.util.Date()")
List<Order> retrieveOpenOrders();
}
如果两个 Comments 都存在(并且提供了 ),则 win。payloadExpression @Gateway |
另请参阅使用注释和 XML 的网关配置。
如果方法没有参数和返回值,但包含有效负载表达式,则将其视为仅发送操作。
调用方法default
网关代理的接口也可能具有方法,从版本 5.3 开始,框架将 a 注入到代理中,以便使用方法而不是代理来调用方法。
JDK 中的接口(例如 )仍可用于网关代理,但由于针对 JDK 类进行实例化的内部 Java 安全原因,无法调用其方法。
这些方法也可以在方法上或注释或 XML 组件上使用显式注释进行代理(丢失它们的实现逻辑,同时恢复以前的网关代理行为)。default
DefaultMethodInvokingMethodInterceptor
default
java.lang.invoke.MethodHandle
java.util.function.Function
default
MethodHandles.Lookup
@Gateway
proxyDefaultMethods
@MessagingGateway
<gateway>
错误处理
网关调用可能会导致错误。 默认情况下,在网关的方法调用时,下游发生的任何错误都会“按原样”重新引发。 例如,请考虑以下简单流程:
gateway -> service-activator
如果服务激活器调用的服务抛出 a(例如),框架会将其包装在 a 中,并将传递给服务激活器的消息附加到属性中。
因此,框架执行的任何日志记录都具有完整的失败上下文。
默认情况下,当网关捕获到异常时,将解包并抛出给调用方。
您可以在网关方法声明上配置子句,以匹配原因链中的特定异常类型。
例如,如果要捕获包含下游错误原因的所有消息收发信息的整体,则应使用类似于以下内容的网关方法:MyException
MessagingException
failedMessage
MyException
throws
MessagingException
public interface MyGateway {
void performProcess() throws MessagingException;
}
由于我们鼓励 POJO 编程,因此您可能不希望将调用方暴露给消息传递基础设施。
如果您的网关方法没有子句,则网关将遍历原因树,查找不是 .
如果未找到,框架将抛出 .
如果前面的讨论中有 和 your method 的原因,则网关会进一步解包该 API 并将其抛给调用方。throws
RuntimeException
MessagingException
MessagingException
MyException
SomeOtherException
throws SomeOtherException
当声明网关时 no ,将使用内部框架接口。service-interface
RequestReplyExchanger
请考虑以下示例:
public interface RequestReplyExchanger {
Message<?> exchange(Message<?> request) throws MessagingException;
}
在版本 5.0 之前,此方法没有子句,因此,异常被解包。
如果你使用这个接口,并且想要恢复以前的 unwrap 行为,请使用 custom instead 或访问 yourself.exchange
throws
service-interface
cause
MessagingException
但是,您可能希望记录错误而不是传播错误,或者您可能希望将异常视为有效回复(通过将其映射到符合调用方理解的某个“错误消息”协定的消息)。
为了实现这一点,网关通过包含对属性的支持来为专用于错误的消息通道提供支持。
在以下示例中,“transformer”从 中创建回复:error-channel
Message
Exception
<int:gateway id="sampleGateway"
default-request-channel="gatewayChannel"
service-interface="foo.bar.SimpleGateway"
error-channel="exceptionTransformationChannel"/>
<int:transformer input-channel="exceptionTransformationChannel"
ref="exceptionTransformer" method="createErrorResponse"/>
它可以是一个简单的 POJO,它知道如何创建预期的错误响应对象。
这将成为发送回给调用方的有效负载。
如有必要,您可以在这样的 “错误流” 中做很多更复杂的事情。
它可能涉及路由器(包括 Spring Integration 的)、过滤器等。
然而,大多数时候,一个简单的 “transformer” 就足够了。exceptionTransformer
ErrorMessageExceptionTypeRouter
或者,您可能希望仅记录异常(或将其异步发送到某个位置)。
如果您提供单向流,则不会将任何内容发送回调用方。
如果要完全禁止显示异常,则可以提供对全局的引用(本质上是一种方法)。
最后,如上所述,如果定义了 no,则异常将照常传播。nullChannel
/dev/null
error-channel
使用注释(请参阅 )时,可以使用属性。@MessagingGateway
@MessagingGateway
AnnotationerrorChannel
从版本 5.0 开始,当您使用具有返回类型(单向流)的网关方法时,引用(如果提供)将填充到每条已发送消息的标准标头中。
此功能允许基于标准配置(或 a )的下游异步流覆盖默认的全局异常发送行为。
以前,您必须手动指定带有注释或元素的标题。
对于具有异步流的方法,该属性被忽略。
相反,错误消息已发送到默认 。void
error-channel
errorChannel
ExecutorChannel
QueueChannel
errorChannel
errorChannel
@GatewayHeader
<header>
error-channel
void
errorChannel
通过简单的 POJI 网关公开消息传递系统是有好处的,但“隐藏”底层消息传递系统的现实确实是有代价的,因此您应该考虑某些事项。
我们希望我们的 Java 方法尽快返回,而不是在调用者等待它返回时无限期挂起(无论是 void、返回值还是引发的 Exception)。
当常规方法用作消息传递系统前面的代理时,我们必须考虑底层消息传递的潜在异步性质。
这意味着由网关启动的消息有可能被过滤器丢弃,并且永远不会到达负责生成回复的组件。
某些服务激活器方法可能会导致异常,因此不提供回复(因为我们不生成 null 消息)。
换句话说,多种情况都可能导致回复消息永远不会出现。
这在消息传递系统中是非常自然的。
但是,请考虑对网关方法的含义。
网关的方法输入参数被合并到消息中并发送到下游。
回复消息将转换为网关方法的返回值。
因此,您可能希望确保对于每个网关呼叫,始终有一条回复消息。
否则,如果设置为负值,则网关方法可能永远不会返回并无限期挂起。
处理这种情况的一种方法是使用异步网关(本节稍后将介绍)。
另一种处理方法是依赖默认值作为秒。
这样,网关的挂起时间不会超过指定的时间,如果超时已过,则返回 'null'。
最后,你可能要考虑在服务激活器上设置下游标志,例如 'requires-reply' 或过滤器上的 'throw-exceptions-on-rejection'。
本章的最后一节将更详细地讨论这些选项。reply-timeout reply-timeout 30 reply-timeout |
如果下游流返回 ,则其 (a ) 将被视为常规下游错误。
如果存在已配置,则会将其发送到错误流。
否则,有效负载将抛给网关的调用方。
同样,如果 上的错误流返回 an ,则会将其有效负载抛出给调用方。
这同样适用于具有有效负载的任何消息。
这在异步情况下非常有用,当您需要将 直接传播到调用方时。
为此,您可以返回 an (as the from some service) 或抛出它。
通常,即使使用异步流,框架也会负责将下游流引发的异常传播回网关。
TCP 客户端-服务器多路复用示例演示了将异常返回给调用方的两种技术。
它通过在 discard 流上使用 with(请参阅 聚合器和组超时)和回复来模拟等待线程的套接字 IO 错误。ErrorMessage payload Throwable error-channel error-channel ErrorMessage Throwable Exception Exception reply aggregator group-timeout MessagingTimeoutException |
网关超时
网关有两个超时属性:和 .
仅当通道可以阻塞(例如,已满的 bounded)时,请求超时才适用。
该值是网关等待回复或返回 的时间。
它默认为无穷大。requestTimeout
replyTimeout
QueueChannel
replyTimeout
null
超时可以设置为网关 ( 和 ) 或接口注释上所有方法的默认值。
单个方法可以覆盖这些默认值(在子元素中)或注解上。defaultRequestTimeout
defaultReplyTimeout
MessagingGateway
<method/>
@Gateway
从版本 5.0 开始,超时可以定义为表达式,如下例所示:
@Gateway(payloadExpression = "args[0]", requestChannel = "someChannel",
requestTimeoutExpression = "args[1]", replyTimeoutExpression = "args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout);
评估上下文具有 (用于引用其他 bean),并且对象的 array 属性可用。
有关此根对象的更多信息,请参阅表达式和 “Global” Headers。
使用 XML 进行配置时,超时属性可以是长值或 SPEL 表达式,如下例所示:BeanResolver
@someBean
args
#root
<method name="someMethod" request-channel="someRequestChannel"
payload-expression="args[0]"
request-timeout="1000"
reply-timeout="args[1]">
</method>
异步网关
作为一种模式,消息传递网关提供了一种很好的方法来隐藏特定于消息传递的代码,同时仍然公开消息传递系统的全部功能。
如前所述,它提供了一种通过服务接口公开代理的便捷方法,使您可以基于 POJO 访问消息传递系统(基于您自己的域中的对象、原始语/字符串或其他对象)。
但是,当网关通过返回值的简单 POJO 方法公开时,这意味着对于每个请求消息(在调用方法时生成),必须有一个回复消息(在方法返回时生成)。
由于消息传递系统本质上是异步的,因此您可能无法始终保证 “对于每个请求,总会有一个回复” 的合同。Spring Integration 2.0 引入了对异步网关的支持,当您可能不知道是否需要回复或回复到达需要多长时间时,它提供了一种方便的方式来启动流。GatewayProxyFactoryBean
为了处理这些类型的场景, Spring 集成使用实例来支持异步网关。java.util.concurrent.Future
在 XML 配置中,没有任何变化,您仍然以与定义常规网关相同的方式定义异步网关,如下例所示:
<int:gateway id="mathService"
service-interface="org.springframework.integration.sample.gateway.futures.MathServiceGateway"
default-request-channel="requestChannel"/>
但是,网关接口(服务接口)略有不同,如下所示:
public interface MathServiceGateway {
Future<Integer> multiplyByTwo(int i);
}
如前面的示例所示,网关方法的返回类型是 .
当看到网关方法的返回类型为 a 时,它立即使用 切换到异步模式。
这就是差异的程度。
对此类方法的调用始终立即返回实例。
然后,您可以按照自己的节奏与 交互以获取结果、取消等。
此外,与实例的任何其他用法一样,调用 可能会显示超时、执行异常等。
以下示例演示如何使用从异步网关返回的 a:Future
GatewayProxyFactoryBean
Future
AsyncTaskExecutor
Future
Future
Future
get()
Future
MathServiceGateway mathService = ac.getBean("mathService", MathServiceGateway.class);
Future<Integer> result = mathService.multiplyByTwo(number);
// do something else here since the reply might take a moment
int finalResult = result.get(1000, TimeUnit.SECONDS);
有关更详细的示例,请参阅 Spring 集成示例中的 async-gateway 示例。
AsyncTaskExecutor
默认情况下,在为返回类型为 .
但是,元素配置中的属性允许您提供对 Spring 应用程序上下文中 available 的任何实现的引用。GatewayProxyFactoryBean
org.springframework.core.task.SimpleAsyncTaskExecutor
AsyncInvocationTask
Future
async-executor
<gateway/>
java.util.concurrent.Executor
(默认)同时支持 和 return 类型。
请参阅 CompletableFuture
。
即使有默认的执行程序,提供外部执行程序通常也很有用,这样你就可以在日志中识别其线程(当使用 XML 时,线程名称基于执行程序的 Bean 名称),如下例所示:SimpleAsyncTaskExecutor
Future
CompletableFuture
@Bean
public AsyncTaskExecutor exec() {
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
simpleAsyncTaskExecutor.setThreadNamePrefix("exec-");
return simpleAsyncTaskExecutor;
}
@MessagingGateway(asyncExecutor = "exec")
public interface ExecGateway {
@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);
}
如果您希望返回不同的实施,则可以提供自定义执行程序或完全禁用执行程序,并从下游流返回回复消息有效负载。
要禁用执行程序,请在 (通过使用) 中将其设置为 。
使用 XML 配置网关时,请使用 .
使用注释进行配置时,请使用类似于以下内容的代码:Future
Future
null
GatewayProxyFactoryBean
setAsyncTaskExecutor(null)
async-executor=""
@MessagingGateway
@MessagingGateway(asyncExecutor = AnnotationConstants.NULL)
public interface NoExecGateway {
@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);
}
如果返回类型是特定的具体实现或配置的执行程序不支持的其他子接口,则流将在调用方的线程上运行,并且流必须在回复消息有效负载中返回所需的类型。Future |
CompletableFuture
从版本 4.2 开始,网关方法现在可以返回 .
返回此类型时有两种操作模式:CompletableFuture<?>
-
当提供了异步执行程序并且返回类型恰好是(不是子类)时,框架会在执行程序上运行任务,并立即将 a 返回给调用者。 用于创造未来。
CompletableFuture
CompletableFuture
CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)
-
当异步执行程序显式设置为 且返回类型为 或 返回类型 是 的子类 时,将在调用方的线程上调用流。 在这种情况下,下游流应返回适当类型的 a。
null
CompletableFuture
CompletableFuture
CompletableFuture
从 Spring Framework 开始,已弃用。
现在建议迁移到 提供类似处理功能的 。org.springframework.util.concurrent.ListenableFuture 6.0 CompletableFuture |
使用场景
在以下场景中,调用方线程立即返回 ,当下游流回复网关(带有对象)时,该响应完成。CompletableFuture<Invoice>
Invoice
CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="something.Service" default-request-channel="orders" />
在以下场景中,当下游流将其作为对网关的回复的负载提供时,调用方线程将返回 a。
当发票准备好时,必须完成其他一些流程。CompletableFuture<Invoice>
CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders"
async-executor="" />
在以下场景中,当下游流将其作为对网关的回复的负载提供时,调用方线程将返回 a。
当发票准备好时,必须完成其他一些流程。
如果启用了日志记录,则会发出一个日志条目,指示异步执行程序不能用于此方案。CompletableFuture<Invoice>
DEBUG
MyCompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders" />
CompletableFuture
实例可用于对回复执行其他操作,如下例所示:
CompletableFuture<String> process(String data);
...
CompletableFuture result = process("foo")
.thenApply(t -> t.toUpperCase());
...
String out = result.get(10, TimeUnit.SECONDS);
反应器Mono
从版本 5.0 开始,允许使用 Mono<T>
返回类型将 Project Reactor 与网关接口方法一起使用。
内部包裹在 .GatewayProxyFactoryBean
AsyncInvocationTask
Mono.fromCallable()
A 可用于稍后检索结果(类似于 a ),或者您可以通过在结果返回到网关时调用 your 来与调度程序一起使用它。Mono
Future<?>
Consumer
框架不会立即刷新 。
因此,底层消息流不会在网关方法返回之前启动(就像任务一样)。
流在订阅时启动。
或者,当 与整个 .
以下示例显示了如何使用 Project Reactor 创建网关:Mono Future<?> Executor Mono Mono subscribe() Flux |
@MessagingGateway
public interface TestGateway {
@Gateway(requestChannel = "multiplyChannel")
Mono<Integer> multiply(Integer value);
}
@ServiceActivator(inputChannel = "multiplyChannel")
public Integer multiply(Integer value) {
return value * 2;
}
其中,这样的网关可以用于处理 OF 数据的某些服务中:Flux
@Autowired
TestGateway testGateway;
public void hadnleFlux() {
Flux.just("1", "2", "3", "4", "5")
.map(Integer::parseInt)
.flatMap(this.testGateway::multiply)
.collectList()
.subscribe(System.out::println);
}
使用 Project Reactor 的另一个示例是一个简单的回调场景,如下例所示:
Mono<Invoice> mono = service.process(myOrder);
mono.subscribe(invoice -> handleInvoice(invoice));
调用线程将继续,并在流完成时被调用。handleInvoice()
如需了解详情,另请参阅 Kotlin 协程。
返回异步类型的下游流
如上面的 AsyncTaskExecutor
部分所述,如果你希望某个下游组件返回带有异步有效负载(、 、 和其他)的消息,你必须显式地将异步 executor 设置为 (或使用 XML 配置时)。
然后,在调用方线程上调用该流,稍后可以检索结果。Future
Mono
null
""
异步返回类型void
消息网关方法可以按如下方式声明:
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "sendAsyncChannel")
@Async
void sendAsync(String payload);
}
但是下游异常不会传播回调用方。
为了确保下游流调用和异常传播到调用方的异步行为,从版本 6.0 开始,框架提供了对 和 return 类型的支持。
该用例类似于前面描述的普通返回类型的发送和忘记行为,但不同的是,流执行是异步发生的,并且根据操作结果返回 (or ) 以 or 例外完成。Future<Void>
Mono<Void>
void
Future
Mono
null
send
如果 is exact downstream flow reply,则必须将网关的选项设置为 null(对于配置),并在生产者线程上执行该部分。
回复 1 取决于下游流配置。
这样,目标应用程序就可以正确生成回复。
用例已经超出了框架线程控制范围,因此设置为 null 没有意义。
作为 request-reply 网关操作的结果,必须将其配置为网关方法的返回类型。Future<Void> asyncExecutor AnnotationConstants.NULL @MessagingGateway send Future<Void> Mono asyncExecutor Mono<Void> Mono<?> |
未到达响应时的网关行为
如前所述,网关提供了一种通过 POJO 方法调用与消息传递系统交互的便捷方式。 但是,通常预期始终返回的典型方法调用(即使有 Exception)可能并不总是一对一地映射到消息交换(例如,回复消息可能未到达 — 相当于方法未返回)。
本节的其余部分介绍各种场景以及如何使网关的行为更具可预测性。
可以配置某些属性以使同步网关行为更具可预测性,但其中一些属性可能并不总是像您预期的那样工作。
其中之一是 (在方法级别或网关级别)。
我们检查该属性,以了解它在各种场景中如何能够和不能影响 synchronous gateway 的行为。
我们研究了单线程场景(下游的所有组件都通过直接通道连接)和多线程场景(例如,在下游的某个地方,你可能有一个打破单线程边界的 pollable 或 executor 通道)。reply-timeout
default-reply-timeout
reply-timeout
长时间运行的流程下游
- Sync Gateway,单线程
-
如果下游组件仍在运行(可能是由于无限循环或服务缓慢),则设置 a 不起作用,并且网关方法调用不会返回,直到下游服务退出(通过返回或抛出异常)。
reply-timeout
- Sync Gateway,多线程
-
如果下游组件仍在多线程消息流中运行(可能是由于无限循环或服务缓慢),则设置 () 的效果是允许网关方法调用在达到超时后返回,因为在回复通道上轮询,等待消息,直到超时到期。 但是,如果在生成实际回复之前已达到超时,则可能导致网关方法返回 'null'。 您应该了解,回复消息(如果生成)是在网关方法调用可能返回后发送到回复通道的,因此您必须了解这一点并在设计流程时牢记这一点。
reply-timeout
GatewayProxyFactoryBean
下游组件返回 'null'
- Sync Gateway — 单线程
-
如果组件下游返回 'null' 并且 已配置为负值,则网关方法调用将无限期挂起,除非已在可能返回 'null' 的下游组件(例如,服务激活器)上设置了该属性。 在这种情况下,将引发异常并将其传播到网关。
reply-timeout
requires-reply
- Sync Gateway — 多线程
-
该行为与前一种情况相同。
下游组件返回签名为“void”,而网关方法签名为非 void
- Sync Gateway — 单线程
-
如果下游组件返回 'void' 并且 已配置为负值,则网关方法调用将无限期挂起。
reply-timeout
- Sync Gateway — 多线程
-
该行为与前一种情况相同。
下游组件导致运行时异常
- Sync Gateway — 单线程
-
如果组件下游引发运行时异常,则异常将通过错误消息传播回网关并重新引发。
- Sync Gateway — 多线程
-
该行为与前一种情况相同。
您应该了解,默认情况下,它是无界的。
因此,如果您将 设置为 负值,则网关方法调用可能会无限期挂起。
因此,为了确保您分析了您的流程,并且如果其中一种情况发生的可能性很小,您应该将属性设置为 “'safe'” 值。
默认情况下是秒。
更好的是,您可以将下游组件的属性设置为 'true' 以确保及时响应,就像下游组件在内部返回 null 时引发异常而产生的响应。
但是,您还应该意识到,在某些情况下(请参阅第一个)没有帮助。
这意味着分析您的消息流并决定何时使用同步网关而不是异步网关也很重要。
如前所述,后一种情况是定义返回实例的网关方法的问题。
然后,您可以保证收到该返回值,并且您可以对调用结果进行更精细的控制。
此外,在处理路由器时,您应该记住,如果路由器无法解析特定通道,则将属性设置为 'true' 会导致路由器抛出异常。
同样,在处理 Filter 时,您可以设置该属性。
在这两种情况下,生成的流的行为类似于它包含具有 'requires-reply' 属性的服务激活器。
换句话说,它有助于确保网关方法调用的及时响应。reply-timeout reply-timeout reply-timeout 30 requires-reply reply-timeout Future resolution-required throw-exception-on-rejection |
您应该了解,计时器在线程返回到网关时启动,即当流完成或将消息传递给另一个线程时。 此时,调用线程开始等待回复。 如果流是完全同步的,则回复将立即可用。 对于异步流,线程将等待 this time。 |
请参阅 Java DSL 一章中的 IntegrationFlow
as Gateway,了解通过 定义网关的选项。IntegrationFlow
服务激活器
服务激活器是将任何 Spring 管理的对象连接到 Importing 通道的端点类型,以便它可以扮演服务的角色。
如果服务生成输出,则它还可能连接到输出通道。
或者,输出生成服务可能位于处理管道或消息流的末尾,在这种情况下,可以使用入站消息的标头。
如果未定义输出通道,则这是默认行为。
与此处描述的大多数配置选项一样,相同的行为实际上适用于大多数其他组件。replyChannel
服务激活器本质上是一个通用端点,用于使用输入消息(payload 和 headers)在某个对象上调用方法。
它的内部逻辑基于 a 可以是特定用例的任何可能实现,例如 、 、 等。
因此,本参考手册中提到的任何出站网关和出站通道适配器都应被视为此服务激活器终端节点的特定扩展;最后,它们都调用了某个对象的方法。MessageHandler
DefaultMessageSplitter
AggregatingMessageHandler
SftpMessageHandler
JpaOutboundGateway
配置 Service Activator
使用Java和注释配置,只需用注释标记相应的服务方法就足够了 - 当消息从输入通道消费时,框架会调用它:@ServiceActivator
public class SomeService {
@ServiceActivator(inputChannel = "exampleChannel")
public void exampleHandler(SomeData payload) {
...
}
}
有关更多信息,请参阅注释支持。
对于 Java、Groovy 或 Kotlin DSL,an 的运算符表示服务激活器:.handle()
IntegrationFlow
@Bean
public IntegrationFlow someFlow() {
return IntegrationFlow
.from("exampleChannel")
.handle(someService, "exampleHandler")
.get();
}
@Bean
fun someFlow() =
integrationFlow("exampleChannel") {
handle(someService, "exampleHandler")
}
@Bean
someFlow() {
integrationFlow 'exampleChannel',
{
handle someService, 'exampleHandler'
}
}
有关这些 DSL 的更多信息,请参阅相应的章节:
要在使用 XML 配置时创建服务激活器,请使用带有 'input-channel' 和 'ref' 属性的 'service-activator' 元素,如下例所示:
<int:service-activator input-channel="exampleChannel" ref="exampleHandler"/>
前面的配置从 中选择满足消息收发要求之一的所有方法,如下所示:exampleHandler
-
注解
@ServiceActivator
-
是
public
-
如果
void
requiresReply == true
在运行时调用的目标方法按请求消息的类型为每条请求消息选择,或者作为类型的回退(如果目标类中存在此类方法)。payload
Message<?>
从版本 5.0 开始,对于所有不匹配的情况,可以将一个服务方法标记为 作为回退。
当使用内容类型转换时,这在转换后调用目标方法时非常有用。@org.springframework.integration.annotation.Default
要委托给任何对象的显式定义方法,您可以添加该属性,如下例所示:method
<int:service-activator input-channel="exampleChannel" ref="somePojo" method="someMethod"/>
在任一情况下,当 service 方法返回非 null 值时,终端节点都会尝试将回复消息发送到相应的回复通道。
要确定回复通道,它首先检查端点配置中是否提供了 an,如下例所示:output-channel
<int:service-activator input-channel="exampleChannel" output-channel="replyChannel"
ref="somePojo" method="someMethod"/>
如果该方法返回 result 且未定义 no,则框架将检查请求消息的标头值。
如果该值可用,则它会检查其类型。
如果是 ,则回复消息将发送到该通道。
如果它是 ,则端点会尝试将通道名称解析为通道实例。
如果无法解析通道,则抛出 a。
如果可以解决,则消息将发送到那里。
如果请求消息没有标头,并且对象是 a ,则查询其标头以获取目标目标。
这是 Spring Integration 中用于请求-回复消息传递的技术,它也是返回地址模式的一个例子。output-channel
replyChannel
MessageChannel
String
DestinationResolutionException
replyChannel
reply
Message
replyChannel
如果您的方法返回结果,并且您希望丢弃该结果并结束流,则应将 配置为发送到 。
为方便起见,框架注册了一个名称为 .
有关更多信息,请参阅 特殊通道 。output-channel
NullChannel
nullChannel
服务激活器是生成回复消息不需要的组件之一。
如果您的方法返回或具有返回类型,则服务激活器将在方法调用后退出,没有任何信号。
此行为可由选项控制,该选项也与使用 XML 命名空间进行配置时一样公开。
如果 flag 设置为 且方法返回 null,则引发 a。null
void
AbstractReplyProducingMessageHandler.requiresReply
requires-reply
true
ReplyRequiredException
service 方法中的参数可以是 message 或任意类型。
如果是后者,则假定它是消息有效负载,该有效负载从消息中提取并注入到服务方法中。
我们通常推荐这种方法,因为它在使用 Spring Integration 时遵循并促进了 POJO 模型。
参数也可以具有 or 注释,如 注释支持中所述。@Header
@Headers
service 方法不需要具有任何参数,这意味着您可以实现事件样式的服务激活器(其中您只关心 service 方法的调用),而不必担心消息的内容。 将其视为 null JMS 消息。 此类实现的一个示例用例是 input 通道上存储的消息的简单计数器或监视器。 |
从版本 4.1 开始,框架将消息属性( 和 )正确转换为 Java 8 POJO 方法参数,如下例所示:payload
headers
Optional
public class MyBean {
public String computeValue(Optional<String> payload,
@Header(value="foo", required=false) String foo1,
@Header(value="foo") Optional<String> foo2) {
if (payload.isPresent()) {
String value = payload.get();
...
}
else {
...
}
}
}
如果自定义服务激活器处理程序实现可以在其他定义中重用,我们通常建议使用属性。
但是,如果自定义服务激活器处理程序实现仅在 的单个定义中使用,则可以提供内部 Bean 定义,如下例所示:ref
<service-activator>
<service-activator>
<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
output-channel = "outChannel" method="someMethod">
<beans:bean class="org.something.ExampleServiceActivator"/>
</int:service-activator>
不允许在同一配置中同时使用 attribute 和内部处理程序定义,因为它会创建不明确的条件并导致引发异常。ref <service-activator> |
如果该属性引用了扩展的 bean(例如框架本身提供的处理程序),则通过将输出通道直接注入处理程序来优化配置。
在这种情况下,每个都必须是单独的 bean 实例(或-scoped bean)或使用内部配置类型。
如果您无意中从多个 bean 引用了相同的消息处理程序,则会收到配置异常。ref AbstractMessageProducingHandler ref prototype <bean/> |
服务激活器和 Spring 表达式语言 (SpEL)
从 Spring Integration 2.0 开始,服务激活器也可以从 SPEL 中受益。
例如,你可以调用任何 Bean 方法,而无需在属性中指向 Bean 或将其作为内部 Bean 定义包括在内,如下所示:ref
<int:service-activator input-channel="in" output-channel="out"
expression="@accountService.processAccount(payload, headers.accountId)"/>
<bean id="accountService" class="thing1.thing2.Account"/>
在前面的配置中,我们不是通过使用 or 作为内部 Bean 来注入“accountService”,而是使用 SpEL 的表示法并调用一个采用与消息有效负载兼容的类型的方法。
我们还传递一个 header 值。
任何有效的 SPEL 表达式都可以根据消息中的任何内容进行评估。
对于简单的场景,如果所有 logic 都可以封装在这样的表达式中,则您的服务激活器不需要引用 bean,如下例所示:ref
@beanId
<int:service-activator input-channel="in" output-channel="out" expression="payload * 2"/>
在前面的配置中,我们的服务逻辑是将 payload 值乘以 2。 SpEL 让我们相对容易地处理它。
有关配置服务激活器的更多信息,请参阅 Java DSL 一章中的服务激活器和 .handle()
方法。
异步服务激活器
服务激活器由调用线程调用。
如果输入通道是 a 或 的 poller 线程,则这是上游线程。
如果服务返回 ,则默认操作是将其作为发送到输出(或回复)通道的消息的有效负载发送。
从版本 4.3 开始,您现在可以将属性设置为 (通过使用 Java 配置)。
如果服务返回 when this the attribute is set to ,则会立即释放调用线程,并在完成 future 的线程(从您的服务内部)上发送回复消息。
这对于使用 的长时间运行的服务特别有利,因为 Poller 线程被释放以执行框架中的其他服务。SubscribableChannel
PollableChannel
CompletableFuture<?>
async
true
setAsync(true)
CompletableFuture<?>
async
true
PollableChannel
如果服务使用 完成 future ,则会发生正常的错误处理。
An 将发送到消息标头(如果存在)。
否则,将 an 发送到 default(如果可用)。Exception
ErrorMessage
errorChannel
ErrorMessage
errorChannel
从版本 6.1 开始,如果 的输出通道配置为 ,则默认情况下将打开异步模式。
如果处理程序结果不是反应式 type 或 ,则无论 output channel 类型如何,都会发生常规的回复生成过程。AbstractMessageProducingHandler
ReactiveStreamsSubscribableChannel
CompletableFuture<?>
有关更多信息,另请参阅 Reactive Streams Support 。
Service Activator 和 Method 返回类型
service 方法可以返回任何类型,这些类型将成为回复消息有效负载。
在这种情况下,将创建一个新对象,并复制请求消息中的所有标头。
对于大多数 Spring 集成实现,当交互基于 POJO 方法调用时,其工作方式相同。Message<?>
MessageHandler
也可以从该方法返回一个完整的对象。
但是,请记住,与 transformer 不同,对于 Service Activator,如果返回的消息中尚不存在 headers,则将通过从请求消息中复制 headers 来修改此消息。
因此,如果您的 method 参数是 a,并且您复制了服务方法中的一些(但不是全部)现有标头,则它们将重新出现在回复消息中。
从回复消息中删除 Headers 不是 Service Activator 的责任,遵循松散耦合的原则,最好在集成流中添加 a。
或者,可以使用 Transformer 代替 Service Activator,但在这种情况下,当返回 full 时,该方法完全负责消息,包括复制请求消息标头(如果需要)。
您必须确保必须保留重要的框架标头(例如 , ),如果存在。Message<?>
Message<?>
HeaderFilter
Message<?>
replyChannel
errorChannel
延迟器
延迟器是一个简单的端点,它允许消息流延迟一定的时间间隔。
当消息延迟时,原始发件人不会阻止。
相反,延迟消息会使用 实例 of 进行计划,以便在延迟过后发送到输出通道。
这种方法即使对于相当长的延迟也是可扩展的,因为它不会导致大量阻塞的发送方线程。
相反,在典型情况下,线程池用于实际执行释放消息。
本节包含配置延迟器的几个示例。org.springframework.scheduling.TaskScheduler
配置 Delayer
该元素用于延迟两个消息通道之间的消息流。
与其他终端节点一样,您可以提供 'input-channel' 和 'output-channel' 属性,但延迟器还具有 'default-delay' 和 'expression' 属性(以及 'expression' 元素),用于确定每条消息应延迟的毫秒数。
以下示例将所有消息延迟 3 秒:<delayer>
<int:delayer id="delayer" input-channel="input"
default-delay="3000" output-channel="output"/>
如果需要确定每条消息的延迟,还可以使用'expression'属性提供 SPEL 表达式,如以下表达式所示:
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from("input")
.delay("delayer.messageGroupId", d -> d
.defaultDelay(3_000L)
.delayExpression("headers['delay']"))
.channel("output")
.get();
}
@Bean
fun flow() =
integrationFlow("input") {
delay("delayer.messageGroupId") {
defaultDelay(3000L)
delayExpression("headers['delay']")
}
channel("output")
}
@ServiceActivator(inputChannel = "input")
@Bean
public DelayHandler delayer() {
DelayHandler handler = new DelayHandler("delayer.messageGroupId");
handler.setDefaultDelay(3_000L);
handler.setDelayExpressionString("headers['delay']");
handler.setOutputChannelName("output");
return handler;
}
<int:delayer id="delayer" input-channel="input" output-channel="output"
default-delay="3000" expression="headers['delay']"/>
在前面的示例中,仅当表达式的计算结果为给定入站消息的 null 时,三秒延迟才适用。
如果您只想将延迟应用于具有表达式评估有效结果的消息,则可以使用 'default-delay' (默认值)。
对于延迟为 (或更小) 的任何消息,该消息将立即在调用线程上发送。0
0
XML 解析器使用消息组 ID。<beanName>.messageGroupId |
延迟处理程序支持表示以毫秒为单位的间隔的表达式计算结果(其方法生成可解析为 a 的值的任何结果)以及表示绝对时间的实例。
在第一种情况下,毫秒从当前时间开始计算(例如,值 of 会将消息从延迟器收到消息的时间开始延迟至少 5 秒)。
对于实例,消息在该对象表示的时间之前不会释放。
等于非正延迟或过去的 Date 的值不会导致延迟。
相反,它被直接发送到原始发送方线程上的 output 通道。
如果表达式评估结果不是 a 且无法解析为 a ,则应用默认延迟(如果有 — 默认值为 )。Object toString() Long java.util.Date 5000 Date Date Date Long 0 |
表达式计算可能会因各种原因(包括无效的表达式或其他条件)而引发计算异常。
默认情况下,此类异常将被忽略(尽管记录在 DEBUG 级别),并且延迟器会回退到默认延迟(如果有)。
您可以通过设置属性来修改此行为。
默认情况下,此属性设置为 ,并且延迟器行为如前所述。
但是,如果不希望忽略表达式计算异常并将其抛给延迟器的调用方,请将该属性设置为 .ignore-expression-failures true ignore-expression-failures false |
在前面的示例中,延迟表达式指定为 。
这是访问元素( implements )的 SPEL 语法。
它调用: .
对于简单的映射元素名称(不包含 '.'),您还可以使用 SPEL“点访问器”语法,其中前面显示的标头表达式可以指定为 .
但是,如果缺少标头,则会获得不同的结果。
在第一种情况下,表达式的计算结果为 。
第二个结果类似于以下内容:
因此,如果可能会省略标头,并且你想要回退到默认延迟,则使用索引器语法而不是点属性访问器语法通常更有效(并且建议使用),因为检测 null 比捕获异常更快。 |
延迟器委托给 Spring 抽象的实例。
延迟器使用的默认调度程序是 Spring 集成在启动时提供的实例。
请参阅配置 Task Scheduler。
如果要委托给不同的 scheduler,可以通过 delayer 元素的 'scheduler' 属性提供引用,如下例所示:TaskScheduler
ThreadPoolTaskScheduler
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
scheduler="exampleTaskScheduler"/>
<task:scheduler id="exampleTaskScheduler" pool-size="3"/>
如果配置外部 ,则可以在此属性上设置。
它允许在应用程序关闭时成功完成已经处于执行状态(释放消息)的 'delay' 任务。
在 Spring Integration 2.2 之前,这个属性在元素上是可用的,因为可以在后台创建自己的调度程序。
从 2.2 开始,延迟器需要外部调度程序实例,并且已被删除。
您应该使用调度程序自己的配置。ThreadPoolTaskScheduler waitForTasksToCompleteOnShutdown = true <delayer> DelayHandler waitForTasksToCompleteOnShutdown |
ThreadPoolTaskScheduler 具有一个属性 ,该属性可以与 的某个实现一起注入。
此处理程序允许从发送延迟消息的计划任务的线程中处理 an 。
默认情况下,它使用 ,您可以在日志中看到堆栈跟踪。
您可能需要考虑使用 ,它将 发送到 ,无论是从失败消息的报头还是发送到默认的 。
此错误处理在事务回滚(如果存在)后执行。
请参阅 发布失败。errorHandler org.springframework.util.ErrorHandler Exception org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler org.springframework.integration.channel.MessagePublishingErrorHandler ErrorMessage error-channel error-channel |
Delayer 和 Message Store
将延迟消息保存到提供的 .
('groupId' 基于元素所需的 'id' 属性。
在将延迟消息发送到 之前,计划任务会从 中删除该消息。
如果提供的是持久性的(例如 ),则它提供了在应用程序关闭时不丢失消息的能力。
应用程序启动后,将从 中的消息组读取消息,并根据消息的原始到达时间(如果延迟为数字)延迟重新安排这些消息。
对于延迟报头为 的邮件,在重新调度时使用。
如果延迟消息仍处于超过其 'delay' 状态,则会在启动后立即发送该消息。DelayHandler
MessageStore
<delayer>
MessageStore
DelayHandler
output-channel
MessageStore
JdbcMessageStore
DelayHandler
MessageStore
Date
Date
MessageStore
可以使用两个互斥元素之一来丰富 : 和 。
这些 AOP 建议应用于代理的 internal ,它负责在延迟后在计划任务上发布消息。
例如,当下游消息流抛出异常并且 的事务回滚时,可能会使用它。
在这种情况下,延迟消息将保留在持久性 .
您可以在 .
该元素定义了一个简单的通知链,该链只有事务性建议。
以下示例显示了 中的 :<delayer>
<transactional>
<advice-chain>
List
DelayHandler.ReleaseMessageHandler
Thread
ReleaseMessageHandler
MessageStore
org.aopalliance.aop.Advice
<advice-chain>
<transactional>
advice-chain
<delayer>
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
message-store="jdbcMessageStore">
<int:advice-chain>
<beans:ref bean="customAdviceBean"/>
<tx:advice>
<tx:attributes>
<tx:method name="*" read-only="true"/>
</tx:attributes>
</tx:advice>
</int:advice-chain>
</int:delayer>
这可以导出为具有托管操作 ( 和 )的 JMX,这允许在运行时重新安排延迟的持久消息 — 例如,如果 之前已停止。
这些操作可以通过命令调用,如下例所示:DelayHandler
MBean
getDelayedMessageCount
reschedulePersistedMessages
TaskScheduler
Control Bus
Message<String> delayerReschedulingMessage =
MessageBuilder.withPayload("@'delayer.handler'.reschedulePersistedMessages()").build();
controlBusChannel.send(delayerReschedulingMessage);
有关消息存储、JMX 和控制总线的更多信息,请参阅系统管理。 |
从版本 5.3.7 开始,如果在将消息存储到 a 时事务处于活动状态,则会在回调中安排发布任务。
这对于防止争用情况是必要的,在这种情况下,计划的发布可能会在事务提交之前运行,并且找不到消息。
在这种情况下,消息将在延迟后或事务提交后(以较晚者为准)发布。MessageStore
TransactionSynchronization.afterCommit()
发布失败
从版本 5.0.8 开始,延迟器上有两个新属性:
-
maxAttempts
(默认 5) -
retryDelay
(默认 1 秒)
发布消息时,如果下游流失败,将在 .
如果达到 ,则丢弃消息(除非发布是事务性的,在这种情况下,消息将保留在存储中,但不会再计划发布,直到重新启动应用程序或调用该方法,如上所述)。retryDelay
maxAttempts
reschedulePersistedMessages()
此外,您还可以配置 ;当发布失败时,将 an 发送到该通道,并将 exception 作为 payload 并具有 property 。
它包含一个包含当前计数的标头。delayedMessageErrorChannel
ErrorMessage
originalMessage
ErrorMessage
IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT
如果错误流使用错误消息并正常退出,则不会执行进一步的操作;如果发布是事务性的,则将提交事务并从存储中删除消息。
如果错误流引发异常,则将重试发布,直到如上所述。maxAttempts
脚本支持
Spring 集成 2.1 增加了对 Java 版本 6 中引入的 JSR223 Scripting for Java 规范的支持。 它允许您使用以任何受支持的语言(包括 Ruby、JRuby、Groovy 和 Kotlin)编写的脚本为各种集成组件提供逻辑,类似于 Spring 集成中使用 Spring 表达式语言 (SPEL) 的方式。 有关 JSR223 的更多信息,请参阅文档。
您需要将此依赖项包含在您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-scripting</artifactId>
<version>6.1.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-scripting:6.1.9"
此外,您需要添加脚本引擎实现,例如 JRuby、Jython。
从版本 5.2 开始, Spring 集成提供了 Kotlin Jsr223 支持。 您需要将此依赖项添加到您的项目中以使其正常工作:
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-scripting-jsr223</artifactId>
<scope>runtime</scope>
</dependency>
runtime 'org.jetbrains.kotlin:kotlin-scripting-jsr223'
第三方已经开发了各种 JSR223 语言实现。 特定实现与 Spring Integration 的兼容性取决于它与规范的一致性以及实现者对规范的解释。 |
如果您计划使用 Groovy 作为脚本语言,我们建议您使用 Spring-Integration 的 Groovy 支持,因为它提供了特定于 Groovy 的其他功能。 但是,本节也相关。 |
脚本配置
根据集成要求的复杂程度,脚本可以作为 XML 配置中的 CDATA 内联提供,也可以作为对包含脚本的 Spring 资源的引用提供。
为了启用脚本支持, Spring 集成定义了一个 ,它将消息有效负载绑定到名为 的变量,并将消息头绑定到变量,两者都可以在脚本执行上下文中访问。
您需要做的就是编写一个使用这些变量的脚本。
以下一对示例显示了创建筛选条件的示例配置:ScriptExecutingMessageProcessor
payload
headers
@Bean
public IntegrationFlow scriptFilter() {
return f -> f.filter(Scripts.processor("some/path/to/ruby/script/RubyFilterTests.rb"));
}
...
@Bean
public Resource scriptResource() {
return new ByteArrayResource("headers.type == 'good'".getBytes());
}
@Bean
public IntegrationFlow scriptFilter() {
return f -> f.filter(Scripts.processor(scriptResource()).lang("groovy"));
}
<int:filter input-channel="referencedScriptInput">
<int-script:script location="some/path/to/ruby/script/RubyFilterTests.rb"/>
</int:filter>
<int:filter input-channel="inlineScriptInput">
<int-script:script lang="groovy">
<![CDATA[
return payload == 'good'
]]>
</int-script:script>
</int:filter>
如前面的示例所示,脚本可以内联包含,也可以通过引用资源位置(通过使用属性)包含在内。
此外,该属性对应于语言名称(或其 JSR223 别名)。location
lang
其他支持脚本编写的 Spring 集成端点元素包括 、 、 和 。
每种情况下的脚本配置都与上述相同(除了 endpoint 元素)。router
service-activator
transformer
splitter
脚本支持的另一个有用功能是能够更新(重新加载)脚本,而不必重新启动应用程序上下文。
为此,请在元素上指定 attribute,如下例所示:refresh-check-delay
script
Scripts.processor(...).refreshCheckDelay(5000)
}
<int-script:script location="..." refresh-check-delay="5000"/>
在前面的示例中,每 5 秒检查一次脚本位置的更新。 如果脚本已更新,则自更新后 5 秒后发生的任何调用都会导致运行新脚本。
请考虑以下示例:
Scripts.processor(...).refreshCheckDelay(0)
}
<int-script:script location="..." refresh-check-delay="0"/>
在前面的示例中,一旦发生任何脚本修改,就会使用任何脚本修改更新上下文,从而为“实时”配置提供一种简单的机制。 任何负值都表示在初始化应用程序上下文后不会重新加载脚本。 这是默认行为。 以下示例显示了一个从不更新的脚本:
Scripts.processor(...).refreshCheckDelay(-1)
}
<int-script:script location="..." refresh-check-delay="-1"/>
内联脚本无法重新加载。 |
脚本变量绑定
需要变量绑定才能使脚本能够引用外部提供给脚本执行上下文的变量。
默认情况下,用作绑定变量。
您可以使用 elements(或 option)将其他变量绑定到脚本,如下例所示:payload
headers
<variable>
ScriptSpec.variables()
Scripts.processor("foo/bar/MyScript.py")
.variables(Map.of("var1", "thing1", "var2", "thing2", "date", date))
}
<script:script lang="py" location="foo/bar/MyScript.py">
<script:variable name="var1" value="thing1"/>
<script:variable name="var2" value="thing2"/>
<script:variable name="date" ref="date"/>
</script:script>
如前面的示例所示,您可以将脚本变量绑定到标量值或 Spring Bean 引用。
请注意,和 仍作为绑定变量包含在内。payload
headers
在 Spring Integration 3.0 中,除了 element 之外,还引入了该属性。
此属性和元素并不互斥,您可以将它们合并到一个组件中。
但是,无论变量在何处定义,变量都必须是唯一的。
此外,从 Spring Integration 3.0 开始,内联脚本也允许变量绑定,如下例所示:variable
variables
variable
script
<service-activator input-channel="input">
<script:script lang="ruby" variables="thing1=THING1, date-ref=dateBean">
<script:variable name="thing2" ref="thing2Bean"/>
<script:variable name="thing3" value="thing2"/>
<![CDATA[
payload.foo = thing1
payload.date = date
payload.bar = thing2
payload.baz = thing3
payload
]]>
</script:script>
</service-activator>
前面的示例显示了内联脚本、元素和属性的组合。
该属性包含一个逗号分隔的值,其中每个段都包含变量及其值的 '=' 分隔对。
变量名称可以后缀为 ,如前面示例中的变量所示。
这意味着绑定变量的名称为 ,但该值是应用程序上下文中对 bean 的引用。
这在使用属性占位符配置或命令行参数时可能很有用。variable
variables
variables
-ref
date-ref
date
dateBean
如果您需要对变量的生成方式进行更多控制,则可以实现自己的 Java 类,该类使用策略,该策略由以下接口定义:ScriptVariableGenerator
public interface ScriptVariableGenerator {
Map<String, Object> generateScriptVariables(Message<?> message);
}
该接口需要你实现该方法。
该 message 参数允许您访问消息有效负载和标头中的任何可用数据,返回值是 of 绑定变量。
每次为消息执行脚本时,都会调用此方法。
下面的示例展示了如何提供 attribute 的实现并引用它:generateScriptVariables(Message)
Map
ScriptVariableGenerator
script-variable-generator
Scripts.processor("foo/bar/MyScript.groovy")
.variableGenerator(new foo.bar.MyScriptVariableGenerator())
}
<int-script:script location="foo/bar/MyScript.groovy"
script-variable-generator="variableGenerator"/>
<bean id="variableGenerator" class="foo.bar.MyScriptVariableGenerator"/>
如果未提供 a,则脚本组件使用 ,它将提供的任何元素与其方法中的 和 变量合并。script-variable-generator
DefaultScriptVariableGenerator
<variable>
payload
headers
Message
generateScriptVariables(Message)
您不能同时提供 attribute 和 element。
它们是互斥的。script-variable-generator <variable> |
GraalVM Polyglot
从版本 6.0 开始,该框架提供了一个基于 GraalVM Polyglot API 的 API。
JavaScript 的 JSR223 引擎实现(单独从 Java 中删除)已被使用这个新的脚本执行程序所取代。
请参阅有关在 GraalVM 中启用 JavaScript 支持以及可以通过脚本变量传播哪些配置选项的更多信息。
默认情况下,框架设置为在共享的 Polyglot 上,从而启用与主机 JVM 的这种交互:PolyglotScriptExecutor
allowAllAccess
true
Context
-
新线程的创建和使用。
-
对公共主机类的访问。
-
通过向类路径添加条目来加载新的主机类。
-
将新成员导出到多语言绑定中。
-
主机系统上不受限制的 IO 操作。
-
传递实验性选项。
-
创建和使用 New sub-process。
-
对进程环境变量的访问。
这可以通过接受 .PolyglotScriptExecutor
org.graalvm.polyglot.Context.Builder
要启用此 JavaScript 支持,必须使用安装了组件的 GraalVM,或者在使用常规 JVM 时,必须包含 and 依赖项。js
org.graalvm.sdk:graal-sdk
org.graalvm.js:js
Groovy 支持
在 Spring Integration 2.0 中,我们添加了 Groovy 支持,允许您使用 Groovy 脚本语言为各种集成组件提供逻辑,类似于 Spring 表达式语言 (SPEL) 支持路由、转换和其他集成问题的方式。 有关 Groovy 的更多信息,请参阅 Groovy 文档,您可以在项目网站上找到该文档。
您需要将此依赖项包含在您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-groovy</artifactId>
<version>6.1.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-groovy:6.1.9"
此外,从 V6.0 开始,提供了用于集成流配置的 Groovy DSL。
Groovy 配置
在 Spring Integration 2.1 中,Groovy 支持的配置名称空间是 Spring Integration 的脚本支持的扩展,并共享脚本支持部分中详细描述的核心配置和行为。
尽管通用脚本支持很好地支持 Groovy 脚本,但 Groovy 支持提供了配置名称空间,该名称空间由 Spring 框架和相关组件支持,为使用 Groovy 提供了扩展功能。
下面的清单显示了两个示例配置:Groovy
org.springframework.scripting.groovy.GroovyScriptFactory
<int:filter input-channel="referencedScriptInput">
<int-groovy:script location="some/path/to/groovy/file/GroovyFilterTests.groovy"/>
</int:filter>
<int:filter input-channel="inlineScriptInput">
<int-groovy:script><![CDATA[
return payload == 'good'
]]></int-groovy:script>
</int:filter>
如前面的示例所示,该配置看起来与常规脚本支持配置相同。
唯一的区别是使用 Groovy 命名空间,如命名空间前缀所示。
另请注意,标记上的属性在此命名空间中无效。int-groovy
lang
<script>
Groovy 对象自定义
如果需要自定义 Groovy 对象本身(除了设置变量之外),则可以引用使用该属性实现的 Bean。
例如,如果要通过修改 和 registering 函数以在脚本中可用来实现域特定语言 (DSL),这可能很有用。
以下示例显示了如何执行此操作:GroovyObjectCustomizer
customizer
MetaClass
<int:service-activator input-channel="groovyChannel">
<int-groovy:script location="somewhere/SomeScript.groovy" customizer="groovyCustomizer"/>
</int:service-activator>
<beans:bean id="groovyCustomizer" class="org.something.MyGroovyObjectCustomizer"/>
设置 custom 与 elements 或 attribute 并不互斥。
也可以在定义内联脚本时提供。GroovyObjectCustomizer
<variable>
script-variable-generator
Spring Integration 3.0 引入了该属性,该属性与该元素结合使用。
此外,如果未为绑定变量提供名称,则 groovy 脚本能够将变量解析为 中的 bean。
以下示例演示如何使用变量 ():variables
variable
BeanFactory
entityManager
<int-groovy:script>
<![CDATA[
entityManager.persist(payload)
payload
]]>
</int-groovy:script>
entityManager
必须是应用程序上下文中的适当 Bean。
有关元素、属性和属性的更多信息,请参阅脚本变量绑定。<variable>
variables
script-variable-generator
Groovy Script 编译器定制
提示是最流行的 Groovy 编译器自定义选项。
它可以在类或方法级别使用。
有关更多信息,请参阅 Groovy 参考手册,特别是 @CompileStatic。
为了将此功能用于短脚本(在集成场景中),我们被迫将简单脚本更改为更类似于 Java 的代码。
请考虑以下脚本:@CompileStatic
<filter>
headers.type == 'good'
前面的脚本在 Spring Integration 中成为以下方法:
@groovy.transform.CompileStatic
String filter(Map headers) {
headers.type == 'good'
}
filter(headers)
这样,该方法被转换并编译为静态 Java 代码,绕过了 Groovy
调用的动态阶段,例如 factories 和 proxy。filter()
getProperty()
CallSite
从版本 4.3 开始,您可以使用选项配置 Spring 集成 Groovy 组件,指定应将 for 添加到内部。
有了这个,你可以在我们的脚本代码中省略方法声明 with,仍然可以获得编译的纯 Java 代码。
在这种情况下,前面的脚本可以很短,但仍需要比解释的脚本更详细一些,如下例所示:compile-static
boolean
ASTTransformationCustomizer
@CompileStatic
CompilerConfiguration
@CompileStatic
binding.variables.headers.type == 'good'
您必须通过属性访问 and(或任何其他)变量,因为使用 ,我们没有动态功能。headers
payload
groovy.lang.Script
binding
@CompileStatic
GroovyObject.getProperty()
此外,我们还引入了 bean 引用。
使用此属性,您可以提供任何其他必需的 Groovy 编译器自定义,例如 .
有关此功能的更多信息,请参阅高级编译器配置的 Groovy 文档。compiler-configuration
ImportCustomizer
using 不会自动为 annotation 添加 an,它会覆盖该选项。
如果您仍然需要 ,则应手动将 添加到该自定义 中。compilerConfiguration ASTTransformationCustomizer @CompileStatic compileStatic CompileStatic new ASTTransformationCustomizer(CompileStatic.class) CompilationCustomizers compilerConfiguration |
Groovy 编译器自定义对选项没有任何影响,并且可重新加载的脚本也可以静态编译。refresh-check-delay |
控制总线
如 (企业集成模式) 中所述,控制总线背后的思想是,您可以使用与 “应用程序级” 消息传递相同的消息传递系统来监视和管理框架内的组件。 在 Spring 集成中,我们构建在前面描述的适配器之上,以便您可以发送消息作为调用公开操作的一种方式。 这些操作的一个选项是 Groovy 脚本。 下面的示例为 control bus 配置一个 Groovy 脚本:
<int-groovy:control-bus input-channel="operationChannel"/>
控制总线有一个 Importing 通道,可以访问该通道以调用应用程序上下文中 bean 上的操作。
Groovy 控制总线将 Importing 通道上的消息作为 Groovy 脚本运行。
它接受一条消息,将正文编译为脚本,使用 自定义它,然后运行它。
控制总线公开了应用程序上下文中的所有 bean,这些 bean 被 Spring 的接口注释并实现或扩展 Spring 的基类(例如,几个 和 实现)。GroovyObjectCustomizer
MessageProcessor
@ManagedResource
Lifecycle
CustomizableThreadCreator
TaskExecutor
TaskScheduler
在 Control Bus' 命令脚本中使用具有自定义作用域(例如 'request')的托管 bean 时要小心,尤其是在异步消息流中。
如果控制总线无法从应用程序上下文中公开 bean,则在命令脚本运行期间可能会得到一些 bean。
例如,如果未建立自定义范围的上下文,则尝试获取该范围内的 bean 会触发 .MessageProcessor BeansException BeanCreationException |
如果需要进一步自定义 Groovy 对象,还可以提供对通过该属性实现的 bean 的引用,如下例所示:GroovyObjectCustomizer
customizer
<int-groovy:control-bus input-channel="input"
output-channel="output"
customizer="groovyCustomizer"/>
<beans:bean id="groovyCustomizer" class="org.foo.MyGroovyObjectCustomizer"/>
向终端节点添加行为
在 Spring Integration 2.2 之前,你可以通过向 Poller 的元素添加 AOP Advice 来向整个 Integration 流添加行为。
但是,假设您只想重试 REST Web 服务调用,而不重试任何下游终端节点。<advice-chain/>
例如,请考虑以程:
inbound-adapter->poller->http-gateway1->http-gateway2->jdbc-outbound-adapter
如果你在 Poller 上的建议链中配置了一些重试逻辑,并且由于网络故障而对 的调用失败,则重试会导致 和 被第二次调用。
同样,在 jdbc-outbound-adapter 中出现暂时性故障后,两个 HTTP 网关都会被第二次调用,然后再次调用 .http-gateway2
http-gateway1
http-gateway2
jdbc-outbound-adapter
Spring Integration 2.2 增加了向单个端点添加行为的能力。
这是通过将元素添加到许多端点来实现的。
以下示例演示如何在 :<request-handler-advice-chain/>
<request-handler-advice-chain/>
outbound-gateway
<int-http:outbound-gateway id="withAdvice"
url-expression="'http://localhost/test1'"
request-channel="requests"
reply-channel="nextChannel">
<int-http:request-handler-advice-chain>
<ref bean="myRetryAdvice" />
</int-http:request-handler-advice-chain>
</int-http:outbound-gateway>
在这种情况下,仅本地应用于此网关,不适用于在将回复发送到 后向下游采取的进一步操作。
建议的范围仅限于终端节点本身。myRetryAdvice
nextChannel
此时,您无法建议整个终端节点。
架构不允许 a 作为链本身的子元素。 但是,可以将 a 添加到元素中生成回复的各个终结点。
一个例外是,在没有生成回复的链中,因为链中的最后一个元素是 ,所以不能通知最后一个元素。
如果你需要通知这样的元素,它必须移动到链之外(链的 是适配器的)。
然后可以像往常一样通知适配器。
对于生成 reply 的链,可以通知每个 child element。 |
提供的建议课程
除了提供应用 AOP 建议类的通用机制外, Spring 集成还提供了这些开箱即用的建议实现:
-
RequestHandlerRetryAdvice
(如重试建议中所述) -
RequestHandlerCircuitBreakerAdvice
(在 Circuit Breaker Advice 中描述) -
ExpressionEvaluatingRequestHandlerAdvice
(在 Expression Evaluation Advice 中描述) -
RateLimiterRequestHandlerAdvice
(如 Rate Limiter Advice 中所述) -
CacheRequestHandlerAdvice
(在 缓存建议 中描述) -
ReactiveRequestHandlerAdvice
(在 Reactive Advice 中描述) -
ContextHolderRequestHandlerAdvice
(在 Context Holder Advice 中描述)
重试建议
重试建议 () 利用了 Spring Retry 项目提供的丰富重试机制。
的核心组件是 ,它允许配置复杂的重试场景,包括 和 策略(具有许多实施)以及用于确定重试用尽时要采取的操作的策略。o.s.i.handler.advice.RequestHandlerRetryAdvice
spring-retry
RetryTemplate
RetryPolicy
BackoffPolicy
RecoveryCallback
- 无状态重试
-
无状态重试是指重试活动完全在通知中处理的情况。 线程将暂停(如果配置为这样做)并重试该操作。
- 状态重试
-
有状态重试是指在通知中管理重试状态,但引发异常并且调用方重新提交请求的情况。 有状态重试的一个示例是,当我们希望消息发起方(例如 JMS)负责重新提交,而不是在当前线程上执行时。 有状态重试需要某种机制来检测重试的提交。
有关 的更多信息,请参阅项目的 Javadoc 和 Spring Batch 的参考文档(其中的来源)。spring-retry
spring-retry
默认的 back off 行为是 not reoff 。 将立即尝试重试。 使用导致线程在两次尝试之间暂停的回退策略可能会导致性能问题,包括内存使用过多和线程不足。 在大容量环境中,应谨慎使用回退策略。 |
配置 Retry Advice
本节中的示例使用始终引发异常的以下内容:<service-activator>
public class FailingService {
public void service(String message) {
throw new RuntimeException("error");
}
}
- 简单无状态重试
-
默认值为 a that tries 3 次。 没有 ,因此这三次尝试是背靠背进行的,两次尝试之间没有延迟。 没有 ,因此结果是在最后一次重试失败后向调用方抛出异常。 在 Spring 集成环境中,这个最终异常可以通过在入站端点上使用来处理。 以下示例使用并显示其输出:
RetryTemplate
SimpleRetryPolicy
BackOffPolicy
RecoveryCallback
error-channel
RetryTemplate
DEBUG
<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"/> </int:request-handler-advice-chain> </int:service-activator> DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...] DEBUG [task-scheduler-2]Retry: count=0 DEBUG [task-scheduler-2]Checking for rethrow: count=1 DEBUG [task-scheduler-2]Retry: count=1 DEBUG [task-scheduler-2]Checking for rethrow: count=2 DEBUG [task-scheduler-2]Retry: count=2 DEBUG [task-scheduler-2]Checking for rethrow: count=3 DEBUG [task-scheduler-2]Retry failed last attempt: count=3
- 带恢复的简单无状态重试
-
以下示例将 a 添加到前面的示例中,并使用 an 将 发送到通道:
RecoveryCallback
ErrorMessageSendingRecoverer
ErrorMessage
<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> </bean> </int:request-handler-advice-chain> </int:service-activator> DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...] DEBUG [task-scheduler-2]Retry: count=0 DEBUG [task-scheduler-2]Checking for rethrow: count=1 DEBUG [task-scheduler-2]Retry: count=1 DEBUG [task-scheduler-2]Checking for rethrow: count=2 DEBUG [task-scheduler-2]Retry: count=2 DEBUG [task-scheduler-2]Checking for rethrow: count=3 DEBUG [task-scheduler-2]Retry failed last attempt: count=3 DEBUG [task-scheduler-2]Sending ErrorMessage :failedMessage:[Payload=...]
- 使用自定义策略的无状态重试和恢复
-
对于更复杂的问题,我们可以提供定制的建议。 此示例继续使用 ,但将尝试次数增加到 4 次。 它还添加了一个 first,第一次重试等待 1 秒,第二次重试等待 5 秒,第三次重试等待 25 秒(总共尝试 4 次)。 下面的清单显示了该示例及其输出:
RetryTemplate
SimpleRetryPolicy
ExponentialBackoffPolicy
DEBUG
<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> <property name="retryTemplate" ref="retryTemplate" /> </bean> </int:request-handler-advice-chain> </int:service-activator> <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate"> <property name="retryPolicy"> <bean class="org.springframework.retry.policy.SimpleRetryPolicy"> <property name="maxAttempts" value="4" /> </bean> </property> <property name="backOffPolicy"> <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy"> <property name="initialInterval" value="1000" /> <property name="multiplier" value="5.0" /> <property name="maxInterval" value="60000" /> </bean> </property> </bean> 27.058 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...] 27.071 DEBUG [task-scheduler-1]Retry: count=0 27.080 DEBUG [task-scheduler-1]Sleeping for 1000 28.081 DEBUG [task-scheduler-1]Checking for rethrow: count=1 28.081 DEBUG [task-scheduler-1]Retry: count=1 28.081 DEBUG [task-scheduler-1]Sleeping for 5000 33.082 DEBUG [task-scheduler-1]Checking for rethrow: count=2 33.082 DEBUG [task-scheduler-1]Retry: count=2 33.083 DEBUG [task-scheduler-1]Sleeping for 25000 58.083 DEBUG [task-scheduler-1]Checking for rethrow: count=3 58.083 DEBUG [task-scheduler-1]Retry: count=3 58.084 DEBUG [task-scheduler-1]Checking for rethrow: count=4 58.084 DEBUG [task-scheduler-1]Retry failed last attempt: count=4 58.086 DEBUG [task-scheduler-1]Sending ErrorMessage :failedMessage:[Payload=...]
- 命名空间对无状态重试的支持
-
从版本 4.0 开始,由于命名空间支持重试建议,可以大大简化前面的配置,如下例所示:
<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <ref bean="retrier" /> </int:request-handler-advice-chain> </int:service-activator> <int:handler-retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel"> <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" /> </int:handler-retry-advice>
在前面的示例中,建议被定义为顶级 Bean,以便它可以在多个实例中使用。 您还可以直接在链中定义建议,如下例所示:
request-handler-advice-chain
<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <int:retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel"> <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" /> </int:retry-advice> </int:request-handler-advice-chain> </int:service-activator>
A 可以具有 或 子元素,也可以没有子元素。 没有子元素的 A 不使用退避。 如果没有 ,则在重试用尽时引发异常。 命名空间只能与无状态重试一起使用。
<handler-retry-advice>
<fixed-back-off>
<exponential-back-off>
<handler-retry-advice>
recovery-channel
对于更复杂的环境(自定义策略等),请使用普通定义。
<bean>
- 带恢复的简单状态重试
-
要使 retry 有状态,我们需要通过 implementation 提供建议。 此类用于将邮件标识为重新提交,以便可以确定此邮件的当前重试状态。 框架提供了一个 ,它使用 SPEL 表达式确定消息标识符。 此示例再次使用默认策略 (3 次尝试,无回退)。 与无状态重试一样,这些策略可以自定义。 下面的清单显示了该示例及其输出:
RetryStateGenerator
RetryTemplate
SpelExpressionRetryStateGenerator
DEBUG
<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="retryStateGenerator"> <bean class="o.s.i.handler.advice.SpelExpressionRetryStateGenerator"> <constructor-arg value="headers['jms_messageId']" /> </bean> </property> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> </bean> </int:request-handler-advice-chain> </int:service-activator> 24.351 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 24.368 DEBUG [Container#0-1]Retry: count=0 24.387 DEBUG [Container#0-1]Checking for rethrow: count=1 24.387 DEBUG [Container#0-1]Rethrow in retry for policy: count=1 24.387 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 24.391 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 25.412 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 25.412 DEBUG [Container#0-1]Retry: count=1 25.413 DEBUG [Container#0-1]Checking for rethrow: count=2 25.413 DEBUG [Container#0-1]Rethrow in retry for policy: count=2 25.413 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 25.414 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 26.418 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 26.418 DEBUG [Container#0-1]Retry: count=2 26.419 DEBUG [Container#0-1]Checking for rethrow: count=3 26.419 DEBUG [Container#0-1]Rethrow in retry for policy: count=3 26.419 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 26.420 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 27.425 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 27.426 DEBUG [Container#0-1]Retry failed last attempt: count=3 27.426 DEBUG [Container#0-1]Sending ErrorMessage :failedMessage:[Payload=...]
如果将前面的示例与无状态示例进行比较,可以看到,使用有状态重试时,每次失败时都会向调用方引发异常。
- 重试的异常分类
-
Spring Retry 在确定哪些异常可以调用重试方面具有很大的灵活性。 默认配置对所有异常重试,异常分类器查看顶级异常。 如果您将其配置为仅在 on 重试,并且您的应用程序引发 a ,其中原因是 ,则不会发生重试。
MyException
SomeOtherException
MyException
从 Spring Retry 1.0.3 开始,它有一个名为 (默认为 ) 的属性。 When 时,它会遍历异常原因,直到找到匹配项或用完遍历的原因。
BinaryExceptionClassifier
traverseCauses
false
true
要使用此分类器进行重试,请使用 created 和构造函数,该构造函数采用最大尝试次数、对象和布尔值。 然后,您可以将此策略注入到 .
SimpleRetryPolicy
Map
Exception
traverseCauses
RetryTemplate
traverseCauses 是必需的,因为用户异常可能包装在 .MessagingException |
熔断器建议
断路器模式的一般思路是,如果某个服务当前不可用,请不要浪费时间(和资源)来尝试使用它。
实现此模式。
当断路器处于 closed 状态时,终端节点会尝试调用该服务。
如果连续尝试一定次数失败,则断路器将进入 open 状态。
当它处于打开状态时,新请求“快速失败”,并且在一段时间到期之前不会尝试调用该服务。o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice
当该时间到期时,断路器将设置为半开状态。 在此状态下,即使一次尝试失败,断路器也会立即进入打开状态。 如果尝试成功,断路器将进入 closed 状态,在这种情况下,它不会再次进入 open 状态,直到再次发生配置的连续失败次数。 任何成功的尝试都会将状态重置为零失败,以确定断路器何时可能再次进入打开状态。
通常,此建议可能用于外部服务,在这些服务中,可能需要一些时间才能失败(例如尝试建立网络连接时超时)。
具有两个属性:和 。
该属性表示在断路器打开之前需要发生的连续失败次数。
它默认为 .
该属性表示 breaker 在上次失败后尝试另一个请求之前等待的时间。
默认值为 1000 毫秒。RequestHandlerCircuitBreakerAdvice
threshold
halfOpenAfter
threshold
5
halfOpenAfter
以下示例配置断路器并显示其 and 输出:DEBUG
ERROR
<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2" />
<property name="halfOpenAfter" value="12000" />
</bean>
</int:request-handler-advice-chain>
</int:service-activator>
05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
在前面的示例中,阈值设置为秒。
每 5 秒收到一个新请求。
前两次尝试调用了该服务。
第三个和第四个失败,并出现异常,指示断路器已打开。
尝试了第五个请求,因为该请求是在上次失败后的 15 秒。
第六次尝试立即失败,因为断路器立即打开。2
halfOpenAfter
12
表达式计算建议
最后提供的建议类是 .
这个建议比其他两个建议更普遍。
它提供了一种机制,用于计算发送到终端节点的原始入站消息的表达式。
在成功或失败后,可以评估单独的表达式。
(可选)可以将包含评估结果的消息与输入消息一起发送到消息通道。o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice
此建议的一个典型用例可能是使用 ,如果传输成功,则将文件移动到一个目录,如果传输失败,则移动到另一个目录:<ftp:outbound-channel-adapter/>
该通知具有用于设置成功时表达式、失败时设置表达式以及每个选项的相应通道的属性。
对于成功案例,发送到的消息是 ,其中有效负载是表达式评估的结果。
名为 的附加属性包含发送到处理程序的原始消息。
发送到 的消息(当处理程序引发异常时)是负载为 的 。
与所有实例一样,此有效负载具有 和 properties,以及一个名为 的附加属性,其中包含表达式计算的结果。successChannel
AdviceMessage
inputMessage
failureChannel
ErrorMessage
MessageHandlingExpressionEvaluatingAdviceException
MessagingException
failedMessage
cause
evaluationResult
从版本 5.1.3 开始,如果配置了 channels,但未提供表达式,则默认表达式将用于评估消息的 the 。payload |
当在建议范围内引发异常时,默认情况下,该异常将在评估 any 后引发给调用方。
如果要禁止引发异常,请将该属性设置为 .
以下建议显示了如何使用 Java DSL 配置 一个:failureExpression
trapException
true
advice
@SpringBootApplication
public class EerhaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
MessageChannel in = context.getBean("advised.input", MessageChannel.class);
in.send(new GenericMessage<>("good"));
in.send(new GenericMessage<>("bad"));
context.close();
}
@Bean
public IntegrationFlow advised() {
return f -> f.<String>handle((payload, headers) -> {
if (payload.equals("good")) {
return null;
}
else {
throw new RuntimeException("some failure");
}
}, c -> c.advice(expressionAdvice()));
}
@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannelName("success.input");
advice.setOnSuccessExpressionString("payload + ' was successful'");
advice.setFailureChannelName("failure.input");
advice.setOnFailureExpressionString(
"payload + ' was bad, with reason: ' + #exception.cause.message");
advice.setTrapException(true);
return advice;
}
@Bean
public IntegrationFlow success() {
return f -> f.handle(System.out::println);
}
@Bean
public IntegrationFlow failure() {
return f -> f.handle(System.out::println);
}
}
Rate Limiter 建议
Rate Limiter 建议 () 允许确保终端节点不会因请求而过载。
当超出速率限制时,请求将进入 blocked 状态。RateLimiterRequestHandlerAdvice
此建议的一个典型使用案例可能是外部服务提供商不允许每分钟超过请求的请求数。n
该实现完全基于 Resilience4j 项目,需要 OR 注射。
也可以使用默认值和/或自定义名称进行配置。RateLimiterRequestHandlerAdvice
RateLimiter
RateLimiterConfig
以下示例将速率限制器建议配置为每 1 秒 1 个请求:
@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofSeconds(1))
.limitForPeriod(1)
.build());
}
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
...
}
缓存建议
从版本 5.2 开始,引入了 。
它基于 Spring Framework 中的缓存抽象,并与 Comments 系列提供的概念和功能保持一致。
内部的逻辑基于扩展,其中缓存操作的代理是围绕方法完成的,并将请求作为参数。
可以使用 SPEL 表达式或 a 配置此建议以评估缓存键。
该请求可用作 SPEL 评估上下文的根对象,或作为 input 参数。
默认情况下,请求消息的 用于缓存键。
当默认缓存操作为 时,必须使用 配置 ,或者使用任意 s 的集合进行配置。
每个选项都可以单独配置,也可以具有共享选项,如 、 和 ,可以从配置中重复使用。
此配置功能类似于 Spring Framework 和 Comments 的组合。
如果未提供 a,则默认情况下将从 中的 解析单个 bean。CacheRequestHandlerAdvice
@Caching
CacheAspectSupport
AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage
Message<?>
Function
Message<?>
Function
payload
CacheRequestHandlerAdvice
cacheNames
CacheableOperation
CacheOperation
CacheOperation
CacheManager
CacheResolver
CacheErrorHandler
CacheRequestHandlerAdvice
@CacheConfig
@Caching
CacheManager
BeanFactory
CacheAspectSupport
以下示例使用一组不同的缓存操作配置两个建议:
@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
return cacheRequestHandlerAdvice;
}
@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
...
}
@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
cachePutBuilder.setCacheName(TEST_PUT_CACHE);
CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
cacheEvictBuilder.setCacheName(TEST_CACHE);
cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
return cacheRequestHandlerAdvice;
}
@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
...
}
Reactive Advice
从版本 5.3 开始,a 可用于生成回复的请求消息处理程序。
必须为此建议提供 A,并在 intercepted 方法实现生成的回复上从运算符调用它。
通常,当我们想要通过 和类似的支持运算符控制网络波动时,这种自定义是必要的。
例如,当我们可以通过 WebFlux 客户端发出 HTTP 请求时,我们可以使用以下配置来等待响应的时间不超过 5 秒:ReactiveRequestHandlerAdvice
Mono
BiFunction<Message<?>, Mono<?>, Publisher<?>>
Mono.transform()
handleRequestMessage()
Mono
timeout()
retry()
.handle(WebFlux.outboundGateway("https://somehost/"),
e -> e.customizeMonoReply((message, mono) -> mono.timeout(Duration.ofSeconds(5))));
该参数是消息处理程序的请求消息,可用于确定请求范围属性。
该参数是此消息处理程序的方法实现的结果。
也可以从此函数中调用 nested 来应用,例如,Reactive Circuit Breaker。message
mono
handleRequestMessage()
Mono.transform()
上下文持有者建议
从版本 6.1 开始,引入了 。
此建议从请求消息中获取一些值,并将其存储在上下文持有者中。
当在 target 上完成执行时,该值从上下文中是明确的。
考虑这个建议的最佳方式类似于编程流程,我们将一些值存储到 a 中,从 target 调用中访问它,然后在执行后清理它。
这需要这些构造函数参数:a 作为值提供者,作为上下文设置回调和作为上下文清理钩子。ContextHolderRequestHandlerAdvice
MessageHandler
ThreadLocal
ThreadLocal
ContextHolderRequestHandlerAdvice
Function<Message<?>, Object>
Consumer<Object>
Runnable
以下是如何将 a 与 结合使用的示例:ContextHolderRequestHandlerAdvice
o.s.i.file.remote.session.DelegatingSessionFactory
@Bean
DelegatingSessionFactory<?> dsf(SessionFactory<?> one, SessionFactory<?> two) {
return new DelegatingSessionFactory<>(Map.of("one", one, "two", two), null);
}
@Bean
ContextHolderRequestHandlerAdvice contextHolderRequestHandlerAdvice(DelegatingSessionFactory<String> dsf) {
return new ContextHolderRequestHandlerAdvice(message -> message.getHeaders().get("FACTORY_KEY"),
dsf::setThreadKey, dsf::clearThreadKey);
}
@ServiceActivator(inputChannel = "in", adviceChain = "contextHolderRequestHandlerAdvice")
FtpOutboundGateway ftpOutboundGateway(DelegatingSessionFactory<?> sessionFactory) {
return new FtpOutboundGateway(sessionFactory, "ls", "payload");
}
只需向通道发送消息,并将标头设置为 或 就足够了。
将该标头中的值设置为通过其 .
然后,当执行命令时,根据其 中的值从 中选择适当的委托。
当结果从 生成时,将根据 中的调用清除 中的值。
有关更多信息,请参阅委派 Session Factory。in
FACTORY_KEY
one
two
ContextHolderRequestHandlerAdvice
DelegatingSessionFactory
setThreadKey
FtpOutboundGateway
ls
SessionFactory
DelegatingSessionFactory
ThreadLocal
FtpOutboundGateway
ThreadLocal
DelegatingSessionFactory
clearThreadKey()
ContextHolderRequestHandlerAdvice
自定义建议类
除了前面描述的提供的 advice 类之外,您还可以实现自己的 advice 类。
虽然您可以提供 (通常) 的任何实现,但我们通常建议您将 子类 .
这样做的好处是避免编写低级面向方面的编程代码,并提供专门为此环境定制的起点。org.aopalliance.aop.Advice
org.aopalliance.intercept.MethodInterceptor
o.s.i.handler.advice.AbstractRequestHandlerAdvice
子类需要实现该方法,其定义如下:doInvoke()
/**
* Subclasses implement this method to apply behavior to the {@link MessageHandler} callback.execute()
* invokes the handler method and returns its result, or null).
* @param callback Subclasses invoke the execute() method on this interface to invoke the handler method.
* @param target The target handler.
* @param message The message that will be sent to the handler.
* @return the result after invoking the {@link MessageHandler}.
* @throws Exception
*/
protected abstract Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception;
callback 参数可以方便地避免直接处理 AOP 的子类。
调用该方法将调用消息处理程序。callback.execute()
该参数是为那些需要维护特定处理程序的状态的子类提供的,可能是通过将该状态维护在目标的键中。
此功能允许将相同的建议应用于多个处理程序。
uses 建议 this 以保持每个处理程序的断路器状态。target
Map
RequestHandlerCircuitBreakerAdvice
参数是发送到处理程序的消息。
虽然 advice 无法在调用处理程序之前修改消息,但它可以修改有效负载(如果它具有可变属性)。
通常,通知会使用该消息进行日志记录,或者在调用处理程序之前或之后的某个位置发送消息的副本。message
返回值通常是 返回的值。
但是,该建议确实能够修改返回值。
请注意,只有实例返回值。
以下示例显示了一个扩展的自定义 advice 类:callback.execute()
AbstractReplyProducingMessageHandler
AbstractRequestHandlerAdvice
public class MyAdvice extends AbstractRequestHandlerAdvice {
@Override
protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception {
// add code before the invocation
Object result = callback.execute();
// add code after the invocation
return result;
}
}
除了方法之外, 还提供了一个附加方法: .
在单次执行中可能多次调用调用的情况下,必须使用此方法,例如在 .
这是必需的,因为 Spring AOP 对象通过跟踪链中的哪个通知最后被调用来维护状态。
必须为每个调用重置此状态。 有关更多信息,请参阅 ReflectiveMethodInvocation Javadoc。 |
处理消息通知
如本节简介中所述,请求处理程序 advice 链中的 advice 对象仅应用于当前端点,而不是下游流(如果有)。
对于生成 reply 的对象(例如那些 extends ),建议应用于内部方法: (called from )。
对于其他消息处理程序,建议将应用于 .MessageHandler
AbstractReplyProducingMessageHandler
handleRequestMessage()
MessageHandler.handleMessage()
MessageHandler.handleMessage()
在某些情况下,即使消息处理程序是 ,也必须将建议应用于该方法。
例如,幂等接收器可能会返回 ,如果处理程序的属性设置为 ,则会导致异常。
另一个示例是 — 请参阅 严格消息排序。AbstractReplyProducingMessageHandler
handleMessage
null
replyRequired
true
BoundRabbitChannelAdvice
从版本 4.3.1 开始,引入了新的接口及其基本实现 () 。 实现的对象始终应用于方法,而不管处理程序类型如何。HandleMessageAdvice
AbstractHandleMessageAdvice
Advice
HandleMessageAdvice
handleMessage()
请务必了解,当应用于返回响应的处理程序时,实现(例如幂等接收器)将与 分离并正确应用于方法。HandleMessageAdvice
adviceChain
MessageHandler.handleMessage()
由于这种取消关联,因此不遵守建议链顺序。 |
请考虑以下配置:
<some-reply-producing-endpoint ... >
<int:request-handler-advice-chain>
<tx:advice ... />
<ref bean="myHandleMessageAdvice" />
</int:request-handler-advice-chain>
</some-reply-producing-endpoint>
在前面的示例中,the 应用于 .
但是,已申请 到 。
因此,它在 .
要保持顺序,您应该遵循标准的 Spring AOP 配置方法,并使用端点和后缀来获取目标 bean。
请注意,在这种情况下,整个下游流都在事务范围内。<tx:advice>
AbstractReplyProducingMessageHandler.handleRequestMessage()
myHandleMessageAdvice
MessageHandler.handleMessage()
<tx:advice>
id
.handler
MessageHandler
如果 a 不返回响应,则保留建议链顺序。MessageHandler
从版本 5.3 开始,提供了 for for the method,从而应用于整个 sub-flow。
例如,a 可以应用于从某个端点开始的整个子流;默认情况下,这是不可能的,因为使用方终端节点仅将建议应用于 .HandleMessageAdviceAdapter
MethodInterceptor
MessageHandler.handleMessage()
RetryOperationsInterceptor
AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage()
事务支持
从版本 5.0 开始,由于实现,引入了一个新的功能,使整个下游流具有事务性。
当在元素中使用 regular 时(例如,通过 配置 ),启动的事务仅应用于内部,而不会传播到下游流。TransactionHandleMessageAdvice
HandleMessageAdvice
TransactionInterceptor
<request-handler-advice-chain>
<tx:advice>
AbstractReplyProducingMessageHandler.handleRequestMessage()
为了简化 XML 配置,除了 之外,还向 all and and 相关组件添加了一个元素。
以下示例显示了使用情况:<request-handler-advice-chain>
<transactional>
<outbound-gateway>
<service-activator>
<transactional>
<int-jdbc:outbound-gateway query="select * from things where id=:headers[id]">
<int-jdbc:transactional/>
</int-jdbc:outbound-gateway>
<bean id="transactionManager" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="org.springframework.transaction.TransactionManager"/>
</bean>
可以使用 ,可以使用 Java 配置,并且可以在 messaging annotations 属性中使用结果 Bean 名称,如下例所示:TransactionInterceptorBuilder
adviceChain
@Bean
public ConcurrentMetadataStore store() {
return new SimpleMetadataStore(hazelcastInstance()
.getMap("idempotentReceiverMetadataStore"));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(
new MetadataStoreSelector(
message -> message.getPayload().toString(),
message -> message.getPayload().toString().toUpperCase(), store()));
}
@Bean
public TransactionInterceptor transactionInterceptor() {
return new TransactionInterceptorBuilder(true)
.transactionManager(this.transactionManager)
.isolation(Isolation.READ_COMMITTED)
.propagation(Propagation.REQUIRES_NEW)
.build();
}
@Bean
@org.springframework.integration.annotation.Transformer(inputChannel = "input",
outputChannel = "output",
adviceChain = { "idempotentReceiverInterceptor",
"transactionInterceptor" })
public Transformer transformer() {
return message -> message;
}
请注意构造函数上的参数。
它会导致创建 ,而不是常规的 。true
TransactionInterceptorBuilder
TransactionHandleMessageAdvice
TransactionInterceptor
Java DSL 支持端点配置上的直通选项,如下例所示:Advice
.transactional()
@Bean
public IntegrationFlow updatingGatewayFlow() {
return f -> f
.handle(Jpa.updatingGateway(this.entityManagerFactory),
e -> e.transactional(true))
.channel(c -> c.queue("persistResults"));
}
Advising Filters
在提供建议时,还有一个额外的考虑因素。
默认情况下,任何 discard 操作(当 filter 返回时)都在 advice 链的范围内执行。
这可能包括 discard 通道下游的所有流。
因此,例如,如果 discard 通道下游的 element 抛出异常并且存在重试建议,则会重试该过程。
此外,if 设置为 (异常在通知范围内引发)。Filter
false
throwExceptionOnRejection
true
设置为 修改此行为,discard (或 exception) 发生在调用通知链之后。discard-within-advice
false
使用注释为终端节点提供建议
使用注释(、、、和)配置某些端点时,您可以在属性中为通知链提供 bean 名称。
此外,注释还具有属性,该属性可用于配置丢弃行为,如 Advising Filters中所述。
以下示例导致在通知之后执行 discard:@Filter
@ServiceActivator
@Splitter
@Transformer
adviceChain
@Filter
discardWithinAdvice
@MessageEndpoint
public class MyAdvisedFilter {
@Filter(inputChannel="input", outputChannel="output",
adviceChain="adviceChain", discardWithinAdvice="false")
public boolean filter(String s) {
return s.contains("good");
}
}
Advice Chain 中的 Ordering Advice
Advice 类是 “around” advice,以嵌套方式应用。 第一个建议是最外层的,而最后一个建议是最内层的(即最接近被建议的处理程序)。 将 advice 类按正确的顺序排列以实现所需的功能非常重要。
例如,假设您要添加重试建议和事务建议。
您可能希望先放置重试建议,然后再放置事务建议。
因此,每次重试都在新事务中执行。
另一方面,如果您希望所有尝试和任何恢复操作(在 retry 中)都限定在事务内,则可以将事务建议放在第一位。RecoveryCallback
建议的处理程序属性
有时,从 advice 中访问 handler 属性很有用。
例如,大多数处理程序都实现了允许您访问组件名称。NamedComponent
可以通过参数 (当 subclassing 时 )或 (当实现时)访问目标对象。target
AbstractRequestHandlerAdvice
invocation.getThis()
org.aopalliance.intercept.MethodInterceptor
当建议整个处理程序时(例如,当处理程序不生成回复或建议实现时),您可以将目标对象强制转换为接口,例如,如以下示例所示:HandleMessageAdvice
NamedComponent
String componentName = ((NamedComponent) target).getComponentName();
直接实现时,可以按如下方式强制转换目标对象:MethodInterceptor
String componentName = ((NamedComponent) invocation.getThis()).getComponentName();
当仅建议方法时(在生成回复的处理程序中),您需要访问完整的处理程序,即 .
以下示例显示了如何执行此操作:handleRequestMessage()
AbstractReplyProducingMessageHandler
AbstractReplyProducingMessageHandler handler =
((AbstractReplyProducingMessageHandler.RequestHandler) target).getAdvisedHandler();
String componentName = handler.getComponentName();
幂等接收器企业集成模式
从版本 4.1 开始, Spring 集成提供了幂等接收器企业集成模式的实现。
它是一个函数模式,整个幂等逻辑应该在应用程序中实现。
但是,为了简化决策,提供了该组件。
这是应用于方法的 AOP,可以根据其配置请求消息或将其标记为 。IdempotentReceiverInterceptor
Advice
MessageHandler.handleMessage()
filter
duplicate
以前,您可以通过在 (请参阅 Filter) 中使用 custom 来实现此模式。
但是,由于此模式实际上定义了终端节点的行为,而不是终端节点本身,因此幂等接收器实现不提供终端节点组件。
相反,它应用于应用程序中声明的端点。MessageSelector
<filter/>
的逻辑基于提供的,如果该选择器不接受消息,则使用设置为 的标头来扩充消息。
目标(或下游流)可以查阅此标头以实现正确的幂等逻辑。
如果配置了 或 ,则不会将重复消息发送到目标 。
相反,它被丢弃了。
如果要丢弃(不处理)重复的消息,则应使用 ,例如默认 bean。IdempotentReceiverInterceptor
MessageSelector
duplicateMessage
true
MessageHandler
IdempotentReceiverInterceptor
discardChannel
throwExceptionOnRejection = true
MessageHandler.handleMessage()
discardChannel
NullChannel
nullChannel
为了维护消息之间的状态并提供比较消息以实现幂等性的功能,我们提供了 .
它接受一个实现 (根据 创建查找键) 和一个可选的 (Metadata Store)。
有关更多信息,请参阅 MetadataStoreSelector
Javadoc。
您还可以使用额外的 .
默认情况下,使用 message 报头。MetadataStoreSelector
MessageProcessor
Message
ConcurrentMetadataStore
value
ConcurrentMetadataStore
MessageProcessor
MetadataStoreSelector
timestamp
通常,如果键没有现有值,选择器会选择一条消息进行接受。
在某些情况下,比较键的当前值和新值以确定是否应接受该消息非常有用。
从版本 5.3 开始,提供了引用 ;第一个参数是旧值;return 接受消息,并将旧值替换为 .
这对于减少键的数量很有用;例如,在处理文件中的行时,可以将文件名存储在 Key 中,将当前行号存储在 Value 中。
然后,在重新启动后,您可以跳过已处理的行。
有关示例,请参阅Idempotent Downstream Processing a Split File。compareValues
BiPredicate<String, String>
true
MetadataStore
为方便起见,可以直接在组件上配置这些选项。
下面的清单显示了所有可能的属性:MetadataStoreSelector
<idempotent-receiver>
<idempotent-receiver
id="" (1)
endpoint="" (2)
selector="" (3)
discard-channel="" (4)
metadata-store="" (5)
key-strategy="" (6)
key-expression="" (7)
value-strategy="" (8)
value-expression="" (9)
compare-values="" (10)
throw-exception-on-rejection="" /> (11)
1 | Bean 的 ID。
自选。IdempotentReceiverInterceptor |
2 | 应用此侦听器的使用者终结点名称或模式。
用逗号 () 分隔名称(模式),例如 .
然后,使用与这些模式匹配的端点 Bean 名称来检索目标端点的 Bean(使用其后缀),并将 应用于这些 Bean。
必填。, endpoint="aaa, bbb*, ccc, *ddd, eee*fff" MessageHandler .handler IdempotentReceiverInterceptor |
3 | 一个 Bean 引用。
与 和 互斥。
如果未提供,则 或 之一是必需的。MessageSelector metadata-store key-strategy (key-expression) selector key-strategy key-strategy-expression |
4 | 标识 不接受消息时要向其发送消息的通道。
省略时,重复的消息将转发到带有标头的处理程序。
自选。IdempotentReceiverInterceptor duplicateMessage |
5 | 一个参考。
由底层 .
与 互斥。
自选。
默认使用不在应用程序执行之间保持状态的 internal。ConcurrentMetadataStore MetadataStoreSelector selector MetadataStoreSelector SimpleMetadataStore |
6 | 一个参考。
由底层 .
评估 from the request message.
与 和 互斥。
如果未提供 a,则 one of 或 是必需的。MessageProcessor MetadataStoreSelector idempotentKey selector key-expression selector key-strategy key-strategy-expression |
7 | 用于填充 .
由底层 .
使用请求消息作为评估上下文根对象来评估 an。
与 和 互斥。
如果未提供 a,则 one of 或 是必需的。ExpressionEvaluatingMessageProcessor MetadataStoreSelector idempotentKey selector key-strategy selector key-strategy key-strategy-expression |
8 | 一个参考。
由底层 .
评估 from the request 消息。
与 和 互斥。
默认情况下,'MetadataStoreSelector' 使用 'timestamp' 消息标头作为元数据 'value'。MessageProcessor MetadataStoreSelector value idempotentKey selector value-expression |
9 | 用于填充 .
由底层 .
通过使用请求消息作为评估上下文根对象来评估 。
与 和 互斥。
默认情况下,'MetadataStoreSelector' 使用 'timestamp' 消息标头作为元数据 'value'。ExpressionEvaluatingMessageProcessor MetadataStoreSelector value idempotentKey selector value-strategy |
10 | 对 bean 的引用,允许您通过比较键的旧值和新值来选择性地选择消息; 默认情况下。BiPredicate<String, String> null |
11 | 如果 拒绝消息,是否引发异常。
默认为 。
无论是否提供 a,都会应用它。IdempotentReceiverInterceptor false discard-channel |
对于 Java 配置, Spring 集成提供了方法级 Comments。
它用于标记具有消息注释 (,对象将应用于此终端节点。
以下示例演示如何使用注释:@IdempotentReceiver
method
@ServiceActivator
@Router, and others) to specify which `IdempotentReceiverInterceptor
@IdempotentReceiver
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
m.getHeaders().get(INVOICE_NBR_HEADER)));
}
@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
....
}
当你使用 Java DSL 时,你可以将拦截器添加到端点的建议链中,如下例所示:
@Bean
public IntegrationFlow flow() {
...
.handle("someBean", "someMethod",
e -> e.advice(idempotentReceiverInterceptor()))
...
}
这仅适用于该方法。
从版本 4.3.1 开始,它实现了 ,并将 作为基类,以便更好地分离。
有关更多信息,请参阅 Handling Message Advice。IdempotentReceiverInterceptor MessageHandler.handleMessage(Message<?>) HandleMessageAdvice AbstractHandleMessageAdvice |
记录通道适配器
这通常与分线器结合使用,如 Wire Tap 中所述。
但是,它也可以用作任何流的最终使用者。
例如,假设有一个流以 a 结尾,该流返回一个结果,但您希望丢弃该结果。
为此,您可以将结果发送到 .
或者,也可以将其路由到级别 .
这样,当您在级别记录时可以看到丢弃的消息,但在 (例如) 级别记录时看不到它。
使用 ,在级别记录时,您只会看到丢弃的消息。
下面的清单显示了该元素的所有可能属性:<logging-channel-adapter>
<service-activator>
NullChannel
INFO
<logging-channel-adapter>
INFO
WARN
NullChannel
DEBUG
logging-channel-adapter
<int:logging-channel-adapter
channel="" (1)
level="INFO" (2)
expression="" (3)
log-full-message="false" (4)
logger-name="" /> (5)
1 | 将日志记录适配器连接到上游组件的通道。 |
2 | 将记录发送到此适配器的消息的日志记录级别。
违约:。INFO |
3 | 一个 SPEL 表达式,准确表示记录了消息的哪些部分。
Default: — 仅记录有效负载。
如果指定,则无法指定此属性。payload log-full-message |
4 | 当 时,将记录整个消息(包括报头)。
Default: — 仅记录有效负载。
如果指定了此属性,则无法指定此属性。true false expression |
5 | 指定记录器的 (称为 in )。
用于标识此适配器创建的日志消息。
这将允许为各个适配器设置日志名称(在 logging 子系统中)。
默认情况下,所有适配器都使用以下名称记录:。name category log4j org.springframework.integration.handler.LoggingHandler |
使用 Java 配置
以下 Spring Boot 应用程序显示了使用 Java 配置进行配置的示例:LoggingHandler
@SpringBootApplication
public class LoggingJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(LoggingJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToLogger("foo");
}
@Bean
@ServiceActivator(inputChannel = "logChannel")
public LoggingHandler logging() {
LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.DEBUG);
adapter.setLoggerName("TEST_LOGGER");
adapter.setLogExpressionString("headers.id + ': ' + payload");
return adapter;
}
@MessagingGateway(defaultRequestChannel = "logChannel")
public interface MyGateway {
void sendToLogger(String data);
}
}
使用 Java DSL 进行配置
Spring 下面的 Boot 应用程序显示了使用 Java DSL 配置日志记录通道适配器的示例:
@SpringBootApplication
public class LoggingJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(LoggingJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToLogger("foo");
}
@Bean
public IntegrationFlow loggingFlow() {
return IntegrationFlow.from(MyGateway.class)
.log(LoggingHandler.Level.DEBUG, "TEST_LOGGER",
m -> m.getHeaders().getId() + ": " + m.getPayload());
}
@MessagingGateway
public interface MyGateway {
void sendToLogger(String data);
}
}
java.util.function
接口支持
从版本 5.1 开始, Spring 集成为包中的接口提供直接支持。
所有消息传递端点(Service Activator、Transformer、Filter 等)现在都可以引用(或)bean。
与常规定义类似,Messaging Annotations 可以直接应用于这些 bean。
例如,如果你有这个 bean 定义:java.util.function
Function
Consumer
MessageHandler
Function
@Configuration
public class FunctionConfiguration {
@Bean
public Function<String, String> functionAsService() {
return String::toUpperCase;
}
}
您可以将其用作 XML 配置文件中的简单引用:
<service-activator input-channel="processorViaFunctionChannel" ref="functionAsService"/>
当我们使用 Messaging Annotations 配置流时,代码很简单:
@Bean
@Transformer(inputChannel = "functionServiceChannel")
public Function<String, String> functionAsService() {
return String::toUpperCase;
}
当函数返回数组(本质上是 any )或 Reactor 时,可以在这样的 bean 上使用它来对结果内容执行迭代。Collection
Iterable
Stream
Flux
@Splitter
该接口可用于 OR,与 Comments 一起执行流的最后一步:java.util.function.Consumer
<int:outbound-channel-adapter>
@ServiceActivator
@Bean
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
public Consumer<Message<?>> messageConsumerAsService() {
// Has to be an anonymous class for proper type inference
return new Consumer<Message<?>>() {
@Override
public void accept(Message<?> e) {
collector().add(e);
}
};
}
另外,请注意上面代码片段中的注释:如果您想处理 / 中的整个消息,则不能使用 lambda 定义。
由于 Java 类型擦除,我们无法确定方法调用的目标类型。Function
Consumer
apply()/accept()
该接口可以简单地与 annotation 一起使用,也可以作为 :java.util.function.Supplier
@InboundChannelAdapter
ref
<int:inbound-channel-adapter>
@Bean
@InboundChannelAdapter(value = "inputChannel", poller = @Poller(fixedDelay = "1000"))
public Supplier<String> pojoSupplier() {
return () -> "foo";
}
使用 Java DSL,我们只需要在端点定义中使用对函数 bean 的引用。
同时,接口的实现可以用作常规定义:Supplier
MessageSource
@Bean
public Function<String, String> toUpperCaseFunction() {
return String::toUpperCase;
}
@Bean
public Supplier<String> stringSupplier() {
return () -> "foo";
}
@Bean
public IntegrationFlow supplierFlow() {
return IntegrationFlow.from(stringSupplier())
.transform(toUpperCaseFunction())
.channel("suppliedChannel")
.get();
}
此函数支持在与 Spring Cloud Function 框架一起使用时非常有用,其中我们有一个函数目录,并且可以从集成流定义中引用其成员函数。
Kotlin 支持
该框架也得到了改进,以支持函数的 Kotlin lambda,因此现在您可以结合使用 Kotlin 语言和 Spring 集成流定义:
@Bean
@Transformer(inputChannel = "functionServiceChannel")
fun kotlinFunction(): (String) -> String {
return { it.toUpperCase() }
}
@Bean
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
fun kotlinConsumer(): (Message<Any>) -> Unit {
return { print(it) }
}
@Bean
@InboundChannelAdapter(value = "counterChannel",
poller = Poller(fixedRate = "10", maxMessagesPerPoll = "1"))
fun kotlinSupplier(): () -> String {
return { "baz" }
}
Kotlin 协程
从版本 6.0 开始, Spring 集成提供了对 Kotlin 协程的支持。
现在函数和&返回类型可以用于服务方法:suspend
kotlinx.coroutines.Deferred
kotlinx.coroutines.flow.Flow
@ServiceActivator(inputChannel = "suspendServiceChannel", outputChannel = "resultChannel")
suspend fun suspendServiceFunction(payload: String) = payload.uppercase()
@ServiceActivator(inputChannel = "flowServiceChannel", outputChannel = "resultChannel", async = "true")
fun flowServiceFunction(payload: String) =
flow {
for (i in 1..3) {
emit("$payload #$i")
}
}
该框架将它们视为 Reactive Streams 交互,并用于转换为相应的 Reactor 类型。
这样的函数 reply 将在 reply 通道中处理,如果它是 ,或者作为相应回调的结果。ReactiveAdapterRegistry
Mono
Flux
ReactiveStreamsSubscribableChannel
CompletableFuture
默认情况下,具有 result 的函数不在 上,因此实例将作为回复消息有效负载生成。
目标应用程序负责将此对象作为协程处理或分别将其转换为 。Flow async @ServiceActivator Flow Flux |
在 Kotlin 中声明接口方法时,也可以使用修饰符进行标记。
框架利用 internally 使用下游流执行请求-回复。
这样的结果由 API 在内部处理,以实现网关的调用函数的参数:@MessagingGateway
suspend
Mono
Mono
MonoKt.awaitSingleOrNull()
kotlin.coroutines.Continuation
suspend
@MessagingGateway(defaultRequestChannel = "suspendRequestChannel")
interface SuspendFunGateway {
suspend fun suspendGateway(payload: String): String
}
根据 Kotlin 语言要求,必须将此方法作为协程调用:
@Autowired
private lateinit var suspendFunGateway: SuspendFunGateway
fun someServiceMethod() {
runBlocking {
val reply = suspendFunGateway.suspendGateway("test suspend gateway")
}
}