此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.4.3spring-doc.cadn.net.cn

分散-聚集

从版本 4.1 开始, Spring 集成提供了分散-聚集企业集成模式的实现。 它是一个复合终端节点,其目标是向收件人发送消息并聚合结果。 正如 Enterprise Integration Patterns 中所指出的,它是 “best quote” 等场景的一个组件,在这种情况下,我们需要从多个供应商那里请求信息,并决定哪一个为我们提供所请求项目的最佳术语。spring-doc.cadn.net.cn

以前,可以使用离散元件来配置模式。 此次优化带来了更便捷的配置。spring-doc.cadn.net.cn

ScatterGatherHandler是一个请求-回复终端节点,它结合了PublishSubscribeChannel(或RecipientListRouter) 和AggregatingMessageHandler. 请求消息将发送到scatterchannel 和ScatterGatherHandler等待聚合器发送到outputChannel.spring-doc.cadn.net.cn

功能性

Scatter-Gatherpattern 建议两种情况:“auction” 和 “distribution”。 在这两种情况下,aggregation函数相同,并提供可用于AggregatingMessageHandler. (实际上,ScatterGatherHandler只需要AggregatingMessageHandler作为构造函数参数。 有关更多信息,请参阅 Aggregatorspring-doc.cadn.net.cn

拍卖

拍卖会Scatter-Gather变体对请求消息使用“发布-订阅”逻辑,其中“分散”通道是一个PublishSubscribeChannelapply-sequence="true". 但是,此通道可以是任何MessageChannel实现(就像request-channelContentEnricher— 请参阅 Content Enricher)。 但是,在这种情况下,您应该创建自己的自定义correlationStrategy对于aggregation功能。spring-doc.cadn.net.cn

分配

分布Scatter-Gathervariant 基于RecipientListRouter(参见RecipientListRouter) 替换为RecipientListRouter. 这是第二个ScatterGatherHandlerconstructor 参数。 如果只想依赖默认的correlationStrategy对于recipient-list-routeraggregator,您应该指定apply-sequence="true". 否则,您应该提供自定义correlationStrategy对于aggregator. 与PublishSubscribeChannelvariant(竞价变体)中,具有recipient-list-router selector选项 允许根据消息筛选目标供应商。 跟apply-sequence="true",则默认的sequenceSize,并且aggregator可以正确释放组。 distribution 选项与 auction 选项互斥。spring-doc.cadn.net.cn

applySequence=true仅对于基于ScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer)constructor 配置,因为框架无法更改外部提供的组件。 为方便起见,使用 XML 和 Java DSLScatter-GatherapplySequence从版本 6.0 开始更改为 true。

对于拍卖和分发变体,请求 (scatter) 消息都使用gatherResultChannel标头等待来自aggregator.spring-doc.cadn.net.cn

默认情况下,所有供应商都应将其结果发送到replyChannel标头(通常通过省略output-channel从最终终点开始)。 但是,gatherChannel选项,让供应商将他们的回复发送到该通道进行聚合。spring-doc.cadn.net.cn

配置 Scatter-Gather 端点

以下示例显示了 的 bean 定义的 Java 配置Scatter-Gather:spring-doc.cadn.net.cn

@Bean
public MessageHandler distributor() {
    RecipientListRouter router = new RecipientListRouter();
    router.setApplySequence(true);
    router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
            distributionChannel3()));
    return router;
}

@Bean
public MessageHandler gatherer() {
	return new AggregatingMessageHandler(
			new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
			new SimpleMessageStore(),
			new HeaderAttributeCorrelationStrategy(
			       IntegrationMessageHeaderAccessor.CORRELATION_ID),
			new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}

@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
	ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
	handler.setOutputChannel(output());
	return handler;
}

在前面的示例中,我们将RecipientListRouter distributorbean 替换为applySequence="true"和收件人通道列表。 下一个 bean 用于AggregatingMessageHandler. 最后,我们将这两个 bean 注入到ScatterGatherHandlerbean 定义并将其标记为@ServiceActivator将 Scatter-gather 组件连接到集成流中。spring-doc.cadn.net.cn

以下示例显示如何配置<scatter-gather>endpoint 的 XML 命名空间:spring-doc.cadn.net.cn

<scatter-gather
		id=""  (1)
		auto-startup=""  (2)
		input-channel=""  (3)
		output-channel=""  (4)
		scatter-channel=""  (5)
		gather-channel=""  (6)
		order=""  (7)
		phase=""  (8)
		send-timeout=""  (9)
		gather-timeout=""  (10)
		requires-reply="" > (11)
			<scatterer/>  (12)
			<gatherer/>  (13)
</scatter-gather>
1 终端节点的 ID。 这ScatterGatherHandlerbean 使用别名id + '.handler'. 这RecipientListRouterbean 使用别名id + '.scatterer'. 这AggregatingMessageHandlerbean 使用别名id + '.gatherer'. 自选。 (该BeanFactory生成默认的id值。
2 生命周期属性指示是否应在应用程序上下文初始化期间启动终端节点。 此外,ScatterGatherHandler还实现了Lifecycle以及开始和停止gatherEndpoint,该 API 是在内部创建的,如果gather-channel。 自选。 (默认值为true.)
3 接收请求消息的通道,用于在ScatterGatherHandler. 必填。
4 ScatterGatherHandler发送聚合结果。 自选。 (传入邮件可以在replyChannel消息标头)。
5 将拍卖场景的分散消息发送到的通道。 自选。 与<scatterer>sub-元素。
6 接收来自每个供应商的聚合回复的通道。 它用作replyChannel标头。 自选。 默认情况下,FixedSubscriberChannel已创建。
7 当多个处理程序订阅相同的组件时,此组件的顺序DirectChannel(用于负载平衡目的)。 自选。
8 指定终端节点的启动和停止阶段。 启动顺序从最低到最高,关闭顺序从最高到最低。 默认情况下,此值为Integer.MAX_VALUE,这意味着此容器会尽可能晚地启动并尽快停止。 自选。
9 发送回复时要等待的超时间隔Messageoutput-channel. 默认情况下,send()块 1 秒。 仅当输出通道具有一些 'sending' 限制时,它才适用 — 例如,QueueChannel具有已满的固定“容量”。 在这种情况下,MessageDeliveryException被抛出。 这send-timeout被忽略AbstractSubscribableChannel实现。 为group-timeout(-expression)MessageDeliveryException从 Scheduled Expired 任务中,将重新计划此任务。 自选。
10 用于指定 scatter-gather 在返回之前等待回复消息的时间。 默认情况下,它会等待30秒。 如果回复超时,则返回 'null'。 自选。
11 指定 scatter-gather 是否必须返回非 null 值。 该值为true默认情况下。 因此,一个ReplyRequiredException当底层聚合器在gather-timeout. 注意,如果null是可能的,gather-timeout以避免无限期等待。
12 <recipient-list-router>选项。 自选。 互斥与scatter-channel属性。
13 <aggregator>选项。 必填。

错误处理

由于 Scatter-Gather 是一个多请求-回复组件,因此错误处理具有一些额外的复杂性。 在某些情况下,如果ReleaseStrategy允许进程以少于请求的回复完成。 在其他情况下,当发生错误时,应考虑从子流返回类似 “补偿消息” 的内容。spring-doc.cadn.net.cn

每个异步子流都应该配置一个errorChannel标头,以便从MessagePublishingErrorHandler. 否则,将向全局errorChannel替换为常见的错误处理逻辑。 有关异步错误处理的更多信息,请参阅错误处理spring-doc.cadn.net.cn

同步流可以使用ExpressionEvaluatingRequestHandlerAdvice忽略异常或返回补偿消息。 当异常从其中一个子流抛出到ScatterGatherHandler,它只是被重新抛向上游。 这样,所有其他子流都将无用,并且它们的回复将在ScatterGatherHandler. 这有时可能是预期的行为,但在大多数情况下,最好在不影响所有其他子流和收集器中的期望的情况下处理特定子流中的错误。spring-doc.cadn.net.cn

从版本 5.1.3 开始,ScatterGatherHandlererrorChannelName选择。 它被填充到errorChannelHeader,在发生异步错误时使用,或者可以在常规同步子流中使用,以直接发送错误消息。spring-doc.cadn.net.cn

以下示例配置通过返回补偿消息来演示异步错误处理:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
    return f -> f
            .scatterGather(
                    scatterer -> scatterer
                            .recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
                            .recipientFlow(f2 -> f2
                                    .channel(c -> c.executor(taskExecutor))
                                    .transform(p -> {
                                        throw new RuntimeException("Sub-flow#2");
                                    })),
                    null,
                    s -> s.errorChannel("scatterGatherErrorChannel"));
}

@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
    return MessageBuilder.withPayload(payload.getCause().getCause())
            .copyHeaders(payload.getFailedMessage().getHeaders())
            .build();
}

为了生成正确的回复,我们必须复制 headers(包括replyChannelerrorChannel) 从failedMessageMessagingException已发送到scatterGatherErrorChannelMessagePublishingErrorHandler. 这样,目标异常将返回给ScatterGatherHandler用于回复消息组完成。 这样的异常payload可以在MessageGroupProcessor的 Gatherer 或其他方式处理的下游,在 scatter-gather 端点之后。spring-doc.cadn.net.cn

在将分散结果发送给 Gatherer 之前,ScatterGatherHandler恢复请求消息标头,包括 reply 和 error 通道(如果有)。 这样,来自AggregatingMessageHandler将传播给调用方,即使在 Scatter 接收者子流中应用了异步切换也是如此。 为了成功作,需要gatherResultChannel,originalReplyChanneloriginalErrorChannel标头必须传输回 Scatter 收件人子流的回复。 在这种情况下,一个合理的、有限的gatherTimeout必须为ScatterGatherHandler. 否则,默认情况下,它将永远被阻止,等待 Gatherer 的回复。