AMQP 支持的消息通道
有两种消息通道实现可用。
一个是点对点的,另一个是发布-订阅。
这两个通道都为底层AmqpTemplate
和SimpleMessageListenerContainer
(如本章前面所示的通道适配器和网关)。
但是,我们在此处显示的示例具有最少的配置。
浏览 XML 架构以查看可用属性。
点对点通道可能类似于以下示例:
<int-amqp:channel id="p2pChannel"/>
在后台,前面的示例会导致Queue
叫si.p2pChannel
要声明的 URL,并且此通道会向该Queue
(从技术上讲,通过发送到 no-name 直接交换,并使用与此 name 匹配的 nameQueue
).
此通道还会在该Queue
.
如果您希望通道是 “pollable” 而不是消息驱动的,请提供message-driven
值为false
,如下例所示:
<int-amqp:channel id="p2pPollableChannel" message-driven="false"/>
发布-订阅通道可能如下所示:
<int-amqp:publish-subscribe-channel id="pubSubChannel"/>
在后台,前面的示例会导致一个名为si.fanout.pubSubChannel
要声明,并且此通道将发送到该 fanout 交换。
此通道还声明一个名为 exclusive、auto-delete、non-durable 的服务器Queue
并将其绑定到 fanout 交换,同时在该 Consumer上注册 ConsumerQueue
以接收消息。
publish-subscribe-channel 没有 “pollable” 选项。
它必须是消息驱动的。
从版本 4.1 开始,AMQP 支持的消息通道(与channel-transacted
) 支持template-channel-transacted
分开transactional
Configuration 的AbstractMessageListenerContainer
和
对于RabbitTemplate
.
请注意,在之前的channel-transacted
是true
默认情况下。
现在,默认情况下,它是false
对于AbstractMessageListenerContainer
.
在版本 4.3 之前,AMQP 支持的通道仅支持Serializable
payloads 和 headers 的 Headers 中。
整个消息被转换(序列化)并发送到 RabbitMQ。
现在,您可以设置extract-payload
属性(或setExtractPayload()
when using Java configuration) 设置为true
.
当此标志为true
,则消息有效负荷将被转换,并且 Headers 将被映射,其方式类似于使用通道适配器时。
这种安排允许 AMQP 支持的通道与不可序列化的有效负载(可能与其他消息转换器一起使用,例如Jackson2JsonMessageConverter
).
有关默认映射 Headers 的更多信息,请参阅 AMQP Message Headers。
您可以通过提供使用outbound-header-mapper
和inbound-header-mapper
属性。
现在,您还可以指定default-delivery-mode
,用于设置没有amqp_deliveryMode
页眉。
默认情况下, Spring AMQPMessageProperties
使用PERSISTENT
交付模式。
与其他支持持久性的通道一样,AMQP 支持的通道旨在提供消息持久性以避免消息丢失。 它们不用于将工作分发给其他对等应用程序。 为此,请改用通道适配器。 |
从版本 5.0 开始,pollable 通道现在会阻塞指定receiveTimeout (默认值为 1 秒)。
以前,与其他PollableChannel 实现中,如果没有可用的消息,则线程会立即返回到调度程序,而不管接收超时如何。
阻止比使用basicGet() 检索消息(无超时),因为必须创建一个使用者来接收每条消息。
要恢复之前的行为,请将 Poller 的receiveTimeout 设置为 0。 |
使用 Java 配置进行配置
以下示例显示如何使用 Java 配置配置通道:
@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("foo");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("bar");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("baz");
factoryBean.setPubSub(false);
return factoryBean;
}
使用 Java DSL 进行配置
以下示例显示如何使用 Java DSL 配置通道:
@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.pollableChannel(connectionFactory)
.queueName("foo"))
...
.get();
}
@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.channel(connectionFactory)
.queueName("bar"))
...
.get();
}
@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.publishSubscribeChannel(connectionFactory)
.queueName("baz"))
...
.get();
}