此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1Spring中文文档

此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1Spring中文文档

从版本 4.1 开始,Spring Integration 提供了分散-聚集企业集成模式的实现。 它是一个复合终结点,其目标是向收件人发送消息并聚合结果。 如企业集成模式中所述,它是“最佳报价”等场景的组件,我们需要从多个供应商处请求信息,并决定哪一个供应商为我们提供所请求项目的最佳术语。Spring中文文档

以前,可以使用分立元件来配置模式。 此增强功能带来了更方便的配置。Spring中文文档

是组合了 (或 ) 和 . 请求消息被发送到通道,并等待聚合器发送到 .ScatterGatherHandlerPublishSubscribeChannelRecipientListRouterAggregatingMessageHandlerscatterScatterGatherHandleroutputChannelSpring中文文档

功能性

该模式建议两种情况:“拍卖”和“分配”。 在这两种情况下,函数是相同的,并提供了 . (实际上,只需要一个作为构造函数参数。 有关详细信息,请参阅聚合器Scatter-GatheraggregationAggregatingMessageHandlerScatterGatherHandlerAggregatingMessageHandlerSpring中文文档

拍卖

拍卖变体对请求消息使用“发布-订阅”逻辑,其中“分散”通道是带有 . 但是,此通道可以是任何实现(如 in 的情况 — 请参阅内容丰富器)。 但是,在这种情况下,您应该为函数创建自己的自定义。Scatter-GatherPublishSubscribeChannelapply-sequence="true"MessageChannelrequest-channelContentEnrichercorrelationStrategyaggregationSpring中文文档

分配

分发变体基于 (请参阅 RecipientListRouter) 以及 . 这是第二个构造函数参数。 如果只想依赖 和 的默认值,则应指定 。 否则,应提供 . 与变体(拍卖变体)不同,有一个选项可以根据消息过滤目标供应商。 使用 ,提供默认值,并且可以正确释放组。 分配选项与拍卖选项互斥。Scatter-GatherRecipientListRouterRecipientListRouterScatterGatherHandlercorrelationStrategyrecipient-list-routeraggregatorapply-sequence="true"correlationStrategyaggregatorPublishSubscribeChannelrecipient-list-routerselectorapply-sequence="true"sequenceSizeaggregatorSpring中文文档

只有基于构造函数配置的纯 Java 配置才需要它,因为框架无法改变外部提供的组件。 为方便起见,XML 和 Java DSL for 从 V6.0 开始设置为 true。applySequence=trueScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer)Scatter-GatherapplySequence

对于拍卖和分发变体,请求(分散)消息都使用标头进行扩充,以等待来自 的回复消息。gatherResultChannelaggregatorSpring中文文档

默认情况下,所有供应商都应将其结果发送到标头(通常通过省略最终端点)。 但是,还提供了该选项,允许供应商将他们的回复发送到该渠道进行聚合。replyChanneloutput-channelgatherChannelSpring中文文档

只有基于构造函数配置的纯 Java 配置才需要它,因为框架无法改变外部提供的组件。 为方便起见,XML 和 Java DSL for 从 V6.0 开始设置为 true。applySequence=trueScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer)Scatter-GatherapplySequence

配置 Scatter-Gather 端点

以下示例显示了 Bean 定义的 Java 配置:Scatter-GatherSpring中文文档

@Bean
public MessageHandler distributor() {
    RecipientListRouter router = new RecipientListRouter();
    router.setApplySequence(true);
    router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
            distributionChannel3()));
    return router;
}

@Bean
public MessageHandler gatherer() {
	return new AggregatingMessageHandler(
			new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
			new SimpleMessageStore(),
			new HeaderAttributeCorrelationStrategy(
			       IntegrationMessageHeaderAccessor.CORRELATION_ID),
			new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}

@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
	ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
	handler.setOutputChannel(output());
	return handler;
}

在前面的示例中,我们使用 Bean 和收件人通道列表进行配置。 下一个 Bean 是 . 最后,我们将这两个 Bean 都注入到 Bean 定义中,并将其标记为将分散-聚集组件连接到集成流中。RecipientListRouterdistributorapplySequence="true"AggregatingMessageHandlerScatterGatherHandler@ServiceActivatorSpring中文文档

下面的示例演示如何使用 XML 命名空间配置终结点:<scatter-gather>Spring中文文档

<scatter-gather
		id=""  (1)
		auto-startup=""  (2)
		input-channel=""  (3)
		output-channel=""  (4)
		scatter-channel=""  (5)
		gather-channel=""  (6)
		order=""  (7)
		phase=""  (8)
		send-timeout=""  (9)
		gather-timeout=""  (10)
		requires-reply="" > (11)
			<scatterer/>  (12)
			<gatherer/>  (13)
</scatter-gather>
1 终结点的 ID。 Bean 的注册别名为 。 Bean 的注册别名为 。 Bean 的注册别名为 。 自选。 (将生成默认值。ScatterGatherHandlerid + '.handler'RecipientListRouterid + '.scatterer'AggregatingMessageHandlerid + '.gatherer'BeanFactoryid
2 生命周期属性指示是否应在应用程序上下文初始化期间启动终结点。 此外,还实现并启动和停止 ,如果提供了 ,则在内部创建。 自选。 (默认值为 .)ScatterGatherHandlerLifecyclegatherEndpointgather-channeltrue
3 用于接收请求消息以在 . 必填。ScatterGatherHandler
4 将聚合结果发送到的通道。 自选。 (传入消息可以在消息标头中指定回复通道)。ScatterGatherHandlerreplyChannel
5 要向拍卖方案发送分散消息的通道。 自选。 与子元素互斥。<scatterer>
6 从聚合的每个供应商处接收回复的渠道。 它用作分散消息中的标头。 自选。 默认情况下,将创建 。replyChannelFixedSubscriberChannel
7 当多个处理程序订阅同一个组件时,此组件的顺序(用于负载平衡目的)。 自选。DirectChannel
8 指定应启动和停止终结点的阶段。 启动顺序从最低到最高,关闭顺序从最高到最低。 默认情况下,此值为 ,表示此容器尽可能晚启动并尽快停止。 自选。Integer.MAX_VALUE
9 向 . 默认情况下,块为一秒钟。 仅当输出通道具有某些“发送”限制时,它才适用 - 例如,具有固定“容量”的已满。 在本例中,将抛出 a。 对于实现,将忽略 。 对于 ,来自已计划的过期任务会导致重新计划此任务。 自选。Messageoutput-channelsend()QueueChannelMessageDeliveryExceptionsend-timeoutAbstractSubscribableChannelgroup-timeout(-expression)MessageDeliveryException
10 允许您指定 scatter-gather 在返回之前等待回复消息的时间。 默认情况下,它会等待几秒钟。 如果回复超时,则返回“null”。 自选。30
11 指定 scatter-gather 是否必须返回非 null 值。 默认情况下,此值为值。 因此,当基础聚合器在 之后返回 null 值时,将抛出 a。 请注意,如果可能,应指定 以避免无限期等待。trueReplyRequiredExceptiongather-timeoutnullgather-timeout
12 选项。 自选。 与属性互斥。<recipient-list-router>scatter-channel
13 选项。 必填。<aggregator>
1 终结点的 ID。 Bean 的注册别名为 。 Bean 的注册别名为 。 Bean 的注册别名为 。 自选。 (将生成默认值。ScatterGatherHandlerid + '.handler'RecipientListRouterid + '.scatterer'AggregatingMessageHandlerid + '.gatherer'BeanFactoryid
2 生命周期属性指示是否应在应用程序上下文初始化期间启动终结点。 此外,还实现并启动和停止 ,如果提供了 ,则在内部创建。 自选。 (默认值为 .)ScatterGatherHandlerLifecyclegatherEndpointgather-channeltrue
3 用于接收请求消息以在 . 必填。ScatterGatherHandler
4 将聚合结果发送到的通道。 自选。 (传入消息可以在消息标头中指定回复通道)。ScatterGatherHandlerreplyChannel
5 要向拍卖方案发送分散消息的通道。 自选。 与子元素互斥。<scatterer>
6 从聚合的每个供应商处接收回复的渠道。 它用作分散消息中的标头。 自选。 默认情况下,将创建 。replyChannelFixedSubscriberChannel
7 当多个处理程序订阅同一个组件时,此组件的顺序(用于负载平衡目的)。 自选。DirectChannel
8 指定应启动和停止终结点的阶段。 启动顺序从最低到最高,关闭顺序从最高到最低。 默认情况下,此值为 ,表示此容器尽可能晚启动并尽快停止。 自选。Integer.MAX_VALUE
9 向 . 默认情况下,块为一秒钟。 仅当输出通道具有某些“发送”限制时,它才适用 - 例如,具有固定“容量”的已满。 在本例中,将抛出 a。 对于实现,将忽略 。 对于 ,来自已计划的过期任务会导致重新计划此任务。 自选。Messageoutput-channelsend()QueueChannelMessageDeliveryExceptionsend-timeoutAbstractSubscribableChannelgroup-timeout(-expression)MessageDeliveryException
10 允许您指定 scatter-gather 在返回之前等待回复消息的时间。 默认情况下,它会等待几秒钟。 如果回复超时,则返回“null”。 自选。30
11 指定 scatter-gather 是否必须返回非 null 值。 默认情况下,此值为值。 因此,当基础聚合器在 之后返回 null 值时,将抛出 a。 请注意,如果可能,应指定 以避免无限期等待。trueReplyRequiredExceptiongather-timeoutnullgather-timeout
12 选项。 自选。 与属性互斥。<recipient-list-router>scatter-channel
13 选项。 必填。<aggregator>

错误处理

由于 Scatter-Gather 是一个多请求-应答组件,因此错误处理具有一些额外的复杂性。 在某些情况下,如果允许进程以比请求更少的回复完成,则最好只捕获并忽略下游异常。 在其他情况下,当发生错误时,应考虑从子流返回“补偿消息”之类的内容。ReleaseStrategySpring中文文档

每个异步子流都应配置一个标头,以便从 . 否则,将使用通用错误处理逻辑将错误发送到全局。 有关异步错误处理的详细信息,请参阅错误处理errorChannelMessagePublishingErrorHandlererrorChannelSpring中文文档

同步流可以使用 an 来忽略异常或返回补偿消息。 当异常从其中一个子流抛出到 时,它只是重新抛出到上游。 这样一来,所有其他子流都将毫无用处,并且它们的回复将在 . 这有时可能是预期的行为,但在大多数情况下,最好在不影响所有其他子流和收集器中的期望的情况下处理特定子流中的错误。ExpressionEvaluatingRequestHandlerAdviceScatterGatherHandlerScatterGatherHandlerSpring中文文档

从版本 5.1.3 开始,将提供该选项。 它被填充到分散消息的标头中,并在发生异步错误时使用,或者可以在常规同步子流中用于直接发送错误消息。ScatterGatherHandlererrorChannelNameerrorChannelSpring中文文档

下面的示例配置通过返回补偿消息来演示异步错误处理:Spring中文文档

@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
    return f -> f
            .scatterGather(
                    scatterer -> scatterer
                            .recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
                            .recipientFlow(f2 -> f2
                                    .channel(c -> c.executor(taskExecutor))
                                    .transform(p -> {
                                        throw new RuntimeException("Sub-flow#2");
                                    })),
                    null,
                    s -> s.errorChannel("scatterGatherErrorChannel"));
}

@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
    return MessageBuilder.withPayload(payload.getCause().getCause())
            .copyHeaders(payload.getFailedMessage().getHeaders())
            .build();
}

为了生成正确的回复,我们必须从已发送到 的 中复制标头(包括 和 )。 这样,目标异常将返回给 for reply messages 组完成的收集器。 可以在收集器中筛选出此类异常,也可以在分散收集端点之后以其他方式向下游进行处理。replyChannelerrorChannelfailedMessageMessagingExceptionscatterGatherErrorChannelMessagePublishingErrorHandlerScatterGatherHandlerpayloadMessageGroupProcessorSpring中文文档

在将分散结果发送到收集器之前,请恢复请求消息标头,包括应答和错误通道(如果有)。 这样,来自 的错误将传播到调用方,即使在分散接收者子流中应用了异步切换也是如此。 为了成功操作,必须将 和 标头传输回分散接收方子流的回复。 在这种情况下,必须为 . 否则,默认情况下,它将被阻止,等待收集器的回复。ScatterGatherHandlerAggregatingMessageHandlergatherResultChanneloriginalReplyChanneloriginalErrorChannelgatherTimeoutScatterGatherHandler
在将分散结果发送到收集器之前,请恢复请求消息标头,包括应答和错误通道(如果有)。 这样,来自 的错误将传播到调用方,即使在分散接收者子流中应用了异步切换也是如此。 为了成功操作,必须将 和 标头传输回分散接收方子流的回复。 在这种情况下,必须为 . 否则,默认情况下,它将被阻止,等待收集器的回复。ScatterGatherHandlerAggregatingMessageHandlergatherResultChanneloriginalReplyChanneloriginalErrorChannelgatherTimeoutScatterGatherHandler