Some of the if…else and publish-subscribe components let you specify their logic or mapping by using sub-flows.
The simplest example is .publishSubscribeChannel(), as the following example shows:
@Bean
public IntegrationFlow subscribersFlow() {
    return flow -> flow
            .publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p / 2)
                            .channel(c -> c.queue("subscriber1Results")))
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p * 2)
                            .channel(c -> c.queue("subscriber2Results"))))
            .<Integer>handle((p, h) -> p * 3)
            .channel(c -> c.queue("subscriber3Results"));
}
You can achieve the same result with separate IntegrationFlow @Bean definitions, but we hope you find the sub-flow style of logic composition useful.
We find that it results in shorter (and so more readable) code.
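To make the fan-out concrete, here is a minimal framework-free sketch of which value each result queue would receive for a given payload. It does not use Spring Integration at all: the class name PubSubSketch and the helper deliver() are hypothetical, and the sketch assumes that every subscriber (including the continuation of the main flow) receives the same original payload. In the real flow, delivery goes through the supplied Executor, so the order of arrival is not guaranteed.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.IntUnaryOperator;

public class PubSubSketch {

    // Hypothetical helper: maps each result queue name to the value it would receive.
    static Map<String, Integer> deliver(int payload) {
        Map<String, IntUnaryOperator> subscribers = new LinkedHashMap<>();
        subscribers.put("subscriber1Results", p -> p / 2); // first subscribe() sub-flow
        subscribers.put("subscriber2Results", p -> p * 2); // second subscribe() sub-flow
        subscribers.put("subscriber3Results", p -> p * 3); // continuation of the main flow

        Map<String, Integer> results = new LinkedHashMap<>();
        // Every subscriber sees the original payload, not another subscriber's output.
        subscribers.forEach((queue, logic) -> results.put(queue, logic.applyAsInt(payload)));
        return results;
    }

    public static void main(String[] args) {
        System.out.println(deliver(2));
        // prints {subscriber1Results=1, subscriber2Results=4, subscriber3Results=6}
    }
}
```

Note that the first subscriber uses integer division, so an odd payload such as 3 would yield 1 on subscriber1Results.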
Starting with version 5.3, a BroadcastCapableChannel-based publishSubscribeChannel() implementation is provided to configure sub-flow subscribers on broker-backed message channels.
For example, you can now configure several subscribers as sub-flows on the Jms.publishSubscribeChannel():
@Bean
public JmsPublishSubscribeMessageChannelSpec jmsPublishSubscribeChannel() {
    return Jms.publishSubscribeChannel(jmsConnectionFactory())
            .destination("pubsub");
}

@Bean
public IntegrationFlow pubSubFlow(BroadcastCapableChannel jmsPublishSubscribeChannel) {
    return f -> f
            .publishSubscribeChannel(jmsPublishSubscribeChannel,
                    pubsub -> pubsub
                            .subscribe(subFlow -> subFlow
                                    .channel(c -> c.queue("jmsPubSubBridgeChannel1")))
                            .subscribe(subFlow -> subFlow
                                    .channel(c -> c.queue("jmsPubSubBridgeChannel2"))));
}
The .routeToRecipients() method provides a similar publish-subscribe sub-flow composition.
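As an illustration, a recipient list with sub-flows might look like the following sketch. It assumes Spring Integration is on the classpath; the class name, bean name, queue names, and SpEL conditions are hypothetical and chosen only for this example. Each recipientFlow() registers a sub-flow that receives the message when its expression evaluates to true, so a single message can reach several sub-flows.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;

@Configuration
@EnableIntegration
public class RecipientListConfig {

    @Bean
    public IntegrationFlow recipientListFlow() {
        return f -> f
                .routeToRecipients(r -> r
                        // Sub-flow that receives the message when the payload is positive.
                        .recipientFlow("payload > 0", sf -> sf
                                .<Integer>handle((p, h) -> p * 2)
                                .channel(c -> c.queue("positiveResults")))
                        // Sub-flow that receives the message when the payload is even.
                        .recipientFlow("payload % 2 == 0", sf -> sf
                                .<Integer>handle((p, h) -> p / 2)
                                .channel(c -> c.queue("evenResults"))));
    }
}
```

With this configuration, a payload of 4 would match both conditions and be handled by both sub-flows, whereas a payload of 3 would match only the first.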
Another example is using .discardFlow() instead of .discardChannel() on the .filter() method.
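A minimal sketch of that option follows, assuming Spring Integration is on the classpath. The class name, bean name, and queue names are hypothetical. Messages that pass the selector continue along the main flow, while rejected messages are handed to the sub-flow given to discardFlow() instead of to a separately declared discard channel.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;

@Configuration
@EnableIntegration
public class FilterDiscardConfig {

    @Bean
    public IntegrationFlow filterFlow() {
        return f -> f
                // Keep even payloads; odd payloads are discarded into the sub-flow.
                .<Integer>filter(p -> p % 2 == 0, e -> e
                        .discardFlow(df -> df
                                .<Integer>handle((p, h) -> p * 3)
                                .channel(c -> c.queue("discardedResults"))))
                // Accepted (even) payloads end up here.
                .channel(c -> c.queue("acceptedResults"));
    }
}
```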
The .route() method deserves special attention.
Consider the following example:
@Bean
public IntegrationFlow routeFlow() {
    return f -> f
            .<Integer, Boolean>route(p -> p % 2 == 0,
                    m -> m.channelMapping("true", "evenChannel")
                            .subFlowMapping("false", sf ->
                                    sf.<Integer>handle((p, h) -> p * 3)))
            .transform(Object::toString)
            .channel(c -> c.queue("oddChannel"));
}
The .channelMapping() continues to work as it does in regular Router mapping, but the .subFlowMapping() ties that sub-flow to the main flow.
In other words, any router’s sub-flow returns to the main flow after .route().
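The following framework-free sketch traces where a payload ends up in the routeFlow() example. It is only a model of the behavior described above, not Spring Integration code: the class RouteSketch and its route() helper are hypothetical, and the sketch assumes that a message sent through channelMapping() leaves the flow at evenChannel without passing through the remaining main-flow steps.

```java
public class RouteSketch {

    // Hypothetical helper: returns "<channel>=<payload>" for the final destination.
    static String route(int payload) {
        if (payload % 2 == 0) {
            // channelMapping("true", "evenChannel"): plain router behavior,
            // the unmodified Integer is sent to evenChannel.
            return "evenChannel=" + payload;
        }
        // subFlowMapping("false", ...): the sub-flow multiplies by 3 ...
        int subFlowResult = payload * 3;
        // ... and its output returns to the main flow after route():
        // transform(Object::toString), then the oddChannel queue.
        String transformed = String.valueOf(subFlowResult);
        return "oddChannel=" + transformed;
    }

    public static void main(String[] args) {
        System.out.println(route(4)); // prints evenChannel=4
        System.out.println(route(3)); // prints oddChannel=9
    }
}
```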
Sometimes, you need to refer to an existing
Caused by: org.springframework.beans.factory.BeanCreationException: The 'currentComponent' (org.springframework.integration.router.MethodInvokingRouter@7965a51c) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow.
When you configure a sub-flow as a lambda, the framework handles the request-reply interaction with the sub-flow and a gateway is not needed.
Sub-flows can be nested to any depth, but we do not recommend doing so. In fact, even in the router case, adding complex sub-flows within a flow would quickly begin to look like a plate of spaghetti and be difficult for a human to parse.
In cases where the DSL supports a subflow configuration, when a channel is normally needed for the component being configured, and that subflow starts with a
the Framework internally creates a