AMQP 支持的消息通道
有两种消息通道实现可用。
一个是点对点的,另一个是发布-订阅。
这两个通道都为底层 和 (如本章前面所示的通道适配器和网关) 提供了广泛的配置属性。
但是,我们在此处显示的示例具有最少的配置。
浏览 XML 架构以查看可用属性。AmqpTemplate
SimpleMessageListenerContainer
点对点通道可能类似于以下示例:
<int-amqp:channel id="p2pChannel"/>
在幕后,前面的示例导致声明 named ,并且此通道发送到该 (从技术上讲,通过使用与 this 的名称匹配的路由密钥发送到 no-name 直接交换)。
此通道还会在该 上注册一个 consumer。
如果您希望通道是 “pollable” 而不是消息驱动的,请为标志提供值 ,如下例所示:Queue
si.p2pChannel
Queue
Queue
Queue
message-driven
false
<int-amqp:channel id="p2pPollableChannel" message-driven="false"/>
发布-订阅通道可能如下所示:
<int-amqp:publish-subscribe-channel id="pubSubChannel"/>
在幕后,前面的示例会导致声明一个名为 fanout exchange,并且此通道发送到该 fanout 交换。
此通道还声明一个名为 exclusive、auto-delete、non-durable 的服务器,并将其绑定到扇出交换,同时在该交换上注册使用者以接收消息。
publish-subscribe-channel 没有 “pollable” 选项。
它必须是消息驱动的。si.fanout.pubSubChannel
Queue
Queue
从版本 4.1 开始,AMQP 支持的消息通道(与 一起)支持将 和
对于 .
请注意,以前是默认的。
现在,默认情况下,它适用于 .channel-transacted
template-channel-transacted
transactional
AbstractMessageListenerContainer
RabbitTemplate
channel-transacted
true
false
AbstractMessageListenerContainer
在版本 4.3 之前,AMQP 支持的通道仅支持带有有效负载和 Headers 的消息。
整个消息被转换(序列化)并发送到 RabbitMQ。
现在,您可以将属性(或使用 Java 配置时)设置为 .
当此标志为 时,将转换消息有效负荷并映射报头,其方式类似于使用通道适配器时。
这种安排允许 AMQP 支持的通道与不可序列化的有效负载一起使用(可能与另一个消息转换器一起使用,例如 )。
有关默认映射 Headers 的更多信息,请参阅 AMQP Message Headers。
您可以通过提供使用 和 属性的自定义映射器来修改映射。
现在,您还可以指定 ,该 用于在没有标头时设置投放模式。
默认情况下, Spring AMQP 使用交付模式。Serializable
extract-payload
setExtractPayload()
true
true
Jackson2JsonMessageConverter
outbound-header-mapper
inbound-header-mapper
default-delivery-mode
amqp_deliveryMode
MessageProperties
PERSISTENT
与其他支持持久性的通道一样,AMQP 支持的通道旨在提供消息持久性以避免消息丢失。 它们不用于将工作分发给其他对等应用程序。 为此,请改用通道适配器。 |
从版本 5.0 开始,pollable 通道现在阻塞指定 (默认为 1 秒) 的 poller 线程。
以前,与其他实现不同,如果没有可用的消息,则线程会立即返回到调度程序,而不管接收超时如何。
阻塞比使用 a 检索消息(无超时)要贵一些,因为必须创建一个使用者来接收每条消息。
要恢复以前的行为,请将 poller's 设置为 0。receiveTimeout PollableChannel basicGet() receiveTimeout |
使用 Java 配置进行配置
以下示例显示如何使用 Java 配置配置通道:
@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("foo");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("bar");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("baz");
factoryBean.setPubSub(false);
return factoryBean;
}
使用 Java DSL 进行配置
以下示例显示如何使用 Java DSL 配置通道:
@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.pollableChannel(connectionFactory)
.queueName("foo"))
...
.get();
}
@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.channel(connectionFactory)
.queueName("bar"))
...
.get();
}
@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.publishSubscribeChannel(connectionFactory)
.queueName("baz"))
...
.get();
}