消息路由
消息路由
本章介绍了使用 Spring Integration 路由消息的详细信息。
路由器
本节介绍路由器的工作原理。 它包括以下主题:
概述
路由器是许多消息传递体系结构中的关键元素。 它们使用来自消息通道的消息,并根据一组条件将每条使用的消息转发到一个或多个不同的消息通道。
Spring 集成提供了以下路由器:
Router 实现共享许多 configuration 参数。 但是,路由器之间存在某些差异。 此外,配置参数的可用性取决于 router 是在链内部还是外部使用。 为了提供快速概述,以下两个表中列出了所有可用属性 。
下表显示了可用于链外路由器的配置参数:
属性 | 路由器 | 标头值路由器 | XPath 路由器 | 有效载荷类型 Router | 收件人列表路由 | 异常类型 router |
---|---|---|---|---|---|---|
应用序列 |
||||||
默认输出通道 |
||||||
需要分辨率 |
||||||
忽略发送失败 |
||||||
超时 |
||||||
身份证 |
||||||
自动启动 |
||||||
input-channel (输入通道) |
||||||
次序 |
||||||
方法 |
||||||
裁判 |
||||||
表达 |
||||||
标头名称 |
||||||
计算为字符串 |
||||||
xpath-expression-ref 表达式 |
||||||
转炉 |
下表显示了可用于链内路由器的配置参数:
属性 | 路由器 | 标头值路由器 | XPath 路由器 | 有效载荷类型 Router | 收件人列表路由器 | 异常类型 router |
---|---|---|---|---|---|---|
应用序列 |
||||||
默认输出通道 |
||||||
需要分辨率 |
||||||
忽略发送失败 |
||||||
超时 |
||||||
身份证 |
||||||
自动启动 |
||||||
input-channel (输入通道) |
||||||
次序 |
||||||
方法 |
||||||
裁判 |
||||||
表达 |
||||||
标头名称 |
||||||
计算为字符串 |
||||||
xpath-expression-ref 表达式 |
||||||
转炉 |
从 Spring Integration 2.1 开始,路由器参数在所有路由器实现中都更加标准化。 因此,一些小的更改可能会破坏基于 Spring Integration 的旧应用程序。 从 Spring Integration 2.1 开始,该属性被删除,以便将其行为与该属性合并。
此外,该属性现在默认为 . 在这些更改之前,该属性默认为 ,导致在未解析通道且未设置 no 时以静默方式删除消息。
新行为需要至少一个已解析的通道,默认情况下,如果未确定通道(或发送尝试不成功),则引发一个。 如果您确实希望以静默方式发送消息,则可以设置 . |
常用路由器参数
本节介绍所有路由器参数的通用参数(在本章前面显示的两个表中勾选了所有框的参数)。
链内部和外部
以下参数对链内外的所有路由器都有效。
apply-sequence
-
此属性指定是否应将 sequence number 和 size 标头添加到每条消息中。 此可选属性默认为 .
false
default-output-channel
-
如果设置,则此属性将提供对通道的引用,如果通道解析无法返回任何通道,则应将消息发送到该通道。 如果未提供默认 output channel,则 router 将引发异常。 如果要以静默方式删除这些消息,请将默认 output channel 属性值设置为 。
nullChannel
从版本 6.0 开始,设置默认输出通道也会将选项重置为 。 因此,不会尝试从通道的名称解析通道,而是回退到这个默认输出通道 - 类似于 Java 语句。 如果设置为 explicly,则进一步的逻辑取决于选项: message to un-resolved channel from key can reach a only if if (从键到未解析的通道的消息) 才能到达 a 。 因此,提供且两者均设置为的配置被初始化阶段拒绝。 channelKeyFallback
false
switch
channelKeyFallback
true
resolutionRequired
defaultOutputChannel
resolutionRequired
false
defaultOutputChannel
channelKeyFallback
resolutionRequired
true
AbstractMappingMessageRouter
resolution-required
-
此属性指定是否必须始终将通道名称成功解析为存在的通道实例。 如果设置为 ,则在无法解析通道时引发 a 。 将此属性设置为会导致忽略任何无法解析的通道。 此可选属性默认为 .
true
MessagingException
false
true
A Message is sent only to , if 指定了, when is 且未解析通道。 default-output-channel
resolution-required
false
ignore-send-failures
-
如果设置为 ,则忽略发送到消息通道的失败。 如果设置为 ,则抛出 a,并且如果路由器解析多个通道,则任何后续通道都不会收到该消息。
true
false
MessageDeliveryException
此属性的确切行为取决于消息发送到的类型。 例如,当使用直接通道(单线程)时,发送失败可能是由更远的下游组件抛出的异常引起的。 但是,当将消息发送到简单的队列通道 (异步) 时,引发异常的可能性相当小。
Channel
虽然大多数路由器路由到单个通道,但它们可以返回多个通道名称。 例如,这正是这样做的。 如果在仅路由到单个通道的路由器上将此属性设置为,则任何引起的异常都会被吞噬,这通常没有什么意义。 在这种情况下,最好在流入口点捕获错误流中的异常。 因此,当路由器实现返回多个通道名称时,将属性设置为通常更有意义,因为失败的通道之后的其他通道仍将收到该消息。 recipient-list-router
true
ignore-send-failures
true
此属性默认为 .
false
timeout
-
该属性指定在向目标 Message Channel 发送消息时等待的最长时间(以毫秒为单位)。 默认情况下,send 操作无限期阻止。
timeout
顶层(链外)
以下参数仅对链之外的所有顶级路由器有效。
id
-
标识底层 Spring bean 定义,在路由器的情况下,它是 或 的实例,具体取决于路由器的实例分别是 a 还是 a 。 这是一个可选属性。
EventDrivenConsumer
PollingConsumer
input-channel
SubscribableChannel
PollableChannel
auto-startup
-
此 “lifecycle” 属性指示是否应在应用程序上下文启动期间启动此组件。 此可选属性默认为 .
true
input-channel
-
此终端节点的接收消息通道。
order
-
此属性定义当此终端节点作为订阅者连接到通道时的调用顺序。 当该通道使用 failover dispatching 策略时,这一点尤其重要。 当此终端节点本身是具有队列的通道的轮询使用者时,它不起作用。
路由器实现
由于基于内容的路由通常需要一些特定于域的逻辑,因此大多数用例都需要 Spring 集成的选项,通过使用 XML 名称空间支持或注释来委托给 POJO。 这两者都将在后面讨论。 但是,我们首先介绍几个满足常见要求的实现。
PayloadTypeRouter
A 将消息发送到由有效负载类型映射定义的通道,如下例所示:PayloadTypeRouter
<bean id="payloadTypeRouter"
class="org.springframework.integration.router.PayloadTypeRouter">
<property name="channelMapping">
<map>
<entry key="java.lang.String" value-ref="stringChannel"/>
<entry key="java.lang.Integer" value-ref="integerChannel"/>
</map>
</property>
</bean>
Spring Integration 提供的命名空间也支持配置(参见),它通过将配置及其相应的实现(通过使用元素定义)组合成一个更简洁的配置元素,从根本上简化了配置。
以下示例显示了一个等效于上述配置但使用命名空间支持的配置:PayloadTypeRouter
Namespace Support
<router/>
<bean/>
PayloadTypeRouter
<int:payload-type-router input-channel="routingChannel">
<int:mapping type="java.lang.String" channel="stringChannel" />
<int:mapping type="java.lang.Integer" channel="integerChannel" />
</int:payload-type-router>
以下示例显示了在 Java 中配置的等效路由器:
@ServiceActivator(inputChannel = "routingChannel")
@Bean
public PayloadTypeRouter router() {
PayloadTypeRouter router = new PayloadTypeRouter();
router.setChannelMapping(String.class.getName(), "stringChannel");
router.setChannelMapping(Integer.class.getName(), "integerChannel");
return router;
}
使用 Java DSL 时,有两个选项。
首先,您可以定义 router 对象,如前面的示例所示:
@Bean
public IntegrationFlow routerFlow1() {
return IntegrationFlow.from("routingChannel")
.route(router())
.get();
}
public PayloadTypeRouter router() {
PayloadTypeRouter router = new PayloadTypeRouter();
router.setChannelMapping(String.class.getName(), "stringChannel");
router.setChannelMapping(Integer.class.getName(), "integerChannel");
return router;
}
请注意,路由器可以是 .
如果它不是 .@Bean
@Bean
其次,您可以在 DSL 流本身中定义路由函数,如下例所示:
@Bean
public IntegrationFlow routerFlow2() {
return IntegrationFlow.from("routingChannel")
.<Object, Class<?>>route(Object::getClass, m -> m
.channelMapping(String.class, "stringChannel")
.channelMapping(Integer.class, "integerChannel"))
.get();
}
HeaderValueRouter
A 根据各个标头值映射将 Messages 发送到通道。
创建 a 时,将使用要评估的标头的名称对其进行初始化。
标头的值可以是以下两项之一:HeaderValueRouter
HeaderValueRouter
-
任意值
-
频道名称
如果它是任意值,则需要将这些 Headers 值附加到通道名称的其他映射。 否则,无需其他配置。
Spring 集成提供了一个简单的基于名称空间的 XML 配置来配置一个 .
以下示例演示了当需要将 Headers 值映射到 channels 时的配置:HeaderValueRouter
HeaderValueRouter
<int:header-value-router input-channel="routingChannel" header-name="testHeader">
<int:mapping value="someHeaderValue" channel="channelA" />
<int:mapping value="someOtherHeaderValue" channel="channelB" />
</int:header-value-router>
在解析过程中,上述示例中定义的路由器可能会遇到通道解析失败,从而导致异常。
如果要禁止显示此类异常并将未解析的消息发送到默认输出通道(由属性设置为标识),请设置为 。default-output-channel
resolution-required
false
通常,标头值未显式映射到通道的消息将发送到 。
但是,当 Headers 值映射到通道名称但无法解析通道时,将属性设置为会导致将此类消息路由到 .default-output-channel
resolution-required
false
default-output-channel
从 Spring Integration 2.1 开始,该属性已从 更改为 。
属性默认为 。ignore-channel-name-resolution-failures resolution-required resolution-required true |
以下示例显示了在 Java 中配置的等效路由器:
@ServiceActivator(inputChannel = "routingChannel")
@Bean
public HeaderValueRouter router() {
HeaderValueRouter router = new HeaderValueRouter("testHeader");
router.setChannelMapping("someHeaderValue", "channelA");
router.setChannelMapping("someOtherHeaderValue", "channelB");
return router;
}
使用 Java DSL 时,有两个选项。 首先,您可以定义 router 对象,如前面的示例所示:
@Bean
public IntegrationFlow routerFlow1() {
return IntegrationFlow.from("routingChannel")
.route(router())
.get();
}
public HeaderValueRouter router() {
HeaderValueRouter router = new HeaderValueRouter("testHeader");
router.setChannelMapping("someHeaderValue", "channelA");
router.setChannelMapping("someOtherHeaderValue", "channelB");
return router;
}
请注意,路由器可以是 .
如果它不是 .@Bean
@Bean
其次,您可以在 DSL 流本身中定义路由函数,如下例所示:
@Bean
public IntegrationFlow routerFlow2() {
return IntegrationFlow.from("routingChannel")
.route(Message.class, m -> m.getHeaders().get("testHeader", String.class),
m -> m
.channelMapping("someHeaderValue", "channelA")
.channelMapping("someOtherHeaderValue", "channelB"),
e -> e.id("headerValueRouter"))
.get();
}
不需要将 Headers 值映射到通道名称的配置,因为 Headers 值本身表示通道名称。 以下示例显示了一个不需要将 Headers 值映射到通道名称的路由器:
<int:header-value-router input-channel="routingChannel" header-name="testHeader"/>
从 Spring Integration 2.1 开始,解析通道的行为更加明确。
例如,如果省略该属性,则路由器无法解析至少一个有效通道,并且通过设置为 忽略任何通道名称解析失败,然后抛出 a。 基本上,默认情况下,路由器必须能够成功地将消息路由到至少一个通道。
如果您确实要删除消息,还必须设置为 。 |
RecipientListRouter
A 将收到的每条消息发送到静态定义的消息通道列表。
以下示例创建一个 :RecipientListRouter
RecipientListRouter
<bean id="recipientListRouter"
class="org.springframework.integration.router.RecipientListRouter">
<property name="channels">
<list>
<ref bean="channel1"/>
<ref bean="channel2"/>
<ref bean="channel3"/>
</list>
</property>
</bean>
Spring 集成还为配置提供了名称空间支持(参见名称空间支持),如下例所示:RecipientListRouter
<int:recipient-list-router id="customRouter" input-channel="routingChannel"
timeout="1234"
ignore-send-failures="true"
apply-sequence="true">
<int:recipient channel="channel1"/>
<int:recipient channel="channel2"/>
</int:recipient-list-router>
以下示例显示了在 Java 中配置的等效路由器:
@ServiceActivator(inputChannel = "routingChannel")
@Bean
public RecipientListRouter router() {
RecipientListRouter router = new RecipientListRouter();
router.setSendTimeout(1_234L);
router.setIgnoreSendFailures(true);
router.setApplySequence(true);
router.addRecipient("channel1");
router.addRecipient("channel2");
router.addRecipient("channel3");
return router;
}
以下示例显示了使用 Java DSL 配置的等效路由器:
@Bean
public IntegrationFlow routerFlow() {
return IntegrationFlow.from("routingChannel")
.routeToRecipients(r -> r
.applySequence(true)
.ignoreSendFailures(true)
.recipient("channel1")
.recipient("channel2")
.recipient("channel3")
.sendTimeout(1_234L))
.get();
}
此处的 'apply-sequence' 标志与它对 publish-subscribe-channel 的作用相同,并且与 publish-subscribe-channel 一样,它在 .
有关更多信息,请参阅 PublishSubscribeChannel 配置。recipient-list-router |
配置 时的另一个便捷选项是使用 Spring 表达式语言 (SpEL) 支持作为单个收件人通道的选择器。
这样做类似于在 'chain' 的开头使用 filter 来充当 “selective consumer”。
但是,在这种情况下,它全部相当简洁地组合到路由器的配置中,如下例所示:RecipientListRouter
<int:recipient-list-router id="customRouter" input-channel="routingChannel">
<int:recipient channel="channel1" selector-expression="payload.equals('foo')"/>
<int:recipient channel="channel2" selector-expression="headers.containsKey('bar')"/>
</int:recipient-list-router>
在前面的配置中,将评估由属性标识的 SPEL 表达式,以确定是否应将此收件人包含在给定 input 消息的收件人列表中。
表达式的求值结果必须为 。
如果未定义此属性,则渠道始终位于收件人列表中。selector-expression
boolean
RecipientListRouterManagement
从版本 4.1 开始,提供了几个操作来在运行时动态操作收件人。
这些管理操作是通过 annotation 呈现的。
它们可以通过使用 Control Bus 和 JMX 来使用,如下例所示:RecipientListRouter
RecipientListRouterManagement
@ManagedResource
<control-bus input-channel="controlBus"/>
<recipient-list-router id="simpleRouter" input-channel="routingChannelA">
<recipient channel="channel1"/>
</recipient-list-router>
<channel id="channel2"/>
messagingTemplate.convertAndSend(controlBus, "@'simpleRouter.handler'.addRecipient('channel2')");
从应用程序启动 中,只有一个收件人。
但在命令之后,将添加 recipient。
这是一个 “注册对消息中的一部分的兴趣 ”的用例,当我们可能在某个时间段对来自路由器的消息感兴趣时,因此我们正在订阅 ,并在某个时候决定取消订阅。simpleRouter
channel1
addRecipient
channel2
recipient-list-router
由于 的运行时管理操作 ,它可以从一开始就配置,而无需任何操作。
在这种情况下,当消息没有一个匹配的收件人时,其行为是相同的。
如果已配置,则消息将发送到该处。
否则 the 将被抛出。<recipient-list-router>
<recipient>
RecipientListRouter
defaultOutputChannel
MessageDeliveryException
XPath 路由器
XPath Router 是 XML Module 的一部分。 请参见使用 XPath 路由 XML 消息。
路由和错误处理
Spring 集成还提供了一个特殊的基于类型的路由器,称为路由错误消息(定义为其实例的消息)。 类似于 .
事实上,它们几乎相同。
唯一的区别是,在导航有效负载实例的实例层次结构(例如)以查找最具体的类型和通道映射时,导航“异常原因”(例如)的层次结构以查找最具体的类型或通道映射,并用于匹配类或任何超类。ErrorMessageExceptionTypeRouter
payload
Throwable
ErrorMessageExceptionTypeRouter
PayloadTypeRouter
PayloadTypeRouter
payload.getClass().getSuperclass()
ErrorMessageExceptionTypeRouter
payload.getCause()
Throwable
mappingClass.isInstance(cause)
cause
在这种情况下,通道映射顺序很重要。
因此,如果需要获取 , 而不是 的映射,则必须先在路由器上配置最后一个映射。IllegalArgumentException RuntimeException |
从版本 4.3 开始,在初始化阶段加载所有映射类,以便快速失败。ErrorMessageExceptionTypeRouter ClassNotFoundException |
以下示例显示了 的示例配置:ErrorMessageExceptionTypeRouter
<int:exception-type-router input-channel="inputChannel"
default-output-channel="defaultChannel">
<int:mapping exception-type="java.lang.IllegalArgumentException"
channel="illegalChannel"/>
<int:mapping exception-type="java.lang.NullPointerException"
channel="npeChannel"/>
</int:exception-type-router>
<int:channel id="illegalChannel" />
<int:channel id="npeChannel" />
配置通用路由器
Spring 集成提供了一个通用路由器。 你可以将它用于通用路由(与 Spring Integration 提供的其他路由器相反,每个路由器都有某种形式的专用化)。
使用 XML 配置基于内容的路由器
该元素提供了一种将 router 连接到 input 通道的方法,并且还接受 optional 属性。
该属性引用自定义路由器实现的 bean 名称(必须扩展)。
以下示例显示了三个通用路由器:router
default-output-channel
ref
AbstractMessageRouter
<int:router ref="payloadTypeRouter" input-channel="input1"
default-output-channel="defaultOutput1"/>
<int:router ref="recipientListRouter" input-channel="input2"
default-output-channel="defaultOutput2"/>
<int:router ref="customRouter" input-channel="input3"
default-output-channel="defaultOutput3"/>
<beans:bean id="customRouterBean" class="org.foo.MyCustomRouter"/>
或者,可以指向包含 Comments 的 POJO(稍后显示),或者你可以将 与显式方法名称组合在一起。
指定方法将应用本文档后面的 annotation 部分中描述的相同行为。
下面的示例定义了一个在其属性中指向 POJO 的路由器:ref
@Router
ref
@Router
ref
<int:router input-channel="input" ref="somePojo" method="someMethod"/>
如果自定义路由器实现在其他定义中引用,我们通常建议使用属性。
但是,如果自定义路由器实现的范围应限定为单个定义的定义,则可以提供内部 Bean 定义,如下例所示:ref
<router>
<router>
<int:router method="someMethod" input-channel="input3"
default-output-channel="defaultOutput3">
<beans:bean class="org.foo.MyCustomRouter"/>
</int:router>
不允许在同一配置中同时使用 attribute 和 inner handler definition。
这样做会产生不明确的条件并引发异常。ref <router> |
如果该属性引用了扩展的 Bean(例如框架本身提供的路由器),则配置将优化为直接引用路由器。
在这种情况下,每个属性必须引用单独的 bean 实例(或范围的 bean)或使用内部配置类型。
但是,仅当未在路由器 XML 定义中提供任何特定于路由器的属性时,此优化才适用。
如果您无意中从多个 bean 引用了相同的消息处理程序,则会收到配置异常。ref AbstractMessageProducingHandler ref prototype <bean/> |
以下示例显示了在 Java 中配置的等效路由器:
@Bean
@Router(inputChannel = "routingChannel")
public AbstractMessageRouter myCustomRouter() {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
return // determine channel(s) for message
}
};
}
以下示例显示了使用 Java DSL 配置的等效路由器:
@Bean
public IntegrationFlow routerFlow() {
return IntegrationFlow.from("routingChannel")
.route(myCustomRouter())
.get();
}
public AbstractMessageRouter myCustomRouter() {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
return // determine channel(s) for message
}
};
}
或者,您可以路由来自消息负载的数据,如下例所示:
@Bean
public IntegrationFlow routerFlow() {
return IntegrationFlow.from("routingChannel")
.route(String.class, p -> p.contains("foo") ? "fooChannel" : "barChannel")
.get();
}
路由器和 Spring 表达式语言 (SpEL)
有时,路由逻辑可能很简单,为它编写一个单独的类并将其配置为 bean 似乎有点矫枉过正。 从 Spring Integration 2.0 开始,我们提供了一种替代方案,允许您使用 SPEL 来实现以前需要自定义 POJO 路由器的简单计算。
有关 Spring 表达式语言的更多信息,请参见 Spring Framework Reference Guide 中的相关章节。 |
通常,会评估 SPEL 表达式,并将其结果映射到通道,如下例所示:
<int:router input-channel="inChannel" expression="payload.paymentType">
<int:mapping value="CASH" channel="cashPaymentChannel"/>
<int:mapping value="CREDIT" channel="authorizePaymentChannel"/>
<int:mapping value="DEBIT" channel="authorizePaymentChannel"/>
</int:router>
以下示例显示了在 Java 中配置的等效路由器:
@Router(inputChannel = "routingChannel")
@Bean
public ExpressionEvaluatingRouter router() {
ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter("payload.paymentType");
router.setChannelMapping("CASH", "cashPaymentChannel");
router.setChannelMapping("CREDIT", "authorizePaymentChannel");
router.setChannelMapping("DEBIT", "authorizePaymentChannel");
return router;
}
以下示例显示了在 Java DSL 中配置的等效路由器:
@Bean
public IntegrationFlow routerFlow() {
return IntegrationFlow.from("routingChannel")
.route("payload.paymentType", r -> r
.channelMapping("CASH", "cashPaymentChannel")
.channelMapping("CREDIT", "authorizePaymentChannel")
.channelMapping("DEBIT", "authorizePaymentChannel"))
.get();
}
为了进一步简化,SpEL 表达式可以计算为通道名称,如下面的表达式所示:
<int:router input-channel="inChannel" expression="payload + 'Channel'"/>
在前面的配置中,结果通道由 SPEL 表达式计算,该表达式将 the 的值与文字 'Channel' 连接起来。payload
String
SPEL 用于配置路由器的另一个优点是表达式可以返回一个,有效地使每个收件人都列出路由器。
每当表达式返回多个通道值时,消息就会转发到每个通道。
以下示例显示了这样的表达式:Collection
<router>
<int:router input-channel="inChannel" expression="headers.channels"/>
在上述配置中,如果消息包含名称为 'channels' 的报头,并且该报头的值是 channel names,则消息将发送到列表中的每个通道。
当您需要选择多个渠道时,您可能还会发现集合投影和集合选择表达式很有用。
有关详细信息,请参阅:List
使用注释配置路由器
当用于注释方法时,该方法可能返回 a 或 type。
在后一种情况下,终端节点解析通道名称的方式与解析默认输出通道的方式相同。
此外,该方法可能返回单个值或集合。
如果返回集合,则回复消息将发送到多个渠道。
总而言之,以下方法签名都是有效的:@Router
MessageChannel
String
@Router
public MessageChannel route(Message message) {...}
@Router
public List<MessageChannel> route(Message message) {...}
@Router
public String route(Foo payload) {...}
@Router
public List<String> route(Foo payload) {...}
除了基于有效负载的路由之外,还可以根据消息标头中作为属性或属性提供的元数据来路由消息。
在这种情况下,带注释的方法可能包括一个带 注释的参数,该参数映射到标头值,如以下示例所示,并在注释支持中记录:@Router
@Header
@Router
public List<String> route(@Header("orderStatus") OrderStatus status)
有关基于 XML 的消息的路由 (包括 XPath 支持),请参阅 XML 支持 - 处理 XML 有效负载。 |
有关路由器配置的更多信息,另请参阅 Java DSL 一章中的消息路由器。
动态路由器
Spring 集成为常见的基于内容的路由用例提供了相当多的不同路由器配置,以及将自定义路由器实现为 POJO 的选项。
例如,提供了一种简单的方法来配置路由器,该路由器根据传入消息的有效负载类型计算通道,同时在配置通过评估特定消息 Header 的值来计算通道的路由器时提供相同的便利。
还有基于表达式 (SpEL) 的路由器,其中通道是根据计算表达式来确定的。
所有这些类型的路由器都表现出一些动态特性。PayloadTypeRouter
HeaderValueRouter
但是,这些路由器都需要静态配置。 即使在基于表达式的路由器的情况下,表达式本身也被定义为路由器配置的一部分,这意味着对相同值进行操作的相同表达式始终会导致相同通道的计算。 这在大多数情况下是可以接受的,因为此类路由定义明确,因此是可预测的。 但有时我们需要动态地更改路由器配置,以便消息流可以路由到不同的通道。
例如,您可能希望关闭系统的某些部分以进行维护,并临时将消息重新路由到其他消息流。
再举一个例子,您可能希望通过添加另一个路由来处理更具体的类型(在 的情况下),为消息流引入更多的粒度。java.lang.Number
PayloadTypeRouter
不幸的是,使用静态路由器配置来实现这两个目标中的任何一个,您将不得不关闭整个应用程序,更改路由器的配置(更改路由),然后重新启动应用程序。 这显然不是任何人都想要的解决方案。
动态路由器模式描述了一种机制,通过这些机制,您可以动态地更改或配置路由器,而不会关闭系统或单个路由器。
在我们深入了解 Spring Integration 如何支持动态路由的细节之前,我们需要考虑路由器的典型流程:
-
计算通道标识符,这是路由器在收到消息后计算的值。 通常,它是一个 String 或实际的实例。
MessageChannel
-
将通道标识符解析为通道名称。 我们将在本节后面介绍此过程的细节。
-
将频道名称解析为实际的
MessageChannel
如果步骤 1 导致 的实际实例,则动态路由方面无能为力,因为这是任何路由器工作的最终产品。
但是,如果第一步生成的通道标识符不是 的实例,则有很多可能的方法可以影响派生 .
请考虑以下有效负载类型 router 的示例:MessageChannel
MessageChannel
MessageChannel
MessageChannel
<int:payload-type-router input-channel="routingChannel">
<int:mapping type="java.lang.String" channel="channel1" />
<int:mapping type="java.lang.Integer" channel="channel2" />
</int:payload-type-router>
在有效载荷类型路由器的上下文中,前面提到的三个步骤将按以下方式实现:
-
计算一个通道标识符,该标识符是负载类型的完全限定名称(例如 )。
java.lang.String
-
将通道标识符解析为通道名称,其中上一步的结果用于从元素中定义的有效负载类型映射中选择适当的值。
mapping
-
将通道名称解析为实际实例 的 ,作为对应用程序上下文中 bean 的引用(希望是 a ),由上一步的结果标识。
MessageChannel
MessageChannel
换句话说,每个步骤都会为下一步提供数据,直到该过程完成。
现在考虑一个 header value router 的示例:
<int:header-value-router input-channel="inputChannel" header-name="testHeader">
<int:mapping value="foo" channel="fooChannel" />
<int:mapping value="bar" channel="barChannel" />
</int:header-value-router>
现在我们可以考虑 header value router 的三个步骤是如何工作的:
-
计算一个通道标识符,该标识符是由属性标识的 Header 的值。
header-name
-
将 channel identifier 解析为 channel name,其中上一步的结果用于从元素中定义的常规映射中选择适当的值。
mapping
-
将通道名称解析为实际实例 的 ,作为对应用程序上下文中 bean 的引用(希望是 a ),由上一步的结果标识。
MessageChannel
MessageChannel
两种不同路由器类型的前两种配置看起来几乎相同。
但是,如果您查看 的备用配置,我们会清楚地看到没有 sub 元素,如下面的清单所示:HeaderValueRouter
mapping
<int:header-value-router input-channel="inputChannel" header-name="testHeader">
但是,该配置仍然完全有效。 那么自然而然的问题是,第二步中的映射呢?
第二步现在是可选的。
如果未定义,则在第一步中计算的通道标识符值将自动被视为 ,现在解析为实际 ,如第三步中所示。
它还意味着第二步是向路由器提供动态特性的关键步骤之一,因为它引入了一个过程,允许您更改通道标识符解析为通道名称的方式,从而影响从初始通道标识符确定 the 的最终实例的过程。mapping
channel name
MessageChannel
MessageChannel
例如,在前面的配置中,假设值为 'kermit',它现在是通道标识符(第一步)。
由于此路由器中没有映射,因此无法将此通道标识符解析为通道名称(第二步),并且此通道标识符现在被视为通道名称。
但是,如果存在映射但值不同,该怎么办?
最终结果仍然是相同的,因为如果无法通过将通道标识符解析为通道名称来确定新值,则通道标识符将成为通道名称。testHeader
剩下的就是第三步将通道名称 ('kermit') 解析为由此名称标识的实际实例。
这基本上涉及对提供的名称的 bean 查找。
现在,所有包含 header-value 对的消息都将被路由到其 bean 名称(其 )为 'kermit' 的 bean。MessageChannel
testHeader=kermit
MessageChannel
id
但是,如果您想将这些消息路由到 'simpson' 通道怎么办?显然,更改静态配置是可行的,但这样做也需要关闭您的系统。
但是,如果您有权访问通道标识符映射,则可以在 header-value 对现在的位置引入一个新的映射,从而让第二步将 'kermit' 视为通道标识符,同时将其解析为 'simpson' 作为通道名称。kermit=simpson
这显然同样适用于 ,您现在可以重新映射或删除特定的负载类型映射。
事实上,它适用于所有其他路由器,包括基于表达式的路由器,因为它们的计算值现在有机会通过第二步解析为实际的 .PayloadTypeRouter
channel name
任何作为 (包括大多数框架定义的路由器) 的子类的路由器都是动态路由器,因为 是在 级别定义的。
该地图的 setter 方法与 'setChannelMapping' 和 'removeChannelMapping' 方法一起作为公共方法公开。
这些允许您在运行时更改、添加和删除路由器映射,只要您有对路由器本身的引用。
这也意味着你可以通过 JMX(参见 JMX 支持)或 Spring 集成控制总线(参见控制总线)功能公开这些相同的配置选项。AbstractMappingMessageRouter
channelMapping
AbstractMappingMessageRouter
回退到 channel key 作为频道名称,灵活方便。
但是,如果您不信任消息创建者,恶意行为者(了解系统的人)可能会创建路由到意外通道的消息。
例如,如果将 key 设置为路由器 input 通道的通道名称,则此类消息将被路由回路由器,最终导致堆栈溢出错误。
因此,您可能希望禁用此功能(将属性设置为 ),并在需要时更改映射。channelKeyFallback false |
使用 Control Bus 管理 Router Mapping
管理路由器映射的一种方法是通过控制总线模式,它公开了一个控制通道,你可以向该通道发送控制消息以管理和监视 Spring 集成组件,包括路由器。
有关控制总线的更多信息,请参阅Control Bus。 |
通常,您将发送一条控制消息,要求对特定托管组件(例如路由器)调用特定操作。 以下托管操作(方法)特定于更改路由器解析过程:
-
public void setChannelMapping(String key, String channelName)
:用于在 和 之间添加新映射或修改现有映射channel identifier
channel name
-
public void removeChannelMapping(String key)
:用于删除特定通道映射,从而断开 和 之间的关系channel identifier
channel name
请注意,这些方法可用于简单的更改(例如更新单个路由或添加或删除路由)。 但是,如果要删除一个路由并添加另一个路由,则更新不是原子的。 这意味着路由表在更新之间可能处于不确定状态。 从版本 4.0 开始,您现在可以使用 control bus 以原子方式更新整个 routing table。 以下方法允许您执行此操作:
-
public Map<String, String>getChannelMappings()
:返回当前映射。 -
public void replaceChannelMappings(Properties channelMappings)
:更新映射。 请注意,该参数是一个对象。 这种安排允许 control bus 命令使用内置的 ,如下例所示:channelMappings
Properties
StringToPropertiesConverter
"@'router.handler'.replaceChannelMappings('foo=qux \n baz=bar')"
请注意,每个映射都由换行符 () 分隔。
对于映射的编程更改,出于类型安全考虑,我们建议您使用该方法。 忽略不是对象的键或值。\n
setChannelMappings
replaceChannelMappings
String
使用 JMX 管理路由器映射
您还可以使用 Spring 的 JMX 支持来公开路由器实例,然后使用您最喜欢的 JMX 客户端(例如,JConsole)来管理用于更改路由器配置的那些操作(方法)。
有关 Spring 集成的 JMX 支持的更多信息,请参阅 JMX 支持。 |
工艺路线单
从版本 4.1 开始, Spring 集成提供了路由单企业集成模式的实现。
它作为消息标头实现,当未为终端节点指定 an 时,该消息头用于确定实例中的下一个通道。
此模式在复杂的动态情况下非常有用,此时很难配置多个路由器来确定消息流。
当消息到达没有 的终端节点时,将查询 以确定将消息发送到的下一个通道。
当路由单用尽时,恢复正常处理。routingSlip
AbstractMessageProducingHandler
outputChannel
output-channel
routingSlip
replyChannel
路由单的配置显示为一个选项 — 包含条目的分号分隔的路由单,如下例所示:HeaderEnricher
path
<util:properties id="properties">
<beans:prop key="myRoutePath1">channel1</beans:prop>
<beans:prop key="myRoutePath2">request.headers[myRoutingSlipChannel]</beans:prop>
</util:properties>
<context:property-placeholder properties-ref="properties"/>
<header-enricher input-channel="input" output-channel="process">
<routing-slip
value="${myRoutePath1}; @routingSlipRoutingPojo.get(request, reply);
routingSlipRoutingStrategy; ${myRoutePath2}; finishChannel"/>
</header-enricher>
前面的示例具有:
-
一种配置,用于演示路由单中的条目可以指定为可解析键。
<context:property-placeholder>
path
-
子元素用于填充 到 处理程序。
<header-enricher>
<routing-slip>
RoutingSlipHeaderValueMessageProcessor
HeaderEnricher
-
接受已解析的路由单条目数组,并返回 (from) a 和 as initial。
RoutingSlipHeaderValueMessageProcessor
String
path
processMessage()
singletonMap
path
key
0
routingSlipIndex
路由单条目可以包含 Bean 名称、Bean 名称和 Spring 表达式 (SPEL)。
根据第一次调用检查每个路由单条目。
它将条目(在应用程序上下文中不是 bean 名称)转换为实例。 条目被多次调用,直到它们返回 null 或 empty 。path
MessageChannel
RoutingSlipRouteStrategy
RoutingSlipHeaderValueMessageProcessor
path
BeanFactory
processMessage
ExpressionEvaluatingRoutingSlipRouteStrategy
RoutingSlipRouteStrategy
String
由于流程中涉及到路由单,因此我们有一个请求-回复上下文。
已引入 the 来确定使用 the 和 对象的下一个。
此策略的实现应在应用程序上下文中注册为 bean,并且其 bean 名称在 routing slip 中使用。
提供了实现。
它接受 SPEL 表达式,并将内部对象用作评估上下文的根对象。
这是为了避免每次调用的创建开销。
它是一个简单的 Java Bean,具有两个属性: 和 。
通过这个表达式实现,我们可以使用 SpEL(例如 和 )来指定路由单条目,并避免为 .getOutputChannel
RoutingSlipRouteStrategy
outputChannel
requestMessage
reply
path
ExpressionEvaluatingRoutingSlipRouteStrategy
ExpressionEvaluatingRoutingSlipRouteStrategy.RequestAndReply
EvaluationContext
ExpressionEvaluatingRoutingSlipRouteStrategy.getNextPath()
Message<?> request
Object reply
path
@routingSlipRoutingPojo.get(request, reply)
request.headers[myRoutingSlipChannel]
RoutingSlipRouteStrategy
参数始终为 .
根据上下文,回复对象可以是 a 、 an 或任意应用程序域对象(例如,当它由服务激活器调用的 POJO 方法返回时)。
在前两种情况下,使用 SPEL(或 Java 实现)时,通常的属性( 和 )可用。
对于任意域对象,这些属性不可用。
因此,如果结果用于确定下一个路径,则在将路由单与 POJO 方法结合使用时要小心。requestMessage Message<?> Message<?> AbstractIntegrationMessageBuilder Message payload headers |
如果分布式环境中涉及路由单,我们建议不要对 Routing Slip 使用内联表达式。
此建议适用于分布式环境,例如跨 JVM 应用程序,通过消息代理(例如AMQP 支持或 JMS 支持)或在集成流中使用持久性(消息存储)。
框架用于将它们转换为对象,并在消息标头中使用它们。
由于这个类不是(它不可能是,因为它依赖于 ),整个变得不可序列化,并且在任何分布式操作中,我们最终都会得到一个 .
要克服此限制,请使用所需的 SPEL 注册一个 Bean,并在路由单配置中使用其 Bean 名称。path request-reply MessageStore RoutingSlipHeaderValueMessageProcessor ExpressionEvaluatingRoutingSlipRouteStrategy routingSlip Serializable BeanFactory Message NotSerializableException ExpressionEvaluatingRoutingSlipRouteStrategy path |
对于 Java 配置,你可以将实例添加到 Bean 定义中,如下例所示:RoutingSlipHeaderValueMessageProcessor
HeaderEnricher
@Bean
@Transformer(inputChannel = "routingSlipHeaderChannel")
public HeaderEnricher headerEnricher() {
return new HeaderEnricher(Collections.singletonMap(IntegrationMessageHeaderAccessor.ROUTING_SLIP,
new RoutingSlipHeaderValueMessageProcessor("myRoutePath1",
"@routingSlipRoutingPojo.get(request, reply)",
"routingSlipRoutingStrategy",
"request.headers[myRoutingSlipChannel]",
"finishChannel")));
}
当终端节点生成回复且未定义 no 时,路由滑移算法的工作原理如下:outputChannel
-
这用于从路由单列表中获取值。
routingSlipIndex
path
-
如果 from 的值为 ,则用于从 获取 bean 。
routingSlipIndex
String
BeanFactory
-
如果返回的 Bean 是 的实例,则将其用作下一个实例,并且在回复消息头中 将递增(路由单条目保持不变)。
MessageChannel
outputChannel
routingSlipIndex
path
-
如果返回的 bean 是 的实例,并且它不返回 empty ,则该结果将用作下一个 的 bean 名称。 保持不变。
RoutingSlipRouteStrategy
getNextPath
String
outputChannel
routingSlipIndex
-
如果返回空 或 ,则递增 ,并递归调用下一个 Routing Slip 项。
RoutingSlipRouteStrategy.getNextPath
String
null
routingSlipIndex
getOutputChannelFromRoutingSlip
path
-
如果下一个路由单条目不是 ,则它必须是 的实例。
path
String
RoutingSlipRouteStrategy
-
当 超过路由单列表的大小时,算法将移动到标准标头的默认行为。
routingSlipIndex
path
replyChannel
Process Manager 企业集成模式
企业集成模式包括 Process Manager 模式。
您现在可以通过使用封装在 routing slip 中的自定义 process manager logic 来轻松实现此模式。
除了 bean 名称之外,还可以返回任何对象,并且不要求此实例是应用程序上下文中的 bean。
这样,当无法预测应该使用哪个 channel 时,我们可以提供强大的 dynamic routing logic。
可以在 中创建 A 并返回 。
对于此类情况,A 与关联的 implementation 是一个很好的组合。
例如,你可以路由到 Reactive Streams,如下例所示:RoutingSlipRouteStrategy
RoutingSlipRouteStrategy
MessageChannel
MessageChannel
MessageChannel
RoutingSlipRouteStrategy
FixedSubscriberChannel
MessageHandler
@Bean
public PollableChannel resultsChannel() {
return new QueueChannel();
}
@Bean
public RoutingSlipRouteStrategy routeStrategy() {
return (requestMessage, reply) -> requestMessage.getPayload() instanceof String
? new FixedSubscriberChannel(m ->
Mono.just((String) m.getPayload())
.map(String::toUpperCase)
.subscribe(v -> messagingTemplate().convertAndSend(resultsChannel(), v)))
: new FixedSubscriberChannel(m ->
Mono.just((Integer) m.getPayload())
.map(v -> v * 2)
.subscribe(v -> messagingTemplate().convertAndSend(resultsChannel(), v)));
}
Filter
邮件过滤器用于根据某些标准(如邮件报头值或邮件内容本身)来决定是应传递还是丢弃 。
因此,消息过滤器类似于路由器,不同之处在于,对于从过滤器的 input 通道接收的每条消息,同一条消息可能会也可能不会发送到过滤器的输出通道。
与路由器不同,它不决定将消息发送到哪个消息通道,而只决定是否发送消息。Message
正如我们在本节后面描述的那样,过滤器还支持 discard 通道。 在某些情况下,它可以根据布尔条件扮演非常简单的路由器(或“交换机”)的角色。 |
在 Spring 集成中,你可以将消息过滤器配置为委托给接口实现的消息端点。
该接口本身非常简单,如下面的清单所示:MessageSelector
public interface MessageSelector {
boolean accept(Message<?> message);
}
构造函数接受 selector 实例,如下例所示:MessageFilter
MessageFilter filter = new MessageFilter(someSelector);
结合命名空间和 SPEL,您可以使用非常少的 Java 代码配置强大的过滤器。
使用 XML 配置过滤器
您可以使用 element is used 创建消息选择端点。
除了 and 属性之外,它还需要一个 attribute。
can 指向一个实现,如下例所示:<filter>
input-channel
output-channel
ref
ref
MessageSelector
<int:filter input-channel="input" ref="selector" output-channel="output"/>
<bean id="selector" class="example.MessageSelectorImpl"/>
或者,您可以添加 attribute.
在这种情况下,该属性可以引用任何对象。
引用的方法可能需要入站消息的类型或有效负载类型。
该方法必须返回布尔值。
如果该方法返回 'true',则消息将发送到输出通道。
以下示例说明如何配置使用 attribute 的筛选条件:method
ref
Message
method
<int:filter input-channel="input" output-channel="output"
ref="exampleObject" method="someBooleanReturningMethod"/>
<bean id="exampleObject" class="example.SomeObject"/>
如果 selector 或改编的 POJO method 返回 ,则一些设置控制对被拒绝消息的处理。
默认情况下,(如果配置如上例所示)被拒绝的消息将被静默丢弃。
如果 rejection 应改为导致错误条件,请将该属性设置为 ,如下例所示:false
throw-exception-on-rejection
true
<int:filter input-channel="input" ref="selector"
output-channel="output" throw-exception-on-rejection="true"/>
如果您希望将被拒绝的消息路由到特定通道,请将该引用作为 ,如下例所示:discard-channel
<int:filter input-channel="input" ref="selector"
output-channel="output" discard-channel="rejectedMessages"/>
另请参阅 Advising Filters。
消息过滤器通常与发布-订阅通道结合使用。 许多筛选条件终端节点可能订阅了同一个通道,它们决定是否将消息传递给下一个终端节点,该终端节点可以是任何支持的类型(例如服务激活器)。 这提供了一种被动的替代方法,而不是使用具有单个点对点输入通道和多个输出通道的消息路由器的更主动的方法。 |
如果自定义筛选条件实施在其他定义中引用,我们建议使用属性。
但是,如果自定义过滤器实现的范围限定为单个元素,则应提供内部 Bean 定义,如下例所示:ref
<filter>
<filter>
<int:filter method="someMethod" input-channel="inChannel" output-channel="outChannel">
<beans:bean class="org.foo.MyCustomFilter"/>
</filter>
不允许在同一配置中同时使用 attribute 和内部处理程序定义,因为它会创建不明确的条件并引发异常。ref <filter> |
如果该属性引用了扩展的 Bean(例如框架本身提供的过滤器),则通过将输出通道直接注入过滤器 Bean 来优化配置。
在这种情况下,每个都必须是单独的 bean 实例(或-scoped bean)或使用内部配置类型。
但是,仅当未在筛选器 XML 定义中提供任何特定于筛选器的属性时,此优化才适用。
如果您无意中从多个 bean 引用了相同的消息处理程序,则会收到配置异常。ref MessageFilter ref prototype <bean/> |
随着 SPEL 支持的引入, Spring 集成将该属性添加到filter元素中。
它可以用于完全避免 Java 进行简单的过滤器,如下例所示:expression
<int:filter input-channel="input" expression="payload.equals('nonsense')"/>
作为 expression 属性的值传递的字符串将作为 SpEL 表达式进行评估,并在评估上下文中提供消息。
如果必须在应用程序上下文的范围内包含表达式的结果,则可以使用 SPEL 参考文档中定义的表示法,如下例所示:#{}
<int:filter input-channel="input"
expression="payload.matches(#{filterPatterns.nonsensePattern})"/>
如果表达式本身需要是动态的,则可以使用 'expression' 子元素。
这为通过 .
这是一个可以直接实现的策略接口,或者你可以依赖 Spring Integration 中提供的版本,该版本从“资源包”加载表达式,并可以在给定的秒数后检查修改。
以下配置示例演示了所有这些,其中,如果底层文件已被修改,则可以在一分钟内重新加载表达式:ExpressionSource
<int:filter input-channel="input" output-channel="output">
<int:expression key="filterPatterns.example" source="myExpressions"/>
</int:filter>
<beans:bean id="myExpressions"
class="o.s.i.expression.ReloadableResourceBundleExpressionSource">
<beans:property name="basename" value="config/integration/expressions"/>
<beans:property name="cacheSeconds" value="60"/>
</beans:bean>
如果 bean 名为 ,则无需在元素上提供 ' source' 属性。
但是,在前面的示例中,我们展示了它以保证完整性。ExpressionSource
expressionSource
<expression>
'config/integration/expressions.properties' 文件(或任何具有区域设置扩展名的更具体的版本,要以加载资源包的典型方式解析)可以包含键/值对,如下例所示:
filterPatterns.example=payload > 100
所有这些用作属性或子元素的示例也可以应用于 transformer、router、splitter、service-activator 和 header-enricher 元素中。
给定组件类型的语义和角色将影响评估结果的解释,就像解释方法调用的返回值一样。
例如,表达式可以返回 router 组件将视为消息通道名称的字符串。
但是,根据作为根对象的消息评估表达式并解析前缀为“@”的 bean 名称的底层功能在 Spring 集成中的所有核心 EIP 组件中是一致的。expression |
配置带有注释的过滤器
以下示例显示如何使用注释配置筛选条件:
public class PetFilter {
...
@Filter (1)
public boolean dogsOnly(String input) {
...
}
}
1 | 指示此方法将用作过滤器的注释。 如果要将此类用作过滤器,则必须指定它。 |
XML 元素提供的所有配置选项也可用于注释。@Filter
过滤器可以从 XML 中显式引用,或者,如果在类上定义了 Comments,则通过 Classpath 扫描自动检测。@MessageEndpoint
另请参阅使用注释通知终端节点。
分配器
拆分器是一个组件,其作用是将消息划分为多个部分,并发送生成的消息以进行独立处理。 很多时候,他们是包含聚合器的管道中的上游生产者。
编程模型
用于执行拆分的 API 由一个基类 .
它是一种封装拆分器通用功能的实现,例如在生成的消息上填写相应的消息标头 (, 和 )。
通过此填充,可以跟踪消息及其处理结果(在典型情况下,这些标头将复制到由各种转换终端节点生成的消息中)。
然后,这些值可以由组合消息处理器等使用。AbstractMessageSplitter
MessageHandler
CORRELATION_ID
SEQUENCE_SIZE
SEQUENCE_NUMBER
以下示例显示了以下摘录:AbstractMessageSplitter
public abstract class AbstractMessageSplitter
extends AbstractReplyProducingMessageConsumer {
...
protected abstract Object splitMessage(Message<?> message);
}
要在应用程序中实现特定的拆分器,可以扩展并实现该方法,其中包含用于拆分消息的逻辑。
返回值可以是以下值之一:AbstractMessageSplitter
splitMessage
-
A 或一个消息数组或迭代消息的 (或 )。 在这种情况下,消息将作为消息发送(在 之后 ,并填充)。 使用此方法可以为您提供更多控制 — 例如,在拆分过程中填充自定义消息标头。
Collection
Iterable
Iterator
CORRELATION_ID
SEQUENCE_SIZE
SEQUENCE_NUMBER
-
非消息对象的 A 或数组,或迭代非消息对象的 (或 )。 它的工作方式与前一种情况类似,不同之处在于每个 collection 元素都用作消息有效负载。 使用此方法,您可以专注于域对象,而不必考虑消息传送系统,并生成更易于测试的代码。
Collection
Iterable
Iterator
-
一个或非 message 对象(但不是集合或数组)。 它的工作方式与前面的情况类似,只是只发送了一条消息。
Message
在 Spring 集成中,任何 POJO 都可以实现拆分算法,前提是它定义了一个接受单个参数并具有返回值的方法。
在这种情况下,该方法的返回值将如前所述进行解释。
input 参数可以是 a 或简单的 POJO。
在后一种情况下,拆分器接收传入消息的有效负载。
我们推荐这种方法,因为它将代码与 Spring Integration API 分离,并且通常更容易测试。Message
迭代器
从版本 4.1 开始,支持 to split 的类型。
请注意,在 (或 ) 的情况下,我们无权访问基础项目的数量,并且 header 设置为 .
这意味着 default 的 an 将不起作用,并且不会释放 from 的组;它将保持为 。
在这种情况下,您应该使用适当的自定义或依赖 或 .AbstractMessageSplitter
Iterator
value
Iterator
Iterable
SEQUENCE_SIZE
0
SequenceSizeReleaseStrategy
<aggregator>
CORRELATION_ID
splitter
incomplete
ReleaseStrategy
send-partial-result-on-expiry
group-timeout
MessageGroupStoreReaper
从版本 5.0 开始,如果可能,它提供了允许确定 and 对象大小的方法。
例如,可以确定底层对象的大小。
从版本 5.0.9 开始,此方法还会正确返回 .AbstractMessageSplitter
protected obtainSizeIfPossible()
Iterable
Iterator
XPathMessageSplitter
NodeList
com.fasterxml.jackson.core.TreeNode
对象有助于避免在拆分之前在内存中构建整个集合。
例如,当使用迭代或流从某些外部系统(例如 DataBase 或 FTP )填充基础项时。Iterator
MGET
Stream 和 Flux
从版本 5.0 开始,支持 Java 和 Reactive Streams 类型进行拆分。
在这种情况下,目标是基于其迭代功能构建的。AbstractMessageSplitter
Stream
Publisher
value
Iterator
此外,如果分流器的输出通道是 的实例,则会产生一个结果而不是 ,并且输出通道订阅了此实例,以便对下游流量需求进行基于背压的拆分。ReactiveStreamsSubscribableChannel
AbstractMessageSplitter
Flux
Iterator
Flux
从版本 5.2 开始,splitter 支持发送 split 函数返回空容器(集合、数组、流等)的请求消息的选项。
在这种情况下,没有要迭代的项目来发送到 .
拆分结果仍作为流结束指示器。discardChannel
Flux
outputChannel
null
使用 XML 配置 Splitter
可以通过 XML 配置拆分器,如下所示:
<int:channel id="inputChannel"/>
<int:splitter id="splitter" (1)
ref="splitterBean" (2)
method="split" (3)
input-channel="inputChannel" (4)
output-channel="outputChannel" (5)
discard-channel="discardChannel" /> (6)
<int:channel id="outputChannel"/>
<beans:bean id="splitterBean" class="sample.PojoSplitter"/>
1 | 拆分器的 ID 是可选的。 |
2 | 对在应用程序上下文中定义的 Bean 的引用。
Bean 必须实现拆分逻辑,如前面的部分所述。
自选。
如果未提供对 bean 的引用,则假定到达 上的消息的有效负载是 的实现,并且默认的拆分逻辑应用于集合,将每个单独的元素合并到消息中并将其发送到 。input-channel java.util.Collection output-channel |
3 | 实现拆分逻辑的方法(在 Bean 上定义)。 自选。 |
4 | 分路器的 input 通道。 必填。 |
5 | 拆分器将拆分传入消息的结果发送到的通道。 可选(因为传入消息可以自己指定回复通道)。 |
6 | 在切分结果为空的情况下,请求消息发送到的频道。
可选(它们将在 result 的情况下停止)。null |
如果自定义 splitter 实现可以在其他定义中引用,我们建议使用属性。
但是,如果自定义拆分器处理程序实现的范围应限定为 的单个定义,则可以配置内部 Bean 定义,如下例所示:ref
<splitter>
<splitter>
<int:splitter id="testSplitter" input-channel="inChannel" method="split"
output-channel="outChannel">
<beans:bean class="org.foo.TestSplitter"/>
</int:splitter>
不允许在同一配置中同时使用 attribute 和 inner handler definition,因为它会产生不明确的条件并导致引发异常。ref <int:splitter> |
如果该属性引用了扩展的 bean(例如框架本身提供的拆分器),则通过将输出通道直接注入处理程序来优化配置。
在这种情况下,每个都必须是一个单独的 bean 实例(或-scoped bean)或使用内部配置类型。
但是,仅当未在拆分器 XML 定义中提供任何特定于拆分器的属性时,此优化才适用。
如果您无意中从多个 bean 引用了相同的消息处理程序,则会收到配置异常。ref AbstractMessageProducingHandler ref prototype <bean/> |
使用注释配置 Splitter
该注释适用于需要 type 或 message payload 类型的方法,并且该方法的返回值应为任何类型的 a。
如果返回的值不是实际对象,则每个项目都包装在 a 中作为 .
每个结果都发送到定义 的端点的指定 output 通道。@Splitter
Message
Collection
Message
Message
Message
Message
@Splitter
以下示例显示如何使用注释配置拆分器:@Splitter
@Splitter
List<LineItem> extractItems(Order order) {
return order.getItems()
}
另请参阅使用注释、拆分器和文件拆分器为终端节点提供建议。
聚合
基本上,聚合器是拆分器的镜像,是一种消息处理程序,它接收多条消息并将它们组合成一条消息。 事实上,聚合器通常是包含拆分器的管道中的下游使用者。
从技术上讲,聚合器比 splitter 更复杂,因为它是有状态的。
它必须保存要聚合的消息,并确定何时准备好聚合整个消息组。
为此,它需要一个 .MessageStore
功能性
Aggregator 通过关联和存储一组相关消息来组合这些消息,直到该组被视为完整为止。 此时,聚合器通过处理整个组来创建单个消息,并将聚合消息作为输出发送。
实现聚合器需要提供执行聚合(即,从多个中创建单个消息)的逻辑。 两个相关概念是 correlation 和 release。
Correlation 确定如何对消息进行分组以进行聚合。
在 Spring 集成中,默认情况下,关联是根据消息头完成的。
具有相同消息的消息将归为一组。
但是,您可以自定义关联策略,以允许以其他方式指定应如何将消息分组在一起。
为此,您可以实现 a (本章稍后将介绍)。IntegrationMessageHeaderAccessor.CORRELATION_ID
IntegrationMessageHeaderAccessor.CORRELATION_ID
CorrelationStrategy
要确定一组消息的准备处理点,请查阅 a。
当序列中包含的所有消息都存在时,聚合器的默认发布策略会根据标头释放一个组。
您可以通过提供对自定义实施的引用来覆盖此默认策略。ReleaseStrategy
IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
ReleaseStrategy
编程模型
聚合 API 由许多类组成:
-
interface 及其子类: 和
MessageGroupProcessor
MethodInvokingAggregatingMessageGroupProcessor
ExpressionEvaluatingMessageGroupProcessor
-
接口及其默认实现:
ReleaseStrategy
SimpleSequenceSizeReleaseStrategy
-
接口及其默认实现:
CorrelationStrategy
HeaderAttributeCorrelationStrategy
AggregatingMessageHandler
( 的子类 )是一种实现,封装了聚合器(和其他相关用例)的常见功能,如下所示:AggregatingMessageHandler
AbstractCorrelatingMessageHandler
MessageHandler
-
将消息关联到要聚合的组中
-
将这些消息保留在 中,直到可以释放组
MessageStore
-
决定何时可以发布组
-
将已发布的组聚合到一条消息中
-
识别和响应过期的组
决定如何将消息组合在一起的责任委派给实例。
决定是否可以释放消息组的责任委托给实例。CorrelationStrategy
ReleaseStrategy
下面的清单显示了 base 的简要亮点(实现该方法的责任留给了开发人员):AbstractAggregatingMessageGroupProcessor
aggregatePayloads
public abstract class AbstractAggregatingMessageGroupProcessor
implements MessageGroupProcessor {
protected Map<String, Object> aggregateHeaders(MessageGroup group) {
// default implementation exists
}
protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);
}
请参阅 和 作为 .DefaultAggregatingMessageGroupProcessor
ExpressionEvaluatingMessageGroupProcessor
MethodInvokingMessageGroupProcessor
AbstractAggregatingMessageGroupProcessor
从版本 5.2 开始,可以使用一种策略来合并和计算(聚合)输出消息的标头。
该实现可用于返回组之间没有冲突的所有 Headers 的逻辑;组中的一封或多封邮件的报头缺失不被视为冲突。
冲突的标头将被省略。
与新引入的 一起,此函数用于任何任意(非)实现。
从本质上讲,框架将提供的函数注入到实例中,并将所有其他实现包装到 .
和 the 之间的逻辑差异是,后者在调用委托策略之前不会提前计算标头,如果委托返回 或 ,则不会调用函数。
在这种情况下,框架假定目标实现已负责生成一组填充到返回结果中的适当标头。
该策略可用作 XML 配置的 reference 属性、Java DSL 的选项以及普通 Java 配置的选项。Function<MessageGroup, Map<String, Object>>
AbstractAggregatingMessageGroupProcessor
DefaultAggregateHeadersFunction
DelegatingMessageGroupProcessor
AbstractAggregatingMessageGroupProcessor
MessageGroupProcessor
AbstractAggregatingMessageGroupProcessor
DelegatingMessageGroupProcessor
AbstractAggregatingMessageGroupProcessor
DelegatingMessageGroupProcessor
Message
AbstractIntegrationMessageBuilder
Function<MessageGroup, Map<String, Object>>
headers-function
AggregatorSpec.headersFunction()
AggregatorFactoryBean.setHeadersFunction()
该 由 拥有,并且具有基于消息标头的默认值,如下例所示:CorrelationStrategy
AbstractCorrelatingMessageHandler
IntegrationMessageHeaderAccessor.CORRELATION_ID
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
...
this.correlationStrategy = correlationStrategy == null ?
new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
...
}
至于消息组的实际处理,默认实现是 .
它会创建一个 single,其 payload 是给定组收到的 payload 之一。
这适用于使用拆分器、发布-订阅通道或上游收件人列表路由器的简单分散-收集实现。DefaultAggregatingMessageGroupProcessor
Message
List
在此类场景中使用 publish-subscribe 通道或收件人列表路由器时,请务必启用该标志。
这样做会添加必要的标头:、 、 和 .
默认情况下,对于 Spring 集成中的拆分器,该行为是启用的,但不会为发布-订阅通道或收件人列表路由器启用该行为,因为这些组件可能用于不需要这些 Headers 的各种上下文中。apply-sequence CORRELATION_ID SEQUENCE_NUMBER SEQUENCE_SIZE |
在为应用程序实施特定的聚合器策略时,您可以扩展和实施该方法。
但是,有更好的解决方案,与 API 的耦合较少,用于实现聚合逻辑,可以通过 XML 或 Comments 进行配置。AbstractAggregatingMessageGroupProcessor
aggregatePayloads
通常,任何 POJO 都可以实现聚合算法,前提是它提供了一个接受 single 作为参数的方法(也支持参数化列表)。
调用该方法用于聚合消息,如下所示:java.util.List
-
如果参数是 a 且参数类型 T is assignable to ,则为聚合而累积的消息的整个列表将发送到聚合器。
java.util.Collection<T>
Message
-
如果参数是非参数化的或参数类型不可分配给 ,则该方法将接收累积消息的有效负载。
java.util.Collection
Message
-
如果返回类型不可分配给 ,则会将其视为框架自动创建的 a 的有效负载。
Message
Message
为了实现代码简单性并促进最佳实践(如低耦合、可测试性等),实现聚合逻辑的首选方法是通过 POJO 并使用 XML 或注释支持在应用程序中对其进行配置。 |
从版本 5.3 开始,在处理消息组后,an 会为具有多个嵌套级别的适当 splitter-aggregator 场景执行消息头修改。
仅当消息组发布结果不是消息集合时,才会执行该操作。
在这种情况下,目标负责在构建这些消息时进行调用。AbstractCorrelatingMessageHandler
MessageBuilder.popSequenceDetails()
MessageGroupProcessor
MessageBuilder.popSequenceDetails()
如果 返回 a ,则仅当输出消息与组中的第一条消息匹配时,才会对输出消息执行 a 。
(以前,仅当 plain payload 或 an 已从 返回时,才会执行此操作。MessageGroupProcessor
Message
MessageBuilder.popSequenceDetails()
sequenceDetails
AbstractIntegrationMessageBuilder
MessageGroupProcessor
此功能可由新属性控制,因此,当标准拆分器尚未填充关联详细信息时,可以禁用 。
此属性实质上撤消了 .
有关更多信息,请参阅 Splitter 。popSequence
boolean
MessageBuilder.popSequenceDetails()
applySequence = true
AbstractMessageSplitter
该方法返回一个 .
因此,如果聚合 POJO 方法具有参数,则传入的参数正是该实例,并且当您对聚合器使用 a 时,将在释放组后清除该原始对象。
因此,如果 POJO 中的变量从聚合器中传出,它也会被清除。
如果您只想按原样发布该集合以进行进一步处理,则必须构建一个新的(例如, )。
从版本 4.3 开始,框架不再将消息复制到新集合,以避免创建不需要的额外对象。SimpleMessageGroup.getMessages() unmodifiableCollection Collection<Message> Collection SimpleMessageStore Collection<Message> Collection<Message> Collection new ArrayList<Message>(messages) |
在版本 4.2 之前,无法使用 XML 配置提供 。
只有 POJO 方法可用于聚合。
现在,如果框架检测到引用的(或内部的)bean implements ,则将其用作聚合器的输出处理器。MessageGroupProcessor
MessageProcessor
如果您希望从自定义中释放对象集合作为消息的有效负载,则您的类应扩展并实现 .MessageGroupProcessor
AbstractAggregatingMessageGroupProcessor
aggregatePayloads()
此外,从版本 4.2 开始,提供了 a。
它返回来自组的消息集合,如前所述,这会导致单独发送已发布的消息。SimpleMessageGroupProcessor
这使得聚合器可以充当消息屏障,其中到达的消息将被保留,直到发布策略触发并且该组作为单个消息序列发布。
从版本 6.0 开始,上述拆分行为仅在组处理器为 .
否则,对于返回 , 则仅发出一条回复消息,并将整个消息集合作为其有效负载。
这种逻辑由聚合器的规范目的决定 - 通过某个键收集请求消息并生成单个分组消息。SimpleMessageGroupProcessor
MessageGroupProcessor
Collection<Message>
ReleaseStrategy
接口定义如下:ReleaseStrategy
public interface ReleaseStrategy {
boolean canRelease(MessageGroup group);
}
通常,如果任何 POJO 提供接受 single 作为参数的方法(也支持参数化列表)并返回布尔值,则任何 POJO 都可以实现完成决策逻辑。
在每条新消息到达后调用此方法,以确定组是否完成,如下所示:java.util.List
-
如果参数是 a 且参数类型是 assignable to ,则组中累积的整个消息列表将发送到该方法。
java.util.List<T>
T
Message
-
如果参数是非参数化的或参数类型不是 assign to ,则该方法接收累积消息的有效负载。
java.util.List
Message
-
如果消息组已准备好进行聚合,则必须返回该方法,否则返回 false。
true
以下示例显示如何对 type 为 a 的注释 :@ReleaseStrategy
List
Message
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<Message<?>>) {...}
}
以下示例显示如何对 type 为 a 的注释 :@ReleaseStrategy
List
String
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<String>) {...}
}
根据前两个示例中的签名,基于 POJO 的发布策略将传递尚未发布的消息(如果需要访问整个)或 payload 对象(如果 type 参数不是 )。
这满足了大多数使用案例。
但是,如果出于某种原因,您需要访问 full ,则应提供接口的实现。Collection
Message
Collection
Message
MessageGroup
ReleaseStrategy
在处理可能较大的组时,您应该了解如何调用这些方法,因为在释放组之前可能会多次调用发布策略。
最有效的是 的实现,因为聚合器可以直接调用它。
第二高效的是具有参数类型的 POJO 方法。
效率最低的是具有类型的 POJO 方法。
每次调用发布策略时,框架都必须将有效负载从组中的消息复制到新集合中(并可能尝试将有效负载转换为 )。
使用可避免转换,但仍需要创建新的 . 出于这些原因,对于大型组,我们建议您实施 . |
当释放组进行聚合时,将处理其所有尚未发布的消息并将其从组中删除。
如果组也已完成(即,如果序列中的所有消息都已到达或未定义序列),则该组将标记为完成。
此组的任何新消息都将发送到 discard 通道(如果已定义)。
设置为 (默认为 ) 将删除整个组,并且任何新消息(与已删除的组具有相同的关联 ID)将形成一个新组。
您可以通过使用 a together 并将其设置为 来释放部分序列。expire-groups-upon-completion
true
false
MessageGroupStoreReaper
send-partial-result-on-expiry
true
为了便于丢弃延迟到达的消息,聚合器必须在组发布后维护有关该组的状态。
这最终可能导致内存不足情况。
为避免此类情况,应考虑配置 a 以删除组元数据。
应将过期参数设置为在到达某个点后使组过期,之后预计延迟消息不会到达。
有关配置收割者的信息,请参阅管理聚合器中的状态:MessageGroupStore 。MessageGroupStoreReaper |
Spring 集成提供了 : 的实现。
此实现会查阅每条到达消息的 and 标头,以确定消息组何时完成并准备好进行聚合。
如前所述,它也是默认策略。ReleaseStrategy
SimpleSequenceSizeReleaseStrategy
SEQUENCE_NUMBER
SEQUENCE_SIZE
在版本 5.0 之前,默认发布策略为 ,该策略在大型组中表现不佳。
使用该策略,可以检测并拒绝重复的序列号。
此操作可能很昂贵。SequenceSizeReleaseStrategy |
如果要聚合大型组,则无需释放部分组,也不需要检测/拒绝重复序列,请考虑改用 - 对于这些用例,它的效率要高得多,并且自版本 5.0 以来,当未指定部分组释放时,它是默认设置。SimpleSequenceSizeReleaseStrategy
聚合大型组
4.3 版本将 a 中消息的默认值更改为(以前是 )。
当从大型组中删除单个消息时,这很昂贵(需要 O(n) 线性扫描)。
尽管删除哈希集的速度通常要快得多,但对于大型消息来说,它的成本可能很高,因为必须在 insert 和 remove 上计算哈希值。
如果您的消息哈希成本很高,请考虑使用其他一些集合类型。
如使用MessageGroupFactory
中所述,提供了a,以便您可以选择最适合您需求的。
您还可以提供自己的工厂实现来创建其他一些 .Collection
SimpleMessageGroup
HashSet
BlockingQueue
SimpleMessageGroupFactory
Collection
Collection<Message<?>>
以下示例显示如何使用前面的实现和 :SimpleSequenceSizeReleaseStrategy
<int:aggregator input-channel="aggregate"
output-channel="out" message-store="store" release-strategy="releaser" />
<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
<property name="messageGroupFactory">
<bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
<constructor-arg value="BLOCKING_QUEUE"/>
</bean>
</property>
</bean>
<bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" />
如果 filter 端点涉及聚合器上游的流,则 sequence size release strategy(固定的或基于标头的)将无法实现其目的,因为 sequence 中的某些消息可能会被 filter 丢弃。
在这种情况下,建议选择另一个 ,或使用从丢弃子流发送的补偿消息,该子流的内容中携带一些信息,以便在自定义完整组函数中跳过。
有关更多信息,请参阅过滤器。sequenceSize ReleaseStrategy |
关联策略
接口定义如下:CorrelationStrategy
public interface CorrelationStrategy {
Object getCorrelationKey(Message<?> message);
}
该方法返回一个,该键表示用于将消息与消息组关联的相关键。
该键必须满足 中用于键的条件,以便实现 和 。Object
Map
equals()
hashCode()
通常,任何 POJO 都可以实现关联逻辑,并且将消息映射到方法的参数(或多个参数)的规则与 a 相同(包括对 Comments 的支持)。
该方法必须返回一个值,并且该值不能为 。ServiceActivator
@Header
null
Spring 集成提供了 : 的实现。
此实现返回其中一个消息标头(其名称由 constructor 参数指定)的值作为相关键。
默认情况下,关联策略是返回 header 属性值的 a。
如果您有要用于关联的自定义标头名称,则可以在 的实例上配置该名称,并将其作为聚合器关联策略的参考提供。CorrelationStrategy
HeaderAttributeCorrelationStrategy
HeaderAttributeCorrelationStrategy
CORRELATION_ID
HeaderAttributeCorrelationStrategy
锁定注册表
对组的更改是线程安全的。
因此,当您同时发送同一相关 ID 的消息时,聚合器中将只处理其中一个消息,从而使其实际上每个消息组都是单线程的。
A 用于获取已解析相关 ID 的锁。
默认情况下使用 A (in-memory)。
要在使用共享的服务器之间同步更新,您必须配置共享锁注册表。LockRegistry
DefaultLockRegistry
MessageGroupStore
避免死锁
如上所述,当消息组发生更改(添加或释放消息)时,将持有一个锁。
请考虑以程:
...->aggregator1-> ... ->aggregator2-> ...
如果有多个线程,并且聚合器共享一个公共锁注册表,则可能会出现死锁。
这将导致线程挂起,并可能显示如下结果:jstack <pid>
Found one Java-level deadlock:
=============================
"t2":
waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t1"
"t1":
waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t2"
有几种方法可以避免此问题:
-
确保每个聚合器都有自己的 Lock Registry(这可以是跨应用程序实例的共享注册表,但流中的两个或多个聚合器必须各自具有不同的注册表)
-
使用 OR 作为聚合器的输出通道,以便下游流在新线程上运行
ExecutorChannel
QueueChannel
-
从版本 5.1.1 开始,将 Aggregator 属性设置为
releaseLockBeforeSend
true
如果由于某种原因,单个聚合器的输出最终路由回同一聚合器,也可能导致此问题。 当然,上述第一种解决方案不适用于这种情况。 |
在 Java DSL 中配置聚合器
有关如何在 Java DSL 中配置聚合器的信息,请参见 Aggregators 和 Resequencers。
使用 XML 配置聚合器
Spring 集成支持通过元素使用 XML 配置聚合器。
以下示例显示了聚合器的示例:<aggregator/>
<channel id="inputChannel"/>
<int:aggregator id="myAggregator" (1)
auto-startup="true" (2)
input-channel="inputChannel" (3)
output-channel="outputChannel" (4)
discard-channel="throwAwayChannel" (5)
message-store="persistentMessageStore" (6)
order="1" (7)
send-partial-result-on-expiry="false" (8)
send-timeout="1000" (9)
correlation-strategy="correlationStrategyBean" (10)
correlation-strategy-method="correlate" (11)
correlation-strategy-expression="headers['foo']" (12)
ref="aggregatorBean" (13)
method="aggregate" (14)
release-strategy="releaseStrategyBean" (15)
release-strategy-method="release" (16)
release-strategy-expression="size() == 5" (17)
expire-groups-upon-completion="false" (18)
empty-group-min-timeout="60000" (19)
lock-registry="lockRegistry" (20)
group-timeout="60000" (21)
group-timeout-expression="size() ge 2 ? 100 : -1" (22)
expire-groups-upon-timeout="true" (23)
scheduler="taskScheduler" > (24)
<expire-transactional/> (25)
<expire-advice-chain/> (26)
</aggregator>
<int:channel id="outputChannel"/>
<int:channel id="throwAwayChannel"/>
<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
<constructor-arg ref="dataSource"/>
</bean>
<bean id="aggregatorBean" class="sample.PojoAggregator"/>
<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>
<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
1 | 聚合商的 ID 是可选的。 |
2 | 生命周期属性指示是否应在应用程序上下文启动期间启动聚合器。 可选(默认值为 'true')。 |
3 | 聚合器从中接收消息的通道。 必填。 |
4 | 聚合器将聚合结果发送到的通道。 可选(因为传入消息本身可以在 'replyChannel' 消息头中指定回复通道)。 |
5 | 聚合器将超时消息发送到的通道 (if is )。
自选。send-partial-result-on-expiry false |
6 | 对 a 的引用,用于将消息组存储在其关联键下,直到它们完成。
自选。
默认情况下,它是一个易失性内存存储。
有关更多信息,请参阅 Message Store 。MessageGroupStore |
7 | 当多个句柄订阅到同一 (用于负载平衡目的) 时,此聚合器的顺序。
自选。DirectChannel |
8 | 表示过期的消息应该被聚合,并在其包含过期后发送到 'output-channel' 或 'replyChannel' (参见 MessageGroupStore.expireMessageGroups(long) )。
使 a 过期的一种方法是配置 .
但是,您也可以通过调用 来过期。
您可以通过 Control Bus 操作来实现此目的,或者,如果您有对实例的引用,则通过调用 .
否则,此属性本身不会执行任何操作。
它仅用作指示是否丢弃或将仍在 中即将过期的任何消息发送到 output 或 reply 通道。
可选(默认值为 )。
注意:此属性可能更合适地称为 ,因为如果设置为 ,则组实际上可能不会过期。MessageGroup MessageGroup MessageGroupStoreReaper MessageGroup MessageGroupStore.expireMessageGroups(timeout) MessageGroupStore expireMessageGroups(timeout) MessageGroup false send-partial-result-on-timeout expire-groups-upon-timeout false |
9 | 向 或 发送回复时要等待的超时间隔。
默认为 ,这会导致无限期阻塞。
仅当 output 通道具有一些 'sending' 限制(例如具有固定 'capacity' 时),它才适用。
在这种情况下,会引发 a。
对于实现, 则 被忽略 。
对于 ,从计划过期任务开始将导致重新计划此任务。
自选。Message output-channel discard-channel -1 QueueChannel MessageDeliveryException AbstractSubscribableChannel send-timeout group-timeout(-expression) MessageDeliveryException |
10 | 对实现消息关联(分组)算法的 Bean 的引用。
bean 可以是接口的实现或 POJO。
在后一种情况下,还必须定义 attribute。
可选(默认情况下,聚合器使用 header)。CorrelationStrategy correlation-strategy-method IntegrationMessageHeaderAccessor.CORRELATION_ID |
11 | 在 Bean 上定义的方法,由 引用。
它实现了关联决策算法。
可选,有限制 ( must be present)。correlation-strategy correlation-strategy |
12 | 表示关联策略的 SPEL 表达式。
例:。
只允许使用其中之一 or。"headers['something']" correlation-strategy correlation-strategy-expression |
13 | 对在应用程序上下文中定义的 Bean 的引用。 如前所述,Bean 必须实现聚合逻辑。 可选(默认情况下,聚合消息列表将成为输出消息的有效负载)。 |
14 | 在 Bean 上定义的方法,该属性引用该属性。
它实现了消息聚合算法。
可选 (取决于定义的属性)。ref ref |
15 | 对实现发布策略的 bean 的引用。
bean 可以是接口的实现或 POJO。
在后一种情况下,还必须定义 attribute。
可选(默认情况下,聚合商使用 header 属性)。ReleaseStrategy release-strategy-method IntegrationMessageHeaderAccessor.SEQUENCE_SIZE |
16 | 在 Bean 上定义的方法,该属性引用该属性。
它实现了完成决策算法。
可选,有限制 ( must be present)。release-strategy release-strategy |
17 | 表示发布策略的 SPEL 表达式。
表达式的根对象是 .
例:。
只允许使用其中之一 or。MessageGroup "size() == 5" release-strategy release-strategy-expression |
18 | 当设置为 (default is )时,已完成的组将从邮件存储中删除,从而让具有相同关联的后续邮件形成一个新组。
默认行为是将具有与已完成组相同的关联的消息发送到 .true false discard-channel |
19 | 仅当为 的 配置了 时适用。
默认情况下,当 配置为使部分组过期时,也会删除空组。
在正常释放组后,存在空组。
空组允许检测和丢弃延迟到达的消息。
如果您希望使空组过期的时间比使部分组过期的时间更长,请设置此属性。
然后,空组不会从 中删除,直到它们至少在此毫秒数内未被修改。
请注意,空 group 过期的实际时间也受 reaper 属性的影响,它可能与此值加上 timeout 一样多。MessageGroupStoreReaper MessageStore <aggregator> MessageGroupStoreReaper MessageStore timeout |
20 | 对 Bean 的引用。
它过去用于获取基于 的 for 并发操作。
默认情况下,使用 internal 。
使用 distributed ,例如 ,可确保聚合器只有一个实例可以同时对组进行操作。
有关更多信息,请参阅 Redis Lock Registry 或 Zookeeper Lock Registry。org.springframework.integration.util.LockRegistry Lock groupId MessageGroup DefaultLockRegistry LockRegistry ZookeeperLockRegistry |
21 | 超时(以毫秒为单位),用于在当前消息到达时不释放组时强制完成。
当需要发出部分结果(或丢弃组)时,如果新消息在超时(从最后一条消息到达的时间开始计算)内未到达,则此属性为聚合器提供内置的基于时间的发布策略。
要设置从创建时间开始计算的超时,请参阅 information。
当新消息到达聚合器时,将取消其任何现有消息。
如果返回 (表示不发布) 和 ,则计划新任务使组过期。
我们不建议将此属性设置为零(或负值)。
这样做可以有效地禁用聚合器,因为每个消息组都会立即完成。
但是,您可以使用表达式有条件地将其设置为零(或负值)。
请参阅 以获取信息。
完成期间执行的操作取决于 和 属性。
有关更多信息,请参阅 Aggregator 和 Group Timeout 。
它与 'group-timeout-expression' 属性互斥。MessageGroup ReleaseStrategy MessageGroup MessageGroup group-timeout-expression ScheduledFuture<?> MessageGroup ReleaseStrategy false groupTimeout > 0 group-timeout-expression ReleaseStrategy send-partial-group-on-expiry |
22 | 计算结果为 a 的 SPEL 表达式,其中 作为计算上下文对象。
用于安排 强制完成。
如果表达式的计算结果为 ,则不会计划完成。
如果计算结果为零,则立即在当前线程上完成该组。
实际上,这提供了一个动态属性。
例如,如果您希望在自创建组以来经过 10 秒后强制完成 a,则可以考虑使用以下 SpEL 表达式:其中由 as the here 是评估上下文对象。
但请记住,组创建时间可能与第一条消息的时间不同,具体取决于其他组过期属性的配置。
有关更多信息,请参阅。
与 'group-timeout' 属性互斥。groupTimeout MessageGroup #root MessageGroup null group-timeout MessageGroup timestamp + 10000 - T(System).currentTimeMillis() timestamp MessageGroup.getTimestamp() MessageGroup #root group-timeout |
23 | 当组由于超时(或 a )而完成时,默认情况下,该组将过期(完全删除)。
迟到的消息将启动一个新组。
您可以将此项设置为完成组,但保留其元数据,以便丢弃延迟到达的消息。
空组可以稍后使用 a 和 属性过期。
它默认为 'true'。MessageGroupStoreReaper false MessageGroupStoreReaper empty-group-min-timeout |
24 | 一个 bean 引用,用于安排 在 中没有新消息到达时强制完成 。
如果未提供,则使用在 () 中注册的默认调度程序 () 。
如果指定了 或 未指定,则此属性不适用。TaskScheduler MessageGroup MessageGroup groupTimeout taskScheduler ApplicationContext ThreadPoolTaskScheduler group-timeout group-timeout-expression |
25 | 从 4.1 版本开始。
它允许为操作启动事务。
它由 a 或 发起,不适用于正常的 、 和 操作。
仅允许此子元素 or。forceComplete group-timeout(-expression) MessageGroupStoreReaper add release discard <expire-advice-chain/> |
26 | 从 4.1 版本开始。
它允许为操作配置 any。
它由 a 或 发起,不适用于正常的 、 和 操作。
仅允许此子元素 or。
还可以使用 Spring 命名空间在此处配置事务。Advice forceComplete group-timeout(-expression) MessageGroupStoreReaper add release discard <expire-transactional/> Advice tx |
过期组
有两个属性与过期(完全删除)组相关。
当组过期时,没有该组的记录,如果到达具有相同关联的新消息,则会启动一个新组。
当组完成(无过期)时,空组将保留,并丢弃延迟到达的消息。
稍后可以通过将 a 与属性结合使用来删除空组。
如果组未正常完成,但因超时而被释放或丢弃,则组通常已过期。
从版本 4.1 开始,您可以使用 .
它默认为向后兼容。
从版本 5.0 开始,空组也计划在 之后删除。
如果为 和 ,则在正常或部分序列发布发生时计划删除组的任务。 从版本 5.4 开始,可以将聚合器(和重新排序器)配置为使孤立组(持久性消息存储中的组)过期,否则可能不会发布)。
(如果大于 )表示应清除存储中早于此值的组。
该方法在启动时调用,并与提供的 一起在计划任务中定期调用。
此方法也可以随时从外部调用。
根据上述提供的过期选项,过期逻辑完全委托给功能。
当需要从那些不会再使用常规消息到达逻辑释放的旧组中清除消息存储时,这种定期清除功能非常有用。
在大多数情况下,这发生在应用程序重新启动后,当使用持久性消息组存储时。
该功能类似于 Scheduled Task,但在使用 group timeout 而不是 reaper 时,提供了一种处理特定组件中的旧组的便捷方法。
必须专门为当前关联终端节点提供。
否则,一个聚合商可能会从另一个聚合商中清除组。
使用聚合器时,使用此技术过期的组将被丢弃或作为部分组发布,具体取决于属性。 |
如果自定义聚合器处理程序实现可能在其他定义中引用,我们通常建议使用属性。
但是,如果自定义聚合器实现仅由单个定义使用,则可以使用内部 Bean 定义(从版本 1.0.3 开始)在元素中配置聚合 POJO,如下例所示:ref
<aggregator>
<aggregator>
<aggregator>
<aggregator input-channel="input" method="sum" output-channel="output">
<beans:bean class="org.foo.PojoAggregator"/>
</aggregator>
不允许在同一配置中同时使用属性和内部 Bean 定义,因为它会产生不明确的条件。
在这种情况下,将引发 Exception。ref <aggregator> |
以下示例显示了聚合器 Bean 的实现:
public class PojoAggregator {
public Long add(List<Long> results) {
long total = 0l;
for (long partialResult: results) {
total += partialResult;
}
return total;
}
}
前面示例的完成策略 Bean 的实现可能如下所示:
public class PojoReleaseStrategy {
...
public boolean canRelease(List<Long> numbers) {
int sum = 0;
for (long number: numbers) {
sum += number;
}
return sum >= maxValue;
}
}
只要有必要这样做,就可以将发布策略方法和 aggregator 方法组合成一个 bean。 |
上面示例的相关策略 bean 的实现可能如下所示:
public class PojoCorrelationStrategy {
...
public Long groupNumbersByLastDigit(Long number) {
return number % 10;
}
}
前面示例中的聚合器将按某个标准对数字进行分组(在本例中为除以 10 后的余数)并保留该组,直到有效负载提供的数字之和超过特定值。
只要有必要,就可以将发布策略方法、相关策略方法和 聚合器方法组合到单个 bean 中。 (实际上,它们全部或其中任何两个都可以组合。 |
聚合器和 Spring 表达式语言 (SpEL)
从 Spring Integration 2.0 开始,你可以使用 SPEL 处理各种策略(关联、发布和聚合),如果这种发布策略背后的逻辑相对简单,我们建议这样做。
假设您有一个 legacy 组件,该组件旨在接收对象数组。
我们知道,默认发布策略会将所有聚合消息组合到 .
现在我们有两个问题。
首先,我们需要从列表中提取单个消息。
其次,我们需要提取每条消息的有效负载并组装对象数组。
以下示例解决了这两个问题:List
public String[] processRelease(List<Message<String>> messages){
List<String> stringList = new ArrayList<String>();
for (Message<String> message : messages) {
stringList.add(message.getPayload());
}
return stringList.toArray(new String[]{});
}
但是,使用 SPEL,实际上可以通过单行表达式相对容易地处理此类需求,从而避免编写自定义类并将其配置为 bean。 以下示例显示了如何执行此操作:
<int:aggregator input-channel="aggChannel"
output-channel="replyChannel"
expression="#this.![payload].toArray()"/>
在前面的配置中,我们使用集合投影表达式从列表中所有消息的有效负载中组装一个新的集合,然后将其转换为数组,从而获得与早期 Java 代码相同的结果。
在处理自定义发布和关联策略时,您可以应用相同的基于表达式的方法。
您可以将简单的关联逻辑实现为 SPEL 表达式并在属性中对其进行配置,而不是在属性中为自定义定义 Bean,如下例所示:CorrelationStrategy
correlation-strategy
correlation-strategy-expression
correlation-strategy-expression="payload.person.id"
在前面的示例中,我们假设有效负载具有一个带有 的属性,该属性将用于关联消息。person
id
同样,对于 ,您可以将发布逻辑实现为 SpEL 表达式,并在属性中对其进行配置。
evaluation context 的根对象是 itself.
可以通过在表达式中使用 group 的属性来引用 of messages。ReleaseStrategy
release-strategy-expression
MessageGroup
List
message
在版本 5.0 之前的版本中,根对象是 的集合,如前面的示例所示:Message<?> |
release-strategy-expression="!messages.?[payload==5].empty"
在前面的示例中,SPEL 评估上下文的根对象是自身,并且您表示,只要此组中有有效负载为 的消息,就应该释放该组。MessageGroup
5
聚合器和组超时
从版本 4.0 开始,引入了两个新的互斥属性:和 .
请参阅使用 XML 配置聚合器。
在某些情况下,如果当前消息到达时没有释放,则可能需要在超时后发出聚合器结果(或丢弃该组)。
为此,该选项允许安排 be forced to complete,如下例所示:group-timeout
group-timeout-expression
ReleaseStrategy
groupTimeout
MessageGroup
<aggregator input-channel="input" output-channel="output"
send-partial-result-on-expiry="true"
group-timeout-expression="size() ge 2 ? 10000 : -1"
release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>
在此示例中,如果聚合器按顺序接收最后一条消息,则可以正常发布。
如果该特定消息未到达,则强制该组在 10 秒后完成,只要该组至少包含两个 Message。release-strategy-expression
groupTimeout
强制组完成的结果取决于 和 。
首先,再次咨询发布策略,看看是否要进行正常的发布。
虽然组尚未更改,但此时可以决定释放组。
如果发布策略仍未释放该组,则表示该组已过期。
如果为 ,则(部分)中的现有消息将作为普通聚合器回复消息发布到 。
否则,它将被丢弃。ReleaseStrategy
send-partial-result-on-expiry
ReleaseStrategy
send-partial-result-on-expiry
true
MessageGroup
output-channel
行为 和 之间存在差异(请参阅 使用 XML 配置 Aggregator)。
reaper 定期启动对所有 s 的强制完成。
如果在 期间没有新消息到达,则 会单独对每个消息执行此操作。
此外,收割者可用于删除空组(如果为 false,则保留空组以丢弃延迟的消息)。groupTimeout
MessageGroupStoreReaper
MessageGroup
MessageGroupStore
groupTimeout
MessageGroup
groupTimeout
expire-groups-upon-completion
从版本 5.5 开始,可以评估为实例。
这在以下情况下非常有用,例如根据组创建时间 () 而不是当前消息到达来确定计划任务时刻,因为它是在计算结果时计算的:groupTimeoutExpression
java.util.Date
MessageGroup.getTimestamp()
groupTimeoutExpression
long
group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null"
使用注释配置聚合器
以下示例显示了配置了注释的聚合器:
public class Waiter {
...
@Aggregator (1)
public Delivery aggregatingMethod(List<OrderItem> items) {
...
}
@ReleaseStrategy (2)
public boolean releaseChecker(List<Message<?>> messages) {
...
}
@CorrelationStrategy (3)
public String correlateBy(OrderItem item) {
...
}
}
1 | 一个注释,指示此方法应用作聚合器。 如果将此类用作聚合器,则必须指定它。 |
2 | 一个注释,指示此方法用作聚合器的发布策略。
如果任何方法中都不存在,则聚合器将使用 .SimpleSequenceSizeReleaseStrategy |
3 | 一个注释,指示此方法应用作聚合器的关联策略。
如果未指示关联策略,则聚合器将使用基于 .HeaderAttributeCorrelationStrategy CORRELATION_ID |
XML 元素提供的所有配置选项也可用于注释。@Aggregator
聚合器可以从 XML 中显式引用,或者如果在类上定义了 ,则通过 Classpath 扫描自动检测。@MessageEndpoint
Aggregator 组件的 Comments 配置(和其他配置)仅涵盖简单的用例,其中大多数默认选项就足够了。
如果在使用 Comments 配置时需要对这些选项进行更多控制,请考虑使用 the 的定义,并用 标记其方法,如下例所示:@Aggregator
@Bean
AggregatingMessageHandler
@Bean
@ServiceActivator
@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
AggregatingMessageHandler aggregator =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
jdbcMessageGroupStore);
aggregator.setOutputChannel(resultsChannel());
aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
aggregator.setTaskScheduler(this.taskScheduler);
return aggregator;
}
有关更多信息,请参见Programming Model and Annotations on @Bean
Methods。
从版本 4.2 开始,可以使用来简化 .AggregatorFactoryBean AggregatingMessageHandler |
在 Aggregator 中管理状态:MessageGroupStore
Aggregator(以及 Spring Integration 中的其他一些模式)是一种有状态模式,它要求根据一段时间内到达的一组消息做出决策,所有这些消息都具有相同的关联键。
有状态模式(例如 )中接口的设计是由以下原则驱动的:组件(无论是由框架定义还是由用户定义)应能够保持无状态。
所有状态都由 承载,其管理委托给 。
接口定义如下:ReleaseStrategy
MessageGroup
MessageGroupStore
MessageGroupStore
public interface MessageGroupStore {
int getMessageCountForAllMessageGroups();
int getMarkedMessageCountForAllMessageGroups();
int getMessageGroupCount();
MessageGroup getMessageGroup(Object groupId);
MessageGroup addMessageToGroup(Object groupId, Message<?> message);
MessageGroup markMessageGroup(MessageGroup group);
MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);
MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);
void removeMessageGroup(Object groupId);
void registerMessageGroupExpiryCallback(MessageGroupCallback callback);
int expireMessageGroups(long timeout);
}
有关更多信息,请参阅 Javadoc。
在等待发布策略触发时累积状态信息,并且该事件可能永远不会发生。
因此,为了防止过时的消息挥之不去,并让易失性存储提供一个钩子,以便在应用程序关闭时进行清理,允许您注册回调,以便在它们过期时应用于它。
该界面非常简单,如下面的清单所示:MessageGroupStore
MessageGroups
MessageGroupStore
MessageGroups
public interface MessageGroupCallback {
void execute(MessageGroupStore messageGroupStore, MessageGroup group);
}
回调可以直接访问存储和消息组,以便它可以管理持久状态(例如,通过从存储中完全删除组)。
维护这些回调的列表,它根据需要将其应用于时间戳早于作为参数提供的时间的所有消息(请参阅前面描述的 和 方法)。MessageGroupStore
registerMessageGroupExpiryCallback(..)
expireMessageGroups(..)
当您打算依赖该功能时,不要在不同的聚合器组件中使用相同的实例,这一点很重要。
每个 Symbol 都会根据回调注册自己的 Alpha S Alpha S Role。
这样,每个过期的组都可能被错误的聚合器完成或丢弃。
从版本 5.0.10 开始,a 用于 .
反过来,该 会检查是否存在此类的实例,如果回调集中已存在该实例,则记录错误并显示相应的消息。
这样,框架不允许在不同的聚合器/重排序器中使用实例,以避免上述未由特定关联处理程序创建的组过期的副作用。MessageGroupStore expireMessageGroups AbstractCorrelatingMessageHandler MessageGroupCallback forceComplete() UniqueExpiryCallback AbstractCorrelatingMessageHandler MessageGroupStore MessageGroupStore MessageGroupStore |
您可以使用超时值调用该方法。
任何早于当前时间减去此值的消息都已过期,并应用了回调。
因此,是 store 的用户定义了消息组 “expiry” 的含义。expireMessageGroups
为了方便用户, Spring 集成以 a 的形式为消息过期提供了一个包装器,如下例所示:MessageGroupStoreReaper
<bean id="reaper" class="org...MessageGroupStoreReaper">
<property name="messageGroupStore" ref="messageStore"/>
<property name="timeout" value="30000"/>
</bean>
<task:scheduled-tasks scheduler="scheduler">
<task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>
收割者是一个 .
在前面的示例中,消息组存储的 expire 方法每 10 秒调用一次。
超时本身为 30 秒。Runnable
请务必了解,的 'timeout' 属性是一个近似值,受任务计划程序速率的影响,因为此属性仅在任务的下一次计划执行时检查。
例如,如果超时设置为 10 分钟,但任务计划每小时运行一次,并且任务的最后一次执行发生在超时前一分钟,则在接下来的 59 分钟内不会过期。
因此,我们建议将速率设置为至少等于或更短的超时值。MessageGroupStoreReaper MessageGroupStoreReaper MessageGroupStoreReaper MessageGroupStoreReaper MessageGroup |
除了 reaper 之外,当应用程序关闭时,还会通过 .AbstractCorrelatingMessageHandler
它会注册自己的过期回调,这是聚合器的 XML 配置中带有 boolean 标志的链接。
如果该标志设置为 ,则在调用到期回调时,可将尚未发布的组中的任何未标记消息发送到输出通道。AbstractCorrelatingMessageHandler
send-partial-result-on-expiry
true
由于 the 是从计划任务中调用的,并且可能导致向下游集成流生成消息(取决于选项),因此建议通过 提供具有 to 处理程序异常的自定义,因为常规聚合器发布功能可能会预期如此。
相同的逻辑也适用于组超时功能,该功能也依赖于 .
有关更多信息,请参阅错误处理。MessageGroupStoreReaper sendPartialResultOnExpiry TaskScheduler MessagePublishingErrorHandler errorChannel TaskScheduler |
当 shared 用于不同的关联终端节点时,您必须配置适当的 ID 以确保组 ID 的唯一性。
否则,当一个关联终端节点释放来自其他终端节点的消息或使其他关联终端节点的消息过期时,可能会发生意外行为。
具有相同关联键的消息存储在同一个消息组中。 某些实现允许通过对数据进行分区来使用相同的物理资源。
例如,the has a property, and the has a property. 有关接口及其实现的更多信息,请参阅 Message Store. |
Flux 聚合器
在 5.2 版中,引入了该组件。
它基于 Project Reactor 和运算符。
传入的消息将发出到由此组件的构造函数 in 发起的 中。
如果未提供 ,或者它不是 instance ,则从 implementation 完成对 main 的订阅。
否则,它将推迟到 implementation 完成的 subscription 。
消息按 using a for the group 键进行分组。
默认情况下,将查询消息的标头。FluxAggregatorMessageHandler
Flux.groupBy()
Flux.window()
FluxSink
Flux.create()
outputChannel
ReactiveStreamsSubscribableChannel
Flux
Lifecycle.start()
ReactiveStreamsSubscribableChannel
Flux.groupBy()
CorrelationStrategy
IntegrationMessageHeaderAccessor.CORRELATION_ID
默认情况下,每个关闭的窗口都作为要生成的消息的 in payload 发布。
此消息包含窗口中第一条消息的所有标头。
输出消息负载中的此内容必须在下游订阅和处理。
这样的逻辑可以被 的 配置选项 自定义 (或取代)。
例如,如果我们想在最终消息中包含 of payloads,我们可以配置如下:Flux
Flux
setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>)
FluxAggregatorMessageHandler
List
Flux.collectList()
fluxAggregatorMessageHandler.setCombineFunction(
(messageFlux) ->
messageFlux
.map(Message::getPayload)
.collectList()
.map(GenericMessage::new));
to select an appropriate window 策略中有几个选项:FluxAggregatorMessageHandler
-
setBoundaryTrigger(Predicate<Message<?>>)
- 传播到运算符。 有关更多信息,请参阅其 JavaDocs。 优先于所有其他窗口选项。Flux.windowUntil()
-
setWindowSize(int)
和 - 传播到 或 。 默认情况下,窗口大小是根据组中的第一条消息及其标头计算的。setWindowSizeFunction(Function<Message<?>, Integer>)
Flux.window(int)
windowTimeout(int, Duration)
IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
-
setWindowTimespan(Duration)
- 传播到 OR,具体取决于窗口大小配置。Flux.window(Duration)
windowTimeout(int, Duration)
-
setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>)
- 一个函数,用于将变换应用于 Grouped Fluxes 中,用于 Exposed 选项未涵盖的任何自定义窗口操作。
由于此组件是一个实现,因此它可以简单地用作定义和消息传递注释。
使用 Java DSL 可以从 EIP 方法使用它。
下面的示例演示了我们如何在运行时注册 a 以及如何将 a 与上游的拆分器相关联:MessageHandler
@Bean
@ServiceActivator
.handle()
IntegrationFlow
FluxAggregatorMessageHandler
IntegrationFlow fluxFlow =
(flow) -> flow
.split()
.channel(MessageChannels.flux())
.handle(new FluxAggregatorMessageHandler());
IntegrationFlowContext.IntegrationFlowRegistration registration =
this.integrationFlowContext.registration(fluxFlow)
.register();
Flux<Message<?>> window =
registration.getMessagingTemplate()
.convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);
消息组的条件
从版本 5.5 开始,一个(包括其 Java & XML DSLs)公开了一个实现选项。
此函数用于添加到组的每条消息,并将结果条件句子存储到组中以供将来考虑。
可以查询此条件,而不是迭代组中的所有消息。
有关更多信息,请参阅 JavaDocs 和 Message Group Condition 。AbstractCorrelatingMessageHandler
groupConditionSupplier
BiFunction<Message<?>, String, String>
ReleaseStrategy
GroupConditionProvider
另请参阅 File Aggregator。
重定序器
resequencer 与 聚合器相关,但用途不同。 当聚合器合并消息时,resequencer 传递消息而不更改它们。
功能性
resequencer 的工作方式与 aggregator 类似,因为它使用 the 将消息存储在组中。
区别在于 Resequencer 不以任何方式处理消息。
相反,它会按照其 Headers 值的顺序释放它们。CORRELATION_ID
SEQUENCE_NUMBER
对此,您可以选择一次释放所有消息(在整个序列之后,根据 和其他可能性)或在有效序列可用时立即释放。
(我们将在本章后面介绍我们所说的 “有效序列” 的含义。SEQUENCE_SIZE
resequencer 旨在对具有较小间隙的相对较短的消息序列进行重新排序。 如果您有大量具有许多间隙的不相交序列,则可能会遇到性能问题。 |
配置 Resequencer
有关在 Java DSL 中配置重新排序器的信息,请参阅聚合器和重新排序器。
配置 resequencer 只需要在 XML 中包含适当的元素。
以下示例显示了 resequencer 配置:
<int:channel id="inputChannel"/>
<int:channel id="outputChannel"/>
<int:resequencer id="completelyDefinedResequencer" (1)
input-channel="inputChannel" (2)
output-channel="outputChannel" (3)
discard-channel="discardChannel" (4)
release-partial-sequences="true" (5)
message-store="messageStore" (6)
send-partial-result-on-expiry="true" (7)
send-timeout="86420000" (8)
correlation-strategy="correlationStrategyBean" (9)
correlation-strategy-method="correlate" (10)
correlation-strategy-expression="headers['something']" (11)
release-strategy="releaseStrategyBean" (12)
release-strategy-method="release" (13)
release-strategy-expression="size() == 10" (14)
empty-group-min-timeout="60000" (15)
lock-registry="lockRegistry" (16)
group-timeout="60000" (17)
group-timeout-expression="size() ge 2 ? 100 : -1" (18)
scheduler="taskScheduler" /> (19)
expire-group-upon-timeout="false" /> (20)
1 | resequencer 的 id 是可选的。 |
2 | resequencer 的 input channel。 必填。 |
3 | resequencer 将重新排序的消息发送到的通道。 自选。 |
4 | resequencer 将超时消息发送到的通道(如果设置为 )。
自选。send-partial-result-on-timeout false |
5 | 是立即发送有序序列,还是在整个消息组到达后才发送。
自选。
(默认值为 .)false |
6 | 对 a 的引用,可用于将消息组存储在其关联键下,直到它们完成。
自选。
(默认值为 volatile 内存存储。MessageGroupStore |
7 | 在组过期时,是否应发送有序的组(即使缺少某些消息)。
自选。
(默认值为 false。
请参阅管理聚合器中的状态:MessageGroupStore 。 |
8 | 向 或 发送回复时要等待的超时间隔。
默认为 ,这将无限期地阻止。
仅当 output 通道具有一些 'sending' 限制(例如具有固定 'capacity' 时),它才适用。
在这种情况下,会引发 a。
对于实现,将忽略 。
对于 ,from the scheduled expire 任务会导致此任务被重新计划。
自选。Message output-channel discard-channel -1 QueueChannel MessageDeliveryException send-timeout AbstractSubscribableChannel group-timeout(-expression) MessageDeliveryException |
9 | 对实现消息关联(分组)算法的 Bean 的引用。
bean 可以是接口的实现或 POJO。
在后一种情况下,还必须定义 attribute。
自选。
(默认情况下,聚合器使用 header.)CorrelationStrategy correlation-strategy-method IntegrationMessageHeaderAccessor.CORRELATION_ID |
10 | 在 Bean 上定义的方法,该方法由 Bean 引用并实现相关决策算法。
可选,有限制(需要存在)。correlation-strategy correlation-strategy |
11 | 表示关联策略的 SPEL 表达式。
例:。
只允许使用其中之一 or。"headers['something']" correlation-strategy correlation-strategy-expression |
12 | 对实现发布策略的 bean 的引用。
bean 可以是接口的实现或 POJO。
在后一种情况下,还必须定义 attribute。
可选(默认情况下,聚合器将使用 header 属性)。ReleaseStrategy release-strategy-method IntegrationMessageHeaderAccessor.SEQUENCE_SIZE |
13 | 在 Bean 上定义的方法,该方法引用并实现完成决策算法。
可选,有限制(需要存在)。release-strategy release-strategy |
14 | 表示发布策略的 SPEL 表达式。
表达式的根对象是 .
例:。
只允许使用其中之一 or。MessageGroup "size() == 5" release-strategy release-strategy-expression |
15 | 仅当为 配置了 时 才适用。
默认情况下,当 配置为使部分组过期时,也会删除空组。
正常释放组后,存在空组。
这是为了能够检测和丢弃延迟到达的消息。
如果您希望使空组过期的时间比使部分组过期的时间更长,请设置此属性。
然后,空组不会从 中删除,直到它们至少在此毫秒数内未被修改。
请注意,使空 group 过期的实际时间也受收割者的 timeout 属性的影响,它可能等于此值加上 timeout。MessageGroupStoreReaper <resequencer> MessageStore MessageGroupStoreReaper MessageStore |
16 | 请参阅使用 XML 配置聚合器。 |
17 | 请参阅使用 XML 配置聚合器。 |
18 | 请参阅使用 XML 配置聚合器。 |
19 | 请参阅使用 XML 配置聚合器。 |
20 | 默认情况下,当组由于超时(或 a )而完成时,将保留空组的元数据。
延迟到达的消息将被立即丢弃。
将此项设置为 可完全删除组。
然后,延迟到达的消息将启动一个新组,并且在该组再次超时之前不会被丢弃。
由于导致超时的序列范围中存在 “hole” ,因此新组永远不会正常释放。
空组可以稍后通过使用 a 和属性来过期(完全删除)。
从版本 5.0 开始,空组也会计划在过期后删除。
默认值为 'false'。MessageGroupStoreReaper true MessageGroupStoreReaper empty-group-min-timeout empty-group-min-timeout |
另请参阅 Aggregator Expiring Groups 了解更多信息。
由于在 Java 类中没有要为 resequencers 实现的自定义行为,因此没有对它的 Comments 支持。 |
消息处理程序链
这是一个实现,可以配置为单个消息端点,同时实际上委托给其他处理程序链,例如过滤器、转换器、拆分器等。
当需要以固定的线性进度连接多个处理程序时,这可能会导致更简单的配置。
例如,在其他组件之前提供 transformer 是相当常见的。
同样,当您在链中的其他组件之前提供过滤器时,您实际上是创建了一个选择性使用者。
无论哪种情况,链都只需要 single 和 single ,无需为每个单独的组件定义通道。MessageHandlerChain
MessageHandler
input-channel
output-channel
这主要是为 XML 配置设计的。
对于 Java DSL,可以将定义视为链组件,但它与下面本章中描述的概念和原则无关。
有关更多信息,请参阅 Java DSL。MessageHandlerChain IntegrationFlow |
Spring 集成提供了一个布尔属性: 。
当您在具有不同接受条件的同一点对点通道上提供多个选择性使用者时,应将此值设置为“true”(默认值为 ),以便调度程序知道消息已被拒绝,并因此尝试将消息传递给其他订阅者。
如果未引发异常,则调度程序会认为消息已成功传递,即使筛选器已删除消息以防止进一步处理。
如果您确实想要 “drop” 消息,过滤器的 'discard-channel' 可能很有用,因为它确实让您有机会对丢弃的消息执行某些操作(例如将其发送到 JMS 队列或将其写入日志)。Filter throwExceptionOnRejection false |
处理程序链简化了配置,同时在内部保持组件之间相同程度的松散耦合,如果在某些时候需要非线性排列,修改配置是很简单的。
在内部,该链被扩展为列出的端点的线性设置,由匿名通道分隔。
在链中不考虑 reply channel headers。
只有在调用最后一个处理程序后,生成的消息才会转发到回复通道或链的输出通道。
由于这种设置,除了最后一个处理程序之外,所有处理程序都必须实现接口(它提供了一个 'setOutputChannel()' 方法)。
如果 on the 已设置,则最后一个处理程序只需要一个 output channel。MessageProducer
outputChannel
MessageHandlerChain
与其他终端节点一样,该是可选的。
如果链的末尾有回复消息,则 output-channel 优先。
但是,如果它不可用,则链处理程序将检查入站消息上的回复通道头作为回退。output-channel |
在大多数情况下,您无需自行实施。
下一节重点介绍 chain 元素的命名空间支持。
大多数 Spring 集成端点(例如服务激活器和转换器)都适合在 .MessageHandler
MessageHandlerChain
配置链
该元素提供了一个属性。
如果链中的最后一个元素能够生成回复消息(可选),则它还支持属性。
然后,子元素是 filters、transformers、splitter 和 service-activator。
最后一个元素也可以是 router 或 outbound channel adapter。
以下示例显示了链定义:<chain>
input-channel
output-channel
<int:chain input-channel="input" output-channel="output">
<int:filter ref="someSelector" throw-exception-on-rejection="true"/>
<int:header-enricher>
<int:header name="thing1" value="thing2"/>
</int:header-enricher>
<int:service-activator ref="someService" method="someMethod"/>
</int:chain>
前面示例中使用的元素设置一个消息标头,该消息的值为 。
Header Enricher 是仅涉及 Headers 值的专用化。
你可以通过实现一个进行 Headers 修改并将其连接为 bean 来获得相同的结果,但是 header-enricher 是一个更简单的选项。<header-enricher>
thing1
thing2
Transformer
MessageHandler
可以配置为消息流的最后一个 “closed-box” 使用者。
对于此解决方案,您可以将其放在 <链的末尾>一些 <outbound-channel-adapter>,如下例所示:<chain>
<int:chain input-channel="input">
<int-xml:marshalling-transformer marshaller="marshaller" result-type="StringResult" />
<int:service-activator ref="someService" method="someMethod"/>
<int:header-enricher>
<int:header name="thing1" value="thing2"/>
</int:header-enricher>
<int:logging-channel-adapter level="INFO" log-full-message="true"/>
</int:chain>
不允许的属性和元素
某些属性 (如 和 不允许在链中使用的组件上指定)。
poller 子元素也是如此。 对于 Spring 集成核心组件,XML 模式本身会强制执行其中一些约束。 但是,对于非核心组件或您自己的自定义组件,这些约束由 XML 名称空间解析器(而不是 XML 架构)强制执行。 这些 XML 名称空间解析器约束是在 Spring Integration 2.2 中添加的。
如果尝试使用不允许的属性和元素,则 XML 命名空间解析器会引发 . |
使用 'id' 属性
从 Spring Integration 3.0 开始,如果为链元素赋予了一个属性,则该元素的 bean 名称是链和元素本身的组合。
没有属性的元素不会注册为 bean,但每个元素都会被赋予一个包含 chain 的 bean 。
请考虑以下示例:id
id
id
id
componentName
id
<int:chain id="somethingChain" input-channel="input">
<int:service-activator id="somethingService" ref="someService" method="someMethod"/>
<int:object-to-json-transformer/>
</int:chain>
在前面的示例中:
-
根元素具有 'somethingChain'。 因此,实现( 或 ,取决于类型)Bean 将此值作为其 Bean 名称。
<chain>
id
AbstractEndpoint
PollingConsumer
EventDrivenConsumer
input-channel
-
该 Bean 获取一个 Bean 别名('somethingChain.handler'),它允许从 .
MessageHandlerChain
BeanFactory
-
这不是一个完全成熟的消息收发端点(它不是 or )。 它是 . 在这种情况下,注册到 的 bean 名称是 'somethingChain$child.somethingService.handler'。
<service-activator>
PollingConsumer
EventDrivenConsumer
MessageHandler
<chain>
BeanFactory
-
this 的值相同,但没有 '.handler' 后缀。 它变为 'somethingChain$child.somethingService'。
componentName
ServiceActivatingHandler
-
最后一个子组件 没有属性。 它基于它在 中的位置。 在本例中,它是 'somethingChain$child#1'。 (名称的最后一个元素是链中的顺序,以 '#0' 开头)。 请注意,此转换器未在应用程序上下文中注册为 Bean,因此它不会获得 . 但是,它的值可用于日志记录和其他目的。
<chain>
<object-to-json-transformer>
id
componentName
<chain>
beanName
componentName
在元素上提供显式属性以简化日志中子组件的标识并提供从 etc 访问它们非常有用。id <chain> BeanFactory |
从 Chain 中调用 Chain
有时,您需要从链中对另一个链进行嵌套调用,然后返回并继续在原始链中执行。 为此,您可以通过包含 <gateway> 元素来使用消息传送网关,如下例所示:
<int:chain id="main-chain" input-channel="in" output-channel="out">
<int:header-enricher>
<int:header name="name" value="Many" />
</int:header-enricher>
<int:service-activator>
<bean class="org.foo.SampleService" />
</int:service-activator>
<int:gateway request-channel="inputA"/>
</int:chain>
<int:chain id="nested-chain-a" input-channel="inputA">
<int:header-enricher>
<int:header name="name" value="Moe" />
</int:header-enricher>
<int:gateway request-channel="inputB"/>
<int:service-activator>
<bean class="org.foo.SampleService" />
</int:service-activator>
</int:chain>
<int:chain id="nested-chain-b" input-channel="inputB">
<int:header-enricher>
<int:header name="name" value="Jack" />
</int:header-enricher>
<int:service-activator>
<bean class="org.foo.SampleService" />
</int:service-activator>
</int:chain>
在前面的示例中,由此处配置的 'gateway' 元素在处理结束时调用。
在 中,在标头扩充后调用 a。
然后,流返回以完成 中的执行。
最后,流返回到 。
在链中定义元素的嵌套版本时,它不需要该属性。
相反,它采用当前状态的消息,并将其放置在属性中定义的通道上。
当该网关启动的下游流完成时,a 将返回到网关并继续其在当前链中的旅程。nested-chain-a
main-chain
nested-chain-a
nested-chain-b
nested-chain-b
main-chain
<gateway>
service-interface
request-channel
Message
分散-聚集
从版本 4.1 开始, Spring 集成提供了分散-聚集企业集成模式的实现。 它是一个复合终端节点,其目标是向收件人发送消息并聚合结果。 正如 Enterprise Integration Patterns 中所指出的,它是 “best quote” 等场景的一个组件,在这种情况下,我们需要从多个供应商那里请求信息,并决定哪一个为我们提供所请求项目的最佳术语。
以前,可以使用离散元件来配置模式。 此次优化带来了更便捷的配置。
它是一个请求-回复终端节点,它结合了 a(或 a )和 .
请求消息将发送到通道,并等待聚合器发送到 的回复。ScatterGatherHandler
PublishSubscribeChannel
RecipientListRouter
AggregatingMessageHandler
scatter
ScatterGatherHandler
outputChannel
功能性
该模式建议两种情况:“auction”和“distribution”。
在这两种情况下,函数都是相同的,并且提供了 .
(实际上,它只需要一个 as 构造函数参数。
有关更多信息,请参阅 Aggregator。Scatter-Gather
aggregation
AggregatingMessageHandler
ScatterGatherHandler
AggregatingMessageHandler
拍卖
拍卖变体对请求消息使用“发布-订阅”逻辑,其中“分散”通道是 with 。
但是,此通道可以是任何实现(就像 中的一样 — 请参阅 Content Enricher)。
但是,在这种情况下,您应该为函数创建自己的自定义。Scatter-Gather
PublishSubscribeChannel
apply-sequence="true"
MessageChannel
request-channel
ContentEnricher
correlationStrategy
aggregation
分配
分发变体基于 (请参阅 RecipientListRouter
) 以及 的所有可用选项 。
这是第二个 constructor 参数。
如果只想依赖 和 的默认值,则应指定 。
否则,您应该为 .
与变体 (auction 变体) 不同,拥有一个选项可让 Target Suppliers 根据消息进行筛选。
使用 ,提供默认值,并且可以正确释放组。
distribution 选项与 auction 选项互斥。Scatter-Gather
RecipientListRouter
RecipientListRouter
ScatterGatherHandler
correlationStrategy
recipient-list-router
aggregator
apply-sequence="true"
correlationStrategy
aggregator
PublishSubscribeChannel
recipient-list-router
selector
apply-sequence="true"
sequenceSize
aggregator
只有基于构造函数配置的普通 Java 配置才需要 this,因为框架无法改变外部提供的组件。
为方便起见,从版本 6.0 开始,XML 和 Java DSL for 设置为 true。applySequence=true ScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer) Scatter-Gather applySequence |
对于拍卖和分发变体,请求 (scatter) 消息都使用标头进行扩充,以等待来自的回复消息。gatherResultChannel
aggregator
默认情况下,所有供应商都应该将其结果发送到 header(通常通过省略 from the ultimate endpoint)。
但是,还提供了该选项,允许供应商将他们的回复发送到该通道以进行聚合。replyChannel
output-channel
gatherChannel
配置 Scatter-Gather 端点
以下示例显示了 的 bean 定义的 Java 配置:Scatter-Gather
@Bean
public MessageHandler distributor() {
RecipientListRouter router = new RecipientListRouter();
router.setApplySequence(true);
router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
distributionChannel3()));
return router;
}
@Bean
public MessageHandler gatherer() {
return new AggregatingMessageHandler(
new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
new SimpleMessageStore(),
new HeaderAttributeCorrelationStrategy(
IntegrationMessageHeaderAccessor.CORRELATION_ID),
new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}
@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
handler.setOutputChannel(output());
return handler;
}
在前面的示例中,我们使用 Bean 和收件人通道列表进行配置。
下一个 bean 用于 .
最后,我们将这两个 bean 注入到 bean 定义中,并将其标记为 a 以将 scatter-gather 组件连接到集成流中。RecipientListRouter
distributor
applySequence="true"
AggregatingMessageHandler
ScatterGatherHandler
@ServiceActivator
以下示例演示如何使用 XML 命名空间配置终端节点:<scatter-gather>
<scatter-gather
id="" (1)
auto-startup="" (2)
input-channel="" (3)
output-channel="" (4)
scatter-channel="" (5)
gather-channel="" (6)
order="" (7)
phase="" (8)
send-timeout="" (9)
gather-timeout="" (10)
requires-reply="" > (11)
<scatterer/> (12)
<gatherer/> (13)
</scatter-gather>
1 | 终端节点的 ID。
该 Bean 使用别名 .
该 Bean 使用别名 .
这。
自选。
(这将生成一个默认值。ScatterGatherHandler id + '.handler' RecipientListRouter id + '.scatterer' AggregatingMessageHandler`bean is registered with an alias of `id + '.gatherer' BeanFactory id |
2 | 生命周期属性指示是否应在应用程序上下文初始化期间启动终端节点。
此外, 还 implements 和 starts and stops ,如果提供了 a ,则会在内部创建。
自选。
(默认值为 .)ScatterGatherHandler Lifecycle gatherEndpoint gather-channel true |
3 | 接收请求消息的通道,以便在 .
必填。ScatterGatherHandler |
4 | 将聚合结果发送到的通道。
自选。
(传入邮件可以在邮件头中自行指定回复通道)。ScatterGatherHandler replyChannel |
5 | 将拍卖场景的分散消息发送到的通道。
自选。
与 sub-element 互斥。<scatterer> |
6 | 接收来自每个供应商的聚合回复的通道。
它用作散点消息中的标头。
自选。
默认情况下,已创建 。replyChannel FixedSubscriberChannel |
7 | 当多个处理程序订阅同一组件时,此组件的顺序(用于负载平衡目的)。
自选。DirectChannel |
8 | 指定终端节点的启动和停止阶段。
启动顺序从最低到最高,关闭顺序从最高到最低。
默认情况下,此值为 ,表示此容器尽可能晚地启动并尽快停止。
自选。Integer.MAX_VALUE |
9 | 向 .
默认情况下,块为 1 秒。
仅当 output channel 有一些 'sending' 限制时才适用 - 例如,具有固定 'capacity' 的 a 已满。
在这种情况下,会引发 a。
对于实现,将忽略 。
对于 ,from the scheduled expire 任务会导致此任务被重新计划。
自选。Message output-channel send() QueueChannel MessageDeliveryException send-timeout AbstractSubscribableChannel group-timeout(-expression) MessageDeliveryException |
10 | 用于指定 scatter-gather 在返回之前等待回复消息的时间。
默认情况下,它会无限期等待。
如果回复超时,则返回 'null'。
自选。
它默认为 ,表示无限期等待。-1 |
11 | 指定 scatter-gather 是否必须返回非 null 值。
此值为默认值。
因此,当基础聚合器在 之后返回 null 值时,将引发 a 。
请注意,如果有可能,则应指定 the 以避免无限期等待。true ReplyRequiredException gather-timeout null gather-timeout |
12 | 选项。
自选。
与 attribute 互斥。<recipient-list-router> scatter-channel |
13 | 选项。
必填。<aggregator> |
错误处理
由于 Scatter-Gather 是一个多请求-回复组件,因此错误处理具有一些额外的复杂性。
在某些情况下,如果允许进程以比请求更少的回复完成,则最好只捕获并忽略下游异常。
在其他情况下,当发生错误时,应考虑从子流返回类似 “补偿消息” 的内容。ReleaseStrategy
每个异步子流都应配置一个标头,以便从 发送正确的错误消息。
否则,错误将发送到具有常见错误处理逻辑的全局变量。
有关异步错误处理的更多信息,请参阅错误处理。errorChannel
MessagePublishingErrorHandler
errorChannel
同步流可以使用 an 来忽略异常或返回补偿消息。
当异常从其中一个子流引发到 时,它只是被重新引发到上游。
这样,所有其他子流都将毫无用处,并且它们的回复将在 .
这有时可能是预期的行为,但在大多数情况下,最好在不影响所有其他子流和收集器中的期望的情况下处理特定子流中的错误。ExpressionEvaluatingRequestHandlerAdvice
ScatterGatherHandler
ScatterGatherHandler
从版本 5.1.3 开始,提供了 选项。
它被填充到分散消息的标头中,并在发生异步错误时使用,或者可以在常规同步子流中使用以直接发送错误消息。ScatterGatherHandler
errorChannelName
errorChannel
以下示例配置通过返回补偿消息来演示异步错误处理:
@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
return f -> f
.scatterGather(
scatterer -> scatterer
.recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
.recipientFlow(f2 -> f2
.channel(c -> c.executor(taskExecutor))
.transform(p -> {
throw new RuntimeException("Sub-flow#2");
})),
null,
s -> s.errorChannel("scatterGatherErrorChannel"));
}
@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
return MessageBuilder.withPayload(payload.getCause().getCause())
.copyHeaders(payload.getFailedMessage().getHeaders())
.build();
}
为了生成正确的回复,我们必须从已发送到 的 中复制标头(包括 和 )。
这样,目标异常将返回给 for reply messages 组完成的收集器。
这样的异常可以在 Gatherer 中过滤掉,或者在 scatter-gather 端点之后以其他方式在下游进行处理。replyChannel
errorChannel
failedMessage
MessagingException
scatterGatherErrorChannel
MessagePublishingErrorHandler
ScatterGatherHandler
payload
MessageGroupProcessor
在将分散结果发送到收集器之前,恢复请求消息标头,包括回复和错误通道(如果有)。
这样,来自的错误将传播到调用方,即使在分散收件人子流中应用了异步切换也是如此。
要成功操作,必须将 、 和 标头传输回分散收件人子流中的回复。
在这种情况下,必须为 配置一个合理的 finite 。
否则,默认情况下,它将永远被阻止,等待 Gatherer 的回复。ScatterGatherHandler AggregatingMessageHandler gatherResultChannel originalReplyChannel originalErrorChannel gatherTimeout ScatterGatherHandler |
线程屏障
有时,我们需要暂停消息流线程,直到发生其他异步事件。 例如,考虑一个向 RabbitMQ 发布消息的 HTTP 请求。 在 RabbitMQ 代理发出收到消息的确认之前,我们可能希望不回复用户。
在版本 4.2 中, Spring 集成为此目的引入了该组件。
底层是 .
此类还实现 ,其中传递给方法的消息释放方法中的相应线程(如果存在)。<barrier/>
MessageHandler
BarrierMessageHandler
MessageTriggerAction
trigger()
handleRequestMessage()
挂起的线程和触发器线程通过对消息调用 a 进行关联。
当消息发送到 时,线程将暂停长达几毫秒,等待相应的触发器消息。
默认关联策略使用 header。
当触发器消息以相同的关联到达时,线程将被释放。
发送到 after release 的消息是使用 .
默认情况下,消息是两个负载中的一个,并且标头是使用 .CorrelationStrategy
input-channel
requestTimeout
IntegrationMessageHeaderAccessor.CORRELATION_ID
output-channel
MessageGroupProcessor
Collection<?>
DefaultAggregatingMessageGroupProcessor
如果首先调用该方法(或在主线程超时后调用),则该方法将暂停,直到等待挂起消息到达为止。
如果您不想暂停触发器线程,请考虑将 transfer transfer 交给 a,以便其线程被暂停。trigger() triggerTimeout TaskExecutor |
在 5.4 版本之前,请求和触发消息只有一个选项,但在某些用例中,最好为这些操作设置不同的超时。
因此,已经引入了选项。timeout requestTimeout triggerTimeout |
该属性确定如果挂起的线程在触发器消息到达之前超时时要采取的操作。
默认情况下,它是 ,这意味着端点返回 ,流结束,线程返回给调用方。
当 , a 被抛出时。requires-reply
false
null
true
ReplyRequiredException
你可以以编程方式调用该方法(使用名称获取 bean 引用,— 其中 是屏障端点的 bean 名称)。
或者,您可以配置 an 以触发发布。trigger()
barrier.handler
barrier
<outbound-channel-adapter/>
只能暂停一个具有相同关联的线程。 相同的关联可以多次使用,但只能同时使用一次。 如果第二个线程以相同的关联到达,则会引发异常。 |
以下示例演示如何使用自定义标头进行关联:
@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
barrier.setOutputChannel(out());
barrier.setDiscardChannel(lateTriggerChannel);
return barrier;
}
@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
return barrier::trigger;
}
<int:barrier id="barrier1" input-channel="in" output-channel="out"
correlation-strategy-expression="headers['myHeader']"
output-processor="myOutputProcessor"
discard-channel="lateTriggerChannel"
timeout="10000">
</int:barrier>
<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />
根据消息先到达的线程,发送消息的线程或发送消息的线程最多等待 10 秒,直到另一条消息到达。
当消息被释放时,通道将收到一条消息,该消息结合了调用自定义 Bean 的结果,名为 .
如果主线程超时并且触发器稍后到达,则可以配置将延迟触发器发送到的 discard 通道。in
release
out
MessageGroupProcessor
myOutputProcessor
有关此组件的示例,请参阅 barrier 示例应用程序。