此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1Spring中文文档

此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1Spring中文文档

有两种消息通道实现可用。 一种是点对点的,另一种是发布-订阅。 这两个通道都为基础通道提供了广泛的配置属性(如本章前面所示的通道适配器和网关)。 但是,我们在这里显示的示例具有最小的配置。 浏览 XML 架构以查看可用属性。AmqpTemplateSimpleMessageListenerContainerSpring中文文档

点对点通道可能如以下示例所示:Spring中文文档

<int-amqp:channel id="p2pChannel"/>

在幕后,前面的示例导致声明一个 name,并且此通道发送到该通道(从技术上讲,通过使用与此名称匹配的路由密钥发送到 no-name 直接交换)。 该频道还注册了该消费者。 如果希望通道是“可轮询的”而不是消息驱动的,请为标志提供值为 ,如以下示例所示:Queuesi.p2pChannelQueueQueueQueuemessage-drivenfalseSpring中文文档

<int-amqp:channel id="p2pPollableChannel"  message-driven="false"/>

发布-订阅频道可能如下所示:Spring中文文档

<int-amqp:publish-subscribe-channel id="pubSubChannel"/>

在后台,前面的示例导致声明一个名为 fanout 交换,并且此通道发送到该扇出交换。 此通道还声明了服务器命名的独占、自动删除、非持久性,并将其绑定到扇出交换,同时在其上注册使用者以接收消息。 发布-订阅-频道没有“可轮询”选项。 它必须是消息驱动的。si.fanout.pubSubChannelQueueQueueSpring中文文档

从版本 4.1 开始,AMQP 支持的消息通道(与 一起)支持对 和 对于 . 请注意,以前是默认的。 现在,默认情况下,它用于 .channel-transactedtemplate-channel-transactedtransactionalAbstractMessageListenerContainerRabbitTemplatechannel-transactedtruefalseAbstractMessageListenerContainerSpring中文文档

在版本 4.3 之前,AMQP 支持的通道仅支持包含有效负载和标头的消息。 整个消息被转换(序列化)并发送到 RabbitMQ。 现在,您可以将属性(或使用 Java 配置时)设置为 。 当此标志为 时,将转换消息有效负载并映射标头,其方式与使用通道适配器时类似。 这种安排允许 AMQP 支持的通道与不可序列化的有效负载一起使用(可能与另一个消息转换器一起使用,例如 )。 有关默认映射标头的详细信息,请参阅 AMQP 消息标头。 您可以通过提供使用 and 属性的自定义映射器来修改映射。 现在,您还可以指定 ,用于在没有标头时设置传递模式。 默认情况下,Spring AMQP 使用交付模式。Serializableextract-payloadsetExtractPayload()truetrueJackson2JsonMessageConverteroutbound-header-mapperinbound-header-mapperdefault-delivery-modeamqp_deliveryModeMessagePropertiesPERSISTENTSpring中文文档

与其他持久性支持的通道一样,AMQP 支持的通道旨在提供消息持久性以避免消息丢失。 它们不打算将工作分发给其他对等应用程序。 为此,请改用通道适配器。
从版本 5.0 开始,可轮询通道现在阻止指定的轮询器线程(默认值为 1 秒)。 以前,与其他实现不同,如果没有可用的消息,线程会立即返回到调度程序,而不管接收超时如何。 阻塞比使用 a 检索消息(没有超时)要贵一些,因为必须创建一个使用者才能接收每条消息。 若要还原以前的行为,请将轮询器设置为 0。receiveTimeoutPollableChannelbasicGet()receiveTimeout
与其他持久性支持的通道一样,AMQP 支持的通道旨在提供消息持久性以避免消息丢失。 它们不打算将工作分发给其他对等应用程序。 为此,请改用通道适配器。
从版本 5.0 开始,可轮询通道现在阻止指定的轮询器线程(默认值为 1 秒)。 以前,与其他实现不同,如果没有可用的消息,线程会立即返回到调度程序,而不管接收超时如何。 阻塞比使用 a 检索消息(没有超时)要贵一些,因为必须创建一个使用者才能接收每条消息。 若要还原以前的行为,请将轮询器设置为 0。receiveTimeoutPollableChannelbasicGet()receiveTimeout

使用 Java 配置进行配置

以下示例演示如何使用 Java 配置配置通道:Spring中文文档

@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("foo");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("bar");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("baz");
    factoryBean.setPubSub(false);
    return factoryBean;
}

使用 Java DSL 进行配置

以下示例演示如何使用 Java DSL 配置通道:Spring中文文档

@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.pollableChannel(connectionFactory)
                    .queueName("foo"))
            ...
            .get();
}

@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.channel(connectionFactory)
                    .queueName("bar"))
            ...
            .get();
}

@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.publishSubscribeChannel(connectionFactory)
                    .queueName("baz"))
            ...
            .get();
}