此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1Spring中文文档

此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1Spring中文文档

某些组件提供了使用子流指定其逻辑或映射的功能。 最简单的示例是 ,如以下示例所示:if…​elsepublish-subscribe.publishSubscribeChannel()Spring中文文档

@Bean
public IntegrationFlow subscribersFlow() {
    return flow -> flow
            .publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p / 2)
                            .channel(c -> c.queue("subscriber1Results")))
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p * 2)
                            .channel(c -> c.queue("subscriber2Results"))))
            .<Integer>handle((p, h) -> p * 3)
            .channel(c -> c.queue("subscriber3Results"));
}

您可以使用单独的定义获得相同的结果,但我们希望您发现逻辑组合的子流样式有用。 我们发现它会导致更短(因此更具可读性)的代码。IntegrationFlow@BeanSpring中文文档

从版本 5.3 开始,提供了一个基于 的实现,用于在代理支持的消息通道上配置子流订阅者。 例如,我们现在可以将多个订阅者配置为子流:BroadcastCapableChannelpublishSubscribeChannel()Jms.publishSubscribeChannel()Spring中文文档

@Bean
public JmsPublishSubscribeMessageChannelSpec jmsPublishSubscribeChannel() {
    return Jms.publishSubscribeChannel(jmsConnectionFactory())
                .destination("pubsub");
}

@Bean
public IntegrationFlow pubSubFlow(BroadcastCapableChannel jmsPublishSubscribeChannel) {
    return f -> f
            .publishSubscribeChannel(jmsPublishSubscribeChannel,
                    pubsub -> pubsub
                            .subscribe(subFlow -> subFlow
                                .channel(c -> c.queue("jmsPubSubBridgeChannel1")))
                            .subscribe(subFlow -> subFlow
                                .channel(c -> c.queue("jmsPubSubBridgeChannel2"))));
}

类似的子流组合提供了该方法。publish-subscribe.routeToRecipients()Spring中文文档

另一个示例是 using 而不是 on 方法。.discardFlow().discardChannel().filter()Spring中文文档

值得特别关注。 请看以下示例:.route()Spring中文文档

@Bean
public IntegrationFlow routeFlow() {
    return f -> f
            .<Integer, Boolean>route(p -> p % 2 == 0,
                    m -> m.channelMapping("true", "evenChannel")
                            .subFlowMapping("false", sf ->
                                    sf.<Integer>handle((p, h) -> p * 3)))
            .transform(Object::toString)
            .channel(c -> c.queue("oddChannel"));
}

它继续像在常规映射中一样工作,但将该子流与主流联系在一起。 换言之,任何路由器的子流都会在 之后返回到主流。.channelMapping()Router.subFlowMapping().route()Spring中文文档

有时,您需要从 . 以下示例演示如何执行此操作:IntegrationFlow@Bean.subFlowMapping()Spring中文文档

@Bean
public IntegrationFlow splitRouteAggregate() {
    return f -> f
            .split()
            .<Integer, Boolean>route(o -> o % 2 == 0,
                    m -> m
                            .subFlowMapping(true, oddFlow())
                            .subFlowMapping(false, sf -> sf.gateway(evenFlow())))
            .aggregate();
}

@Bean
public IntegrationFlow oddFlow() {
    return f -> f.handle(m -> System.out.println("odd"));
}

@Bean
public IntegrationFlow evenFlow() {
    return f -> f.handle((p, h) -> "even");
}


在这种情况下,当您需要从这样的子流接收应答并继续主流时,必须用 a 包装此 Bean 引用(或其输入通道),如前面的示例所示。 前面示例中的引用未包装到 . 因此,我们不希望此路由分支得到回复。 否则,您最终会得到类似于以下内容的异常:IntegrationFlow.gateway()oddFlow().gateway()Spring中文文档

Caused by: org.springframework.beans.factory.BeanCreationException:
    The 'currentComponent' (org.springframework.integration.router.MethodInvokingRouter@7965a51c)
    is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'.
    This is the end of the integration flow.

当您将子流配置为 lambda 时,框架会处理与子流的请求-回复交互,并且不需要网关。Spring中文文档

有时,您需要从 . 以下示例演示如何执行此操作:IntegrationFlow@Bean.subFlowMapping()Spring中文文档

@Bean
public IntegrationFlow splitRouteAggregate() {
    return f -> f
            .split()
            .<Integer, Boolean>route(o -> o % 2 == 0,
                    m -> m
                            .subFlowMapping(true, oddFlow())
                            .subFlowMapping(false, sf -> sf.gateway(evenFlow())))
            .aggregate();
}

@Bean
public IntegrationFlow oddFlow() {
    return f -> f.handle(m -> System.out.println("odd"));
}

@Bean
public IntegrationFlow evenFlow() {
    return f -> f.handle((p, h) -> "even");
}


在这种情况下,当您需要从这样的子流接收应答并继续主流时,必须用 a 包装此 Bean 引用(或其输入通道),如前面的示例所示。 前面示例中的引用未包装到 . 因此,我们不希望此路由分支得到回复。 否则,您最终会得到类似于以下内容的异常:IntegrationFlow.gateway()oddFlow().gateway()Spring中文文档

Caused by: org.springframework.beans.factory.BeanCreationException:
    The 'currentComponent' (org.springframework.integration.router.MethodInvokingRouter@7965a51c)
    is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'.
    This is the end of the integration flow.

当您将子流配置为 lambda 时,框架会处理与子流的请求-回复交互,并且不需要网关。Spring中文文档

子流可以嵌套到任何深度,但我们不建议这样做。 事实上,即使在路由器的情况下,在流中添加复杂的子流也会很快开始看起来像一盘意大利面,并且很难被人类解析。Spring中文文档

如果 DSL 支持子流配置,则当正在配置的组件通常需要通道,并且该子流以元素开头时,框架会在组件输出通道和流的输入通道之间隐式放置 a。 例如,在此定义中:channel()bridge()filterSpring中文文档

.filter(p -> p instanceof String, e -> e
	.discardFlow(df -> df
                         .channel(MessageChannels.queue())
                         ...)

框架在内部创建一个 bean 用于注入 . 然后,它将子流包装成一个以订阅的这个隐式通道开头,并将 a 放在流中指定的之前。 当现有 Bean 用作子流引用(而不是内联子流,例如 lambda)时,不需要这样的桥接,因为框架可以解析来自流 Bean 的第一个通道。 对于内联子流,输入通道尚不可用。DirectChannelMessageFilter.discardChannelIntegrationFlowbridgechannel()IntegrationFlowSpring中文文档

如果 DSL 支持子流配置,则当正在配置的组件通常需要通道,并且该子流以元素开头时,框架会在组件输出通道和流的输入通道之间隐式放置 a。 例如,在此定义中:channel()bridge()filterSpring中文文档

.filter(p -> p instanceof String, e -> e
	.discardFlow(df -> df
                         .channel(MessageChannels.queue())
                         ...)

框架在内部创建一个 bean 用于注入 . 然后,它将子流包装成一个以订阅的这个隐式通道开头,并将 a 放在流中指定的之前。 当现有 Bean 用作子流引用(而不是内联子流,例如 lambda)时,不需要这样的桥接,因为框架可以解析来自流 Bean 的第一个通道。 对于内联子流,输入通道尚不可用。DirectChannelMessageFilter.discardChannelIntegrationFlowbridgechannel()IntegrationFlowSpring中文文档