Message Channels

In addition to the IntegrationFlowBuilder with EIP methods, the Java DSL provides a fluent API to configure MessageChannel instances. For this purpose the MessageChannels builder factory is provided. The following example shows how to use it:spring-doc.cn

@Bean
public PriorityChannelSpec priorityChannel() {
    return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
                        .interceptor(wireTap());
}

The same MessageChannels builder factory can be used in the channel() EIP method from IntegrationFlowBuilder to wire endpoints, similar to wiring an input-channel/output-channel pair in the XML configuration. By default, endpoints are wired with DirectChannel instances where the bean name is based on the following pattern: [IntegrationFlow.beanName].channel#[channelNameIndex]. This rule is also applied for unnamed channels produced by inline MessageChannels builder factory usage. However, all MessageChannels methods have a variant that is aware of the channelId that you can use to set the bean names for MessageChannel instances. The MessageChannel references and beanName can be used as bean-method invocations. The following example shows the possible ways to use the channel() EIP method:spring-doc.cn

@Bean
public QueueChannelSpec queueChannel() {
    return MessageChannels.queue();
}

@Bean
public PublishSubscribeChannelSpec<?> publishSubscribe() {
    return MessageChannels.publishSubscribe();
}

@Bean
public IntegrationFlow channelFlow() {
    return IntegrationFlow.from("input")
                .fixedSubscriberChannel()
                .channel("queueChannel")
                .channel(publishSubscribe())
                .channel(MessageChannels.executor("executorChannel", this.taskExecutor))
                .channel("output")
                .get();
}
  • from("input") means "'find and use the MessageChannel with the "input" id, or create one'".spring-doc.cn

  • fixedSubscriberChannel() produces an instance of FixedSubscriberChannel and registers it with a name of channelFlow.channel#0.spring-doc.cn

  • channel("queueChannel") works the same way but uses an existing queueChannel bean.spring-doc.cn

  • channel(publishSubscribe()) is the bean-method reference.spring-doc.cn

  • channel(MessageChannels.executor("executorChannel", this.taskExecutor)) is the IntegrationFlowBuilder that exposes IntegrationComponentSpec to the ExecutorChannel and registers it as executorChannel.spring-doc.cn

  • channel("output") registers the DirectChannel bean with output as its name, as long as no beans with this name already exist.spring-doc.cn

Note: The preceding IntegrationFlow definition is valid, and all of its channels are applied to endpoints with BridgeHandler instances.spring-doc.cn

Be careful to use the same inline channel definition through MessageChannels factory from different IntegrationFlow instances. Even if the DSL parser registers non-existent objects as beans, it cannot determine the same object (MessageChannel) from different IntegrationFlow containers. The following example is wrong:
@Bean
public IntegrationFlow startFlow() {
    return IntegrationFlow.from("input")
                .transform(...)
                .channel(MessageChannels.queue("queueChannel"))
                .get();
}

@Bean
public IntegrationFlow endFlow() {
    return IntegrationFlow.from(MessageChannels.queue("queueChannel"))
                .handle(...)
                .get();
}

The result of that bad example is the following exception:spring-doc.cn

Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
     there is already object [queueChannel] bound
	    at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)

To make it work, you need to declare @Bean for that channel and use its bean method from different IntegrationFlow instances.spring-doc.cn