对于最新的稳定版本,请使用 Spring Integration 6.4.0spring-doc.cn

分散-聚集

从版本 4.1 开始, Spring 集成提供了分散-聚集企业集成模式的实现。 它是一个复合终端节点,其目标是向收件人发送消息并聚合结果。 正如 Enterprise Integration Patterns 中所指出的,它是 “best quote” 等场景的一个组件,在这种情况下,我们需要从多个供应商那里请求信息,并决定哪一个为我们提供所请求项目的最佳术语。spring-doc.cn

以前,可以使用离散元件来配置模式。 此次优化带来了更便捷的配置。spring-doc.cn

它是一个请求-回复终端节点,它结合了 a(或 a )和 . 请求消息将发送到通道,并等待聚合器发送到 的回复。ScatterGatherHandlerPublishSubscribeChannelRecipientListRouterAggregatingMessageHandlerscatterScatterGatherHandleroutputChannelspring-doc.cn

功能性

该模式建议两种情况:“auction”和“distribution”。 在这两种情况下,函数都是相同的,并且提供了 . (实际上,它只需要一个 as 构造函数参数。 有关更多信息,请参阅 AggregatorScatter-GatheraggregationAggregatingMessageHandlerScatterGatherHandlerAggregatingMessageHandlerspring-doc.cn

拍卖

拍卖变体对请求消息使用“发布-订阅”逻辑,其中“分散”通道是 with 。 但是,此通道可以是任何实现(就像 中的一样 — 请参阅 Content Enricher)。 但是,在这种情况下,您应该为函数创建自己的自定义。Scatter-GatherPublishSubscribeChannelapply-sequence="true"MessageChannelrequest-channelContentEnrichercorrelationStrategyaggregationspring-doc.cn

分配

分发变体基于 (请参阅 RecipientListRouter) 以及 的所有可用选项 。 这是第二个 constructor 参数。 如果只想依赖 和 的默认值,则应指定 。 否则,您应该为 . 与变体 (auction 变体) 不同,拥有一个选项可让 Target Suppliers 根据消息进行筛选。 使用 ,提供默认值,并且可以正确释放组。 distribution 选项与 auction 选项互斥。Scatter-GatherRecipientListRouterRecipientListRouterScatterGatherHandlercorrelationStrategyrecipient-list-routeraggregatorapply-sequence="true"correlationStrategyaggregatorPublishSubscribeChannelrecipient-list-routerselectorapply-sequence="true"sequenceSizeaggregatorspring-doc.cn

只有基于构造函数配置的普通 Java 配置才需要 this,因为框架无法改变外部提供的组件。 为方便起见,从版本 6.0 开始,XML 和 Java DSL for 设置为 true。applySequence=trueScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer)Scatter-GatherapplySequence

对于拍卖和分发变体,请求 (scatter) 消息都使用标头进行扩充,以等待来自的回复消息。gatherResultChannelaggregatorspring-doc.cn

默认情况下,所有供应商都应该将其结果发送到 header(通常通过省略 from the ultimate endpoint)。 但是,还提供了该选项,允许供应商将他们的回复发送到该通道以进行聚合。replyChanneloutput-channelgatherChannelspring-doc.cn

配置 Scatter-Gather 端点

以下示例显示了 的 bean 定义的 Java 配置:Scatter-Gatherspring-doc.cn

@Bean
public MessageHandler distributor() {
    RecipientListRouter router = new RecipientListRouter();
    router.setApplySequence(true);
    router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
            distributionChannel3()));
    return router;
}

@Bean
public MessageHandler gatherer() {
	return new AggregatingMessageHandler(
			new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
			new SimpleMessageStore(),
			new HeaderAttributeCorrelationStrategy(
			       IntegrationMessageHeaderAccessor.CORRELATION_ID),
			new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}

@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
	ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
	handler.setOutputChannel(output());
	return handler;
}

在前面的示例中,我们使用 Bean 和收件人通道列表进行配置。 下一个 bean 用于 . 最后,我们将这两个 bean 注入到 bean 定义中,并将其标记为 a 以将 scatter-gather 组件连接到集成流中。RecipientListRouterdistributorapplySequence="true"AggregatingMessageHandlerScatterGatherHandler@ServiceActivatorspring-doc.cn

以下示例演示如何使用 XML 命名空间配置终端节点:<scatter-gather>spring-doc.cn

<scatter-gather
		id=""  (1)
		auto-startup=""  (2)
		input-channel=""  (3)
		output-channel=""  (4)
		scatter-channel=""  (5)
		gather-channel=""  (6)
		order=""  (7)
		phase=""  (8)
		send-timeout=""  (9)
		gather-timeout=""  (10)
		requires-reply="" > (11)
			<scatterer/>  (12)
			<gatherer/>  (13)
</scatter-gather>
1 终端节点的 ID。 该 Bean 使用别名 . 该 Bean 使用别名 . 该 Bean 使用别名 . 自选。 (这将生成一个默认值。ScatterGatherHandlerid + '.handler'RecipientListRouterid + '.scatterer'AggregatingMessageHandlerid + '.gatherer'BeanFactoryid
2 生命周期属性指示是否应在应用程序上下文初始化期间启动终端节点。 此外, 还 implements 和 starts and stops ,如果提供了 a ,则会在内部创建。 自选。 (默认值为 .)ScatterGatherHandlerLifecyclegatherEndpointgather-channeltrue
3 接收请求消息的通道,以便在 . 必填。ScatterGatherHandler
4 将聚合结果发送到的通道。 自选。 (传入邮件可以在邮件头中自行指定回复通道)。ScatterGatherHandlerreplyChannel
5 将拍卖场景的分散消息发送到的通道。 自选。 与 sub-element 互斥。<scatterer>
6 接收来自每个供应商的聚合回复的通道。 它用作散点消息中的标头。 自选。 默认情况下,已创建 。replyChannelFixedSubscriberChannel
7 当多个处理程序订阅同一组件时,此组件的顺序(用于负载平衡目的)。 自选。DirectChannel
8 指定终端节点的启动和停止阶段。 启动顺序从最低到最高,关闭顺序从最高到最低。 默认情况下,此值为 ,表示此容器尽可能晚地启动并尽快停止。 自选。Integer.MAX_VALUE
9 向 . 默认情况下,块为 1 秒。 仅当 output channel 有一些 'sending' 限制时才适用 - 例如,具有固定 'capacity' 的 a 已满。 在这种情况下,会引发 a。 对于实现,将忽略 。 对于 ,从计划过期任务开始,将重新计划此任务。 自选。Messageoutput-channelsend()QueueChannelMessageDeliveryExceptionsend-timeoutAbstractSubscribableChannelgroup-timeout(-expression)MessageDeliveryException
10 用于指定 scatter-gather 在返回之前等待回复消息的时间。 默认情况下,它会等待几秒钟。 如果回复超时,则返回 'null'。 自选。30
11 指定 scatter-gather 是否必须返回非 null 值。 此值为默认值。 因此,当基础聚合器在 之后返回 null 值时,将引发 a 。 请注意,如果有可能,则应指定 the 以避免无限期等待。trueReplyRequiredExceptiongather-timeoutnullgather-timeout
12 选项。 自选。 与 attribute 互斥。<recipient-list-router>scatter-channel
13 选项。 必填。<aggregator>

错误处理

由于 Scatter-Gather 是一个多请求-回复组件,因此错误处理具有一些额外的复杂性。 在某些情况下,如果允许进程以比请求更少的回复完成,则最好只捕获并忽略下游异常。 在其他情况下,当发生错误时,应考虑从子流返回类似 “补偿消息” 的内容。ReleaseStrategyspring-doc.cn

每个异步子流都应配置一个标头,以便从 发送正确的错误消息。 否则,错误将发送到具有常见错误处理逻辑的全局变量。 有关异步错误处理的更多信息,请参阅错误处理errorChannelMessagePublishingErrorHandlererrorChannelspring-doc.cn

同步流可以使用 an 来忽略异常或返回补偿消息。 当异常从其中一个子流引发到 时,它只是被重新引发到上游。 这样,所有其他子流都将毫无用处,并且它们的回复将在 . 这有时可能是预期的行为,但在大多数情况下,最好在不影响所有其他子流和收集器中的期望的情况下处理特定子流中的错误。ExpressionEvaluatingRequestHandlerAdviceScatterGatherHandlerScatterGatherHandlerspring-doc.cn

从版本 5.1.3 开始,提供了 选项。 它填充到分散消息的标头中,并在发生异步错误时使用,或者可以在常规同步子流中使用,以直接发送错误消息。ScatterGatherHandlererrorChannelNameerrorChannelspring-doc.cn

以下示例配置通过返回补偿消息来演示异步错误处理:spring-doc.cn

@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
    return f -> f
            .scatterGather(
                    scatterer -> scatterer
                            .recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
                            .recipientFlow(f2 -> f2
                                    .channel(c -> c.executor(taskExecutor))
                                    .transform(p -> {
                                        throw new RuntimeException("Sub-flow#2");
                                    })),
                    null,
                    s -> s.errorChannel("scatterGatherErrorChannel"));
}

@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
    return MessageBuilder.withPayload(payload.getCause().getCause())
            .copyHeaders(payload.getFailedMessage().getHeaders())
            .build();
}

为了生成正确的回复,我们必须从已发送到 的 中复制标头(包括 和 )。 这样,目标异常将返回给 for reply messages 组完成的收集器。 这样的异常可以在 Gatherer 中过滤掉,或者在 scatter-gather 端点之后以其他方式在下游进行处理。replyChannelerrorChannelfailedMessageMessagingExceptionscatterGatherErrorChannelMessagePublishingErrorHandlerScatterGatherHandlerpayloadMessageGroupProcessorspring-doc.cn

在将分散结果发送到收集器之前,恢复请求消息标头,包括回复和错误通道(如果有)。 这样,来自的错误将传播到调用方,即使在分散收件人子流中应用了异步切换也是如此。 要成功操作,必须将 、 和 标头传输回分散收件人子流中的回复。 在这种情况下,必须为 配置一个合理的 finite 。 否则,默认情况下,它将永远被阻止,等待 Gatherer 的回复。ScatterGatherHandlerAggregatingMessageHandlergatherResultChanneloriginalReplyChanneloriginalErrorChannelgatherTimeoutScatterGatherHandler