对于最新的稳定版本,请使用 Spring Integration 6.4.3spring-doc.cadn.net.cn

AMQP 支持的消息通道

有两种消息通道实现可用。 一个是点对点的,另一个是发布-订阅。 这两个通道都为底层AmqpTemplateSimpleMessageListenerContainer(如本章前面所示的通道适配器和网关)。 但是,我们在此处显示的示例具有最少的配置。 浏览 XML 架构以查看可用属性。spring-doc.cadn.net.cn

点对点通道可能类似于以下示例:spring-doc.cadn.net.cn

<int-amqp:channel id="p2pChannel"/>

在后台,前面的示例会导致Queuesi.p2pChannel要声明的 URL,并且此通道会向该Queue(从技术上讲,通过发送到 no-name 直接交换,并使用与此 name 匹配的 nameQueue). 此通道还会在该Queue. 如果您希望通道是 “pollable” 而不是消息驱动的,请提供message-driven值为false,如下例所示:spring-doc.cadn.net.cn

<int-amqp:channel id="p2pPollableChannel"  message-driven="false"/>

发布-订阅通道可能如下所示:spring-doc.cadn.net.cn

<int-amqp:publish-subscribe-channel id="pubSubChannel"/>

在后台,前面的示例会导致一个名为si.fanout.pubSubChannel要声明,并且此通道将发送到该 fanout 交换。 此通道还声明一个名为 exclusive、auto-delete、non-durable 的服务器Queue并将其绑定到 fanout 交换,同时在该 Consumer上注册 ConsumerQueue以接收消息。 publish-subscribe-channel 没有 “pollable” 选项。 它必须是消息驱动的。spring-doc.cadn.net.cn

从版本 4.1 开始,AMQP 支持的消息通道(与channel-transacted) 支持template-channel-transacted分开transactionalConfiguration 的AbstractMessageListenerContainer和 对于RabbitTemplate. 请注意,在之前的channel-transactedtrue默认情况下。 现在,默认情况下,它是false对于AbstractMessageListenerContainer.spring-doc.cadn.net.cn

在版本 4.3 之前,AMQP 支持的通道仅支持Serializablepayloads 和 headers 的 Headers 中。 整个消息被转换(序列化)并发送到 RabbitMQ。 现在,您可以设置extract-payload属性(或setExtractPayload()when using Java configuration) 设置为true. 当此标志为true,则消息有效负荷将被转换,并且 Headers 将被映射,其方式类似于使用通道适配器时。 这种安排允许 AMQP 支持的通道与不可序列化的有效负载(可能与其他消息转换器一起使用,例如Jackson2JsonMessageConverter). 有关默认映射 Headers 的更多信息,请参阅 AMQP Message Headers。 您可以通过提供使用outbound-header-mapperinbound-header-mapper属性。 现在,您还可以指定default-delivery-mode,用于设置没有amqp_deliveryMode页眉。 默认情况下, Spring AMQPMessageProperties使用PERSISTENT交付模式。spring-doc.cadn.net.cn

与其他支持持久性的通道一样,AMQP 支持的通道旨在提供消息持久性以避免消息丢失。 它们不用于将工作分发给其他对等应用程序。 为此,请改用通道适配器。
从版本 5.0 开始,pollable 通道现在会阻塞指定receiveTimeout(默认值为 1 秒)。 以前,与其他PollableChannel实现中,如果没有可用的消息,则线程会立即返回到调度程序,而不管接收超时如何。 阻止比使用basicGet()检索消息(无超时),因为必须创建一个使用者来接收每条消息。 要恢复之前的行为,请将 Poller 的receiveTimeout设置为 0。

使用 Java 配置进行配置

以下示例显示如何使用 Java 配置配置通道:spring-doc.cadn.net.cn

@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("foo");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("bar");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("baz");
    factoryBean.setPubSub(false);
    return factoryBean;
}

使用 Java DSL 进行配置

以下示例显示如何使用 Java DSL 配置通道:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.pollableChannel(connectionFactory)
                    .queueName("foo"))
            ...
            .get();
}

@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.channel(connectionFactory)
                    .queueName("bar"))
            ...
            .get();
}

@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.publishSubscribeChannel(connectionFactory)
                    .queueName("baz"))
            ...
            .get();
}