分散-聚集
从版本 4.1 开始, Spring 集成提供了分散-聚集企业集成模式的实现。 它是一个复合终端节点,其目标是向收件人发送消息并聚合结果。 正如 Enterprise Integration Patterns 中所指出的,它是 “best quote” 等场景的一个组件,在这种情况下,我们需要从多个供应商那里请求信息,并决定哪一个为我们提供所请求项目的最佳术语。
以前,可以使用离散元件来配置模式。 此次优化带来了更便捷的配置。
这ScatterGatherHandler
是一个请求-回复终端节点,它结合了PublishSubscribeChannel
(或RecipientListRouter
) 和AggregatingMessageHandler
.
请求消息将发送到scatter
channel 和ScatterGatherHandler
等待聚合器发送到outputChannel
.
功能性
这Scatter-Gather
pattern 建议两种情况:“auction” 和 “distribution”。
在这两种情况下,aggregation
函数相同,并提供可用于AggregatingMessageHandler
.
(实际上,ScatterGatherHandler
只需要AggregatingMessageHandler
作为构造函数参数。
有关更多信息,请参阅 Aggregator。
拍卖
拍卖会Scatter-Gather
变体对请求消息使用“发布-订阅”逻辑,其中“分散”通道是一个PublishSubscribeChannel
跟apply-sequence="true"
.
但是,此通道可以是任何MessageChannel
实现(就像request-channel
在ContentEnricher
— 请参阅 Content Enricher)。
但是,在这种情况下,您应该创建自己的自定义correlationStrategy
对于aggregation
功能。
分配
分布Scatter-Gather
variant 基于RecipientListRouter
(参见RecipientListRouter
) 替换为RecipientListRouter
.
这是第二个ScatterGatherHandler
constructor 参数。
如果只想依赖默认的correlationStrategy
对于recipient-list-router
和aggregator
,您应该指定apply-sequence="true"
.
否则,您应该提供自定义correlationStrategy
对于aggregator
.
与PublishSubscribeChannel
variant(竞价变体)中,具有recipient-list-router
selector
选项 允许根据消息筛选目标供应商。
跟apply-sequence="true"
,则默认的sequenceSize
,并且aggregator
可以正确释放组。
distribution 选项与 auction 选项互斥。
这applySequence=true 仅对于基于ScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer) constructor 配置,因为框架无法更改外部提供的组件。
为方便起见,使用 XML 和 Java DSLScatter-Gather 集applySequence 从版本 6.0 开始更改为 true。 |
对于拍卖和分发变体,请求 (scatter) 消息都使用gatherResultChannel
标头等待来自aggregator
.
默认情况下,所有供应商都应将其结果发送到replyChannel
标头(通常通过省略output-channel
从最终终点开始)。
但是,gatherChannel
选项,让供应商将他们的回复发送到该通道进行聚合。
配置 Scatter-Gather 端点
以下示例显示了 的 bean 定义的 Java 配置Scatter-Gather
:
@Bean
public MessageHandler distributor() {
RecipientListRouter router = new RecipientListRouter();
router.setApplySequence(true);
router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
distributionChannel3()));
return router;
}
@Bean
public MessageHandler gatherer() {
return new AggregatingMessageHandler(
new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
new SimpleMessageStore(),
new HeaderAttributeCorrelationStrategy(
IntegrationMessageHeaderAccessor.CORRELATION_ID),
new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}
@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
handler.setOutputChannel(output());
return handler;
}
在前面的示例中,我们将RecipientListRouter
distributor
bean 替换为applySequence="true"
和收件人通道列表。
下一个 bean 用于AggregatingMessageHandler
.
最后,我们将这两个 bean 注入到ScatterGatherHandler
bean 定义并将其标记为@ServiceActivator
将 Scatter-gather 组件连接到集成流中。
以下示例显示如何配置<scatter-gather>
endpoint 的 XML 命名空间:
<scatter-gather
id="" (1)
auto-startup="" (2)
input-channel="" (3)
output-channel="" (4)
scatter-channel="" (5)
gather-channel="" (6)
order="" (7)
phase="" (8)
send-timeout="" (9)
gather-timeout="" (10)
requires-reply="" > (11)
<scatterer/> (12)
<gatherer/> (13)
</scatter-gather>
1 | 终端节点的 ID。
这ScatterGatherHandler bean 使用别名id + '.handler' .
这RecipientListRouter bean 使用别名id + '.scatterer' .
这AggregatingMessageHandler bean 使用别名id + '.gatherer' .
自选。
(该BeanFactory 生成默认的id 值。 |
2 | 生命周期属性指示是否应在应用程序上下文初始化期间启动终端节点。
此外,ScatterGatherHandler 还实现了Lifecycle 以及开始和停止gatherEndpoint ,该 API 是在内部创建的,如果gather-channel 。
自选。
(默认值为true .) |
3 | 接收请求消息的通道,用于在ScatterGatherHandler .
必填。 |
4 | 将ScatterGatherHandler 发送聚合结果。
自选。
(传入邮件可以在replyChannel 消息标头)。 |
5 | 将拍卖场景的分散消息发送到的通道。
自选。
与<scatterer> sub-元素。 |
6 | 接收来自每个供应商的聚合回复的通道。
它用作replyChannel 标头。
自选。
默认情况下,FixedSubscriberChannel 已创建。 |
7 | 当多个处理程序订阅相同的组件时,此组件的顺序DirectChannel (用于负载平衡目的)。
自选。 |
8 | 指定终端节点的启动和停止阶段。
启动顺序从最低到最高,关闭顺序从最高到最低。
默认情况下,此值为Integer.MAX_VALUE ,这意味着此容器会尽可能晚地启动并尽快停止。
自选。 |
9 | 发送回复时要等待的超时间隔Message 到output-channel .
默认情况下,send() 块 1 秒。
仅当输出通道具有一些 'sending' 限制时,它才适用 — 例如,QueueChannel 具有已满的固定“容量”。
在这种情况下,一个MessageDeliveryException 被抛出。
这send-timeout 被忽略AbstractSubscribableChannel 实现。
为group-timeout(-expression) 这MessageDeliveryException 从 Scheduled Expired 任务中,将重新计划此任务。
自选。 |
10 | 用于指定 scatter-gather 在返回之前等待回复消息的时间。
默认情况下,它会等待30 秒。
如果回复超时,则返回 'null'。
自选。 |
11 | 指定 scatter-gather 是否必须返回非 null 值。
该值为true 默认情况下。
因此,一个ReplyRequiredException 当底层聚合器在gather-timeout .
注意,如果null 是可能的,gather-timeout 以避免无限期等待。 |
12 | 这<recipient-list-router> 选项。
自选。
互斥scatter-channel 属性。 |
13 | 这<aggregator> 选项。
必填。 |
错误处理
由于 Scatter-Gather 是一个多请求-回复组件,因此错误处理具有一些额外的复杂性。
在某些情况下,如果ReleaseStrategy
允许进程以少于请求的回复完成。
在其他情况下,当发生错误时,应考虑从子流返回类似 “补偿消息” 的内容。
每个异步子流都应该配置一个errorChannel
标头,以便从MessagePublishingErrorHandler
.
否则,将向全局errorChannel
替换为常见的错误处理逻辑。
有关异步错误处理的更多信息,请参阅错误处理。
同步流可以使用ExpressionEvaluatingRequestHandlerAdvice
忽略异常或返回补偿消息。
当异常从其中一个子流抛出到ScatterGatherHandler
,它只是被重新抛向上游。
这样,所有其他子流都将无用,并且它们的回复将在ScatterGatherHandler
.
这有时可能是预期的行为,但在大多数情况下,最好在不影响所有其他子流和收集器中的期望的情况下处理特定子流中的错误。
从版本 5.1.3 开始,ScatterGatherHandler
随errorChannelName
选择。
它被填充到errorChannel
Headers 的 Header,并在发生异步错误时使用,或者可以在常规同步子流中使用,以直接发送错误消息。
以下示例配置通过返回补偿消息来演示异步错误处理:
@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
return f -> f
.scatterGather(
scatterer -> scatterer
.recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
.recipientFlow(f2 -> f2
.channel(c -> c.executor(taskExecutor))
.transform(p -> {
throw new RuntimeException("Sub-flow#2");
})),
null,
s -> s.errorChannel("scatterGatherErrorChannel"));
}
@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
return MessageBuilder.withPayload(payload.getCause().getCause())
.copyHeaders(payload.getFailedMessage().getHeaders())
.build();
}
为了生成正确的回复,我们必须复制 headers(包括replyChannel
和errorChannel
) 从failedMessage
的MessagingException
已发送到scatterGatherErrorChannel
由MessagePublishingErrorHandler
.
这样,目标异常将返回给ScatterGatherHandler
用于回复消息组完成。
这样的异常payload
可以在MessageGroupProcessor
的 Gatherer 或其他方式处理的下游,在 scatter-gather 端点之后。
在将分散结果发送给 Gatherer 之前,ScatterGatherHandler 恢复请求消息标头,包括 reply 和 error 通道(如果有)。
这样,来自AggregatingMessageHandler 将传播给调用方,即使在 Scatter 接收者子流中应用了异步切换也是如此。
为了成功作,需要gatherResultChannel ,originalReplyChannel 和originalErrorChannel 标头必须传输回 Scatter 收件人子流的回复。
在这种情况下,一个合理的、有限的gatherTimeout 必须为ScatterGatherHandler .
否则,默认情况下,它将永远被阻止,等待 Gatherer 的回复。 |