In addition to the IntegrationFlowBuilder with EIP methods, the Java DSL provides a fluent API to configure MessageChannel instances.
For this purpose, the MessageChannels builder factory is provided.
The following example shows how to use it:
@Bean
public PriorityChannelSpec priorityChannel() {
    return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
            .interceptor(wireTap());
}
The same MessageChannels builder factory can be used in the channel() EIP method from IntegrationFlowBuilder to wire endpoints, similar to wiring an input-channel/output-channel pair in the XML configuration.
By default, endpoints are wired with DirectChannel instances where the bean name is based on the following pattern: [IntegrationFlow.beanName].channel#[channelNameIndex].
This rule is also applied for unnamed channels produced by inline MessageChannels builder factory usage.
However, all MessageChannels methods have a variant that is aware of the channelId that you can use to set the bean names for MessageChannel instances.
In the channel() method, you can refer to a MessageChannel either by its bean name or through a bean-method invocation.
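The naming pattern above can be made concrete with a small plain-Java sketch. The class GeneratedChannelNames and its generatedName method are hypothetical and exist only for illustration; they are not part of Spring Integration, and the sketch says nothing about how the framework assigns the index internally.

```java
// Illustrative only: this helper is NOT part of Spring Integration.
public class GeneratedChannelNames {

    // Formats a name following the documented pattern
    // [IntegrationFlow.beanName].channel#[channelNameIndex].
    static String generatedName(String flowBeanName, int channelNameIndex) {
        return flowBeanName + ".channel#" + channelNameIndex;
    }

    public static void main(String[] args) {
        // Name for the unnamed channel at index 0 of a flow bean called channelFlow.
        System.out.println(generatedName("channelFlow", 0));
    }
}
```

A channel created with an explicit channelId keeps that id as its bean name instead of a generated one.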
The following example shows the possible ways to use the channel() EIP method:
@Bean
public QueueChannelSpec queueChannel() {
    return MessageChannels.queue();
}

@Bean
public PublishSubscribeChannelSpec<?> publishSubscribe() {
    return MessageChannels.publishSubscribe();
}

@Bean
public IntegrationFlow channelFlow() {
    return IntegrationFlow.from("input")
            .fixedSubscriberChannel()
            .channel("queueChannel")
            .channel(publishSubscribe())
            .channel(MessageChannels.executor("executorChannel", this.taskExecutor))
            .channel("output")
            .get();
}
- from("input") means "find and use the MessageChannel with the 'input' id, or create one".
- fixedSubscriberChannel() produces an instance of FixedSubscriberChannel and registers it with a name of channelFlow.channel#0.
- channel("queueChannel") works the same way but uses an existing queueChannel bean.
- channel(publishSubscribe()) is the bean-method reference.
- channel(MessageChannels.executor("executorChannel", this.taskExecutor)) passes an IntegrationComponentSpec to the IntegrationFlowBuilder, which resolves it to an ExecutorChannel and registers it as executorChannel.
- channel("output") registers the DirectChannel bean with output as its name, as long as no beans with this name already exist.
Note: The preceding IntegrationFlow definition is valid, and all of its channels are applied to endpoints with BridgeHandler instances.
Be careful when using the same inline channel definition through the MessageChannels factory in different IntegrationFlow instances.
Even though the DSL parser registers non-existent objects as beans, it cannot recognize the same object (MessageChannel) across different IntegrationFlow containers.
The following example is wrong:
@Bean
public IntegrationFlow startFlow() {
    return IntegrationFlow.from("input")
            .transform(...)
            .channel(MessageChannels.queue("queueChannel"))
            .get();
}

@Bean
public IntegrationFlow endFlow() {
    return IntegrationFlow.from(MessageChannels.queue("queueChannel"))
            .handle(...)
            .get();
}
The result of that bad example is the following exception:
Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
there is already object [queueChannel] bound
at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)
To make it work, you need to declare a @Bean for that channel and use its bean method from the different IntegrationFlow instances.
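A minimal sketch of that fix, assuming Spring Integration 6.x on the classpath, might look as follows. The class name SharedQueueChannelConfig, the upper-casing transformer, and the printing handler are placeholders chosen only for illustration; the original example elides those steps, so substitute your own logic.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.dsl.QueueChannelSpec;

@Configuration
public class SharedQueueChannelConfig { // hypothetical class name

    // Declared once as a bean, so both flows refer to the same channel.
    @Bean
    public QueueChannelSpec queueChannel() {
        return MessageChannels.queue();
    }

    @Bean
    public IntegrationFlow startFlow() {
        return IntegrationFlow.from("input")
                // Placeholder transformer, not taken from the original example.
                .transform(String.class, String::toUpperCase)
                .channel(queueChannel())
                .get();
    }

    @Bean
    public IntegrationFlow endFlow() {
        return IntegrationFlow.from(queueChannel())
                // Placeholder handler, not taken from the original example.
                .handle(message -> System.out.println(message.getPayload()))
                .get();
    }
}
```

Because the channel is registered once under the bean name queueChannel, neither flow attempts to register a second object under that name.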