此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring AMQP 3.2.5spring-doc.cadn.net.cn

RabbitMQ AMQP 1.0 支持

版本 4.0 引入了spring-rabbitmq-client模块,用于 RabbitMQ 上的 AMQP 1.0 协议支持。spring-doc.cadn.net.cn

此工件基于 com.rabbitmq.client:amqp-client 库,因此只能与 RabbitMQ 及其 AMQP 1.0 协议支持一起使用。 它不能用于任何任意 AMQP 1.0 代理。 为此,到目前为止建议使用 JMS 桥和相应的 Spring JMS 集成。spring-doc.cadn.net.cn

必须将此依赖项添加到项目中,才能与 RabbitMQ AMQP 1.0 支持进行交互:spring-doc.cadn.net.cn

maven
<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbitmq-client</artifactId>
  <version>4.0.0-SNAPSHOT</version>
</dependency>
Gradle
compile 'org.springframework.amqp:spring-rabbitmq-client:4.0.0-SNAPSHOT'

spring-rabbit(对于 AMQP 0.9.1 协议)作为传递依赖项,用于在此新客户端中重用一些通用 API,例如,异常、@RabbitListener支持。 没有必要在目标项目中同时使用这两种功能,但 RabbitMQ 允许 AMQP 0.9.1 和 1.0 共存。spring-doc.cadn.net.cn

有关 RabbitMQ AMQP 1.0 Java 客户端的更多信息,请参阅其文档spring-doc.cadn.net.cn

RabbitMQ AMQP 1.0 环境

com.rabbitmq.client.amqp.Environment是必须添加到项目中以进行连接管理和其他常见设置的第一项内容。 它是一个节点或节点集群的入口点。 该环境允许创建连接。 它可以包含在连接之间共享的与基础设施相关的配置设置,例如线程池、指标和/或观察:spring-doc.cadn.net.cn

@Bean
Environment environment() {
    return new AmqpEnvironmentBuilder()
            .connectionSettings()
            .port(5672)
            .environmentBuilder()
            .build();
}

一样Environmentinstance 可用于连接不同的 RabbitMQ broker,则必须在特定连接上提供连接设置。 见下文。spring-doc.cadn.net.cn

AMQP 连接工厂

org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory引入 abstraction 来管理com.rabbitmq.client.amqp.Connection. 不要将其与org.springframework.amqp.rabbit.connection.ConnectionFactory仅适用于 AMQP 0.9.1 协议。 这SingleAmqpConnectionFactoryimplementation 用于管理一个连接及其设置。 一样Connection可以在许多生产者、消费者和管理层之间共享。 多路复用由 AMQP 客户端库内部的 AMQP 1.0 协议实现的链接抽象处理。 这Connection具有恢复功能,还可以处理拓扑。spring-doc.cadn.net.cn

在大多数情况下,只需将此 bean 添加到项目中即可:spring-doc.cadn.net.cn

@Bean
AmqpConnectionFactory connectionFactory(Environment environment) {
    return new SingleAmqpConnectionFactory(environment);
}

SingleAmqpConnectionFactorysetter 的所有特定于连接的设置。spring-doc.cadn.net.cn

RabbitMQ 拓扑管理

对于从应用程序角度来看的拓扑管理(交换、队列和绑定),使用RabbitAmqpAdmin存在,这是现有AmqpAdmin接口:spring-doc.cadn.net.cn

@Bean
RabbitAmqpAdmin admin(AmqpConnectionFactory connectionFactory) {
    return new RabbitAmqpAdmin(connectionFactory);
}

相同的 bean 定义Exchange,Queue,BindingDeclarables必须使用 Configuring the Broker 中所述的实例来管理拓扑。 这RabbitAdminspring-rabbit也可以这样做,但它发生在 AMQP 0.9.1 连接上,并且由于RabbitAmqpAdmin基于 AMQP 1.0 连接,则从那里顺利处理拓扑恢复以及发布者和使用者恢复。spring-doc.cadn.net.cn

RabbitAmqpAdmin在其start()生命周期回调。 这initialize()以及所有其他 RabbitMQ 实体管理方法,都可以在运行时手动调用。 在内部,RabbitAmqpAdmin使用com.rabbitmq.client.amqp.Connection.management()API 来执行相应的拓扑作。spring-doc.cadn.net.cn

RabbitAmqpTemplate

RabbitAmqpTemplateAsyncAmqpTemplate并使用 AMQP 1.0 协议执行各种发送/接收作。 需要一个AmqpConnectionFactory,并且可以配置一些默认值。 便com.rabbitmq.client:amqp-client库附带一个com.rabbitmq.client.amqp.MessageRabbitAmqpTemplate仍然公开了一个基于众所周知的org.springframework.amqp.core.Message以及所有支持类,如MessagePropertiesMessageConverter抽象化。 转换 to/fromcom.rabbitmq.client.amqp.MessageRabbitAmqpTemplate. 所有方法都返回一个CompletableFuture最终获取作结果。 带有 plain 对象的作需要消息体转换和SimpleMessageConverter默认使用。 有关转换的更多信息,请参阅 消息转换器spring-doc.cadn.net.cn

通常,只需像这样的一个 bean 就足以执行所有可能的模板模式作:spring-doc.cadn.net.cn

@Bean
RabbitAmqpTemplate rabbitTemplate(AmqpConnectionFactory connectionFactory) {
    return new RabbitAmqpTemplate(connectionFactory);
}

它可以配置为某些默认的 exchange 和 routing key 或仅 queue。 这RabbitAmqpTemplate有一个用于接收作的默认队列和另一个用于请求-答复作的默认队列,其中客户端为请求创建临时队列(如果不存在)。spring-doc.cadn.net.cn

以下是一些示例RabbitAmqpTemplate操作:spring-doc.cadn.net.cn

@Bean
DirectExchange e1() {
    return new DirectExchange("e1");
}

@Bean
Queue q1() {
    return QueueBuilder.durable("q1").deadLetterExchange("dlx1").build();
}

@Bean
Binding b1() {
    return BindingBuilder.bind(q1()).to(e1()).with("k1");
}

...

@Test
void defaultExchangeAndRoutingKey() {
    this.rabbitAmqpTemplate.setExchange("e1");
    this.rabbitAmqpTemplate.setRoutingKey("k1");
	this.rabbitAmqpTemplate.setReceiveQueue("q1");

    assertThat(this.rabbitAmqpTemplate.convertAndSend("test1"))
            .succeedsWithin(Duration.ofSeconds(10));

    assertThat(this.rabbitAmqpTemplate.receiveAndConvert())
            .succeedsWithin(Duration.ofSeconds(10))
            .isEqualTo("test1");
}

在这里,我们声明了一个e1交换q1queue,并使用k1路由密钥。 然后我们使用RabbitAmqpTemplate要使用相应的路由密钥将消息发布到上述 Exchange,并使用q1作为接收作的默认队列。 这些方法存在重载的变体,用于发送到特定的 exchange 或 queue(用于发送和接收)。 这receiveAndConvert()作中带有ParameterizedTypeReference<T>需要SmartMessageConverter注入到RabbitAmqpTemplate.spring-doc.cadn.net.cn

下一个示例演示了 RPC 实现RabbitAmqpTemplate(假设 RabbitMQ 对象与上一个示例中相同):spring-doc.cadn.net.cn

@Test
void verifyRpc() {
    String testRequest = "rpc-request";
    String testReply = "rpc-reply";

    CompletableFuture<Object> rpcClientResult = this.template.convertSendAndReceive("e1", "k1", testRequest);

    AtomicReference<String> receivedRequest = new AtomicReference<>();
    CompletableFuture<Boolean> rpcServerResult =
            this.rabbitAmqpTemplate.<String, String>receiveAndReply("q1",
                     payload -> {
                         receivedRequest.set(payload);
                         return testReply;
                     });

    assertThat(rpcServerResult).succeedsWithin(Duration.ofSeconds(10)).isEqualTo(true);
    assertThat(rpcClientResult).succeedsWithin(Duration.ofSeconds(10)).isEqualTo(testReply);
    assertThat(receivedRequest.get()).isEqualTo(testRequest);
}

相关性和replyToqueue 在内部进行管理。 服务器端可以通过@RabbitListenerPOJO 方法。spring-doc.cadn.net.cn

RabbitMQ AMQP 1.0 使用者

与消费者端的许多其他消息传递实现一样,spring-rabbitmq-clientmodules 附带RabbitAmqpListenerContainer这本质上是 well-know 的实现MessageListenerContainer. 它的作用与DirectMessageListenerContainer,但对于 RabbitMQ AMQP 1.0 支持。 需要一个AmqpConnectionFactory以及至少一个队列可供使用。 此外,MessageListener(或 AMQP 1.0 特定RabbitAmqpMessageListener) 必须提供。 可以配置autoSettle = false,其含义为AcknowledgeMode.MANUAL. 在这种情况下,Message提供给MessageListener在其MessagePropertiesAmqpAcknowledgmentcallback 以考虑 Target 逻辑。spring-doc.cadn.net.cn

RabbitAmqpMessageListenercom.rabbitmq.client:amqp-client抽象:spring-doc.cadn.net.cn

/**
 * Process an AMQP message.
 * @param message the message to process.
 * @param context the consumer context to settle message.
 *                Null if container is configured for {@code autoSettle}.
 */
void onAmqpMessage(Message message, Consumer.Context context);

其中第一个参数是本地接收的com.rabbitmq.client.amqp.Messagecontext是消息结算的原生回调,类似上面说的AmqpAcknowledgment抽象化。spring-doc.cadn.net.cn

RabbitAmqpMessageListener可以在以下情况下批量处理和结算消息batchSize选项。 为此,MessageListener.onMessageBatch()合同必须执行。 这batchReceiveDuration选项用于为非完整批次安排强制释放,以避免内存和使用者积分耗尽。spring-doc.cadn.net.cn

通常,RabbitAmqpMessageListenerclass 不直接在目标项目中使用,并且通过@RabbitListener被选中用于声明性使用者配置。 这RabbitAmqpListenerContainerFactory必须在RabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME@RabbitListener注解进程将注册RabbitAmqpMessageListener实例到RabbitListenerEndpointRegistry. 目标 POJO 方法调用由特定的RabbitAmqpMessageListenerAdapter实现,它扩展了MessagingMessageListenerAdapter并重用了它的许多功能,包括请求-回复场景(异步或非异步)。 因此,Annotation-driven Listener Endpoints 中描述的所有概念都适用于此RabbitAmqpMessageListener也。spring-doc.cadn.net.cn

除了传统的消息传递payloadheaders@RabbitListenerPOJO 方法合约可以包含以下参数:spring-doc.cadn.net.cn

  • com.rabbitmq.client.amqp.Message- 没有任何转换的本机 AMQP 1.0 消息;spring-doc.cadn.net.cn

  • org.springframework.amqp.core.Message- Spring AMQP 消息抽象作为本机 AMQP 1.0 消息的转换结果;spring-doc.cadn.net.cn

  • org.springframework.messaging.Message- Spring Messaging 抽象作为 Spring AMQP 消息的转换结果;spring-doc.cadn.net.cn

  • Consumer.Context- RabbitMQ AMQP 客户端消费者结算 API;spring-doc.cadn.net.cn

  • org.springframework.amqp.core.AmqpAcknowledgment- Spring AMQP 确认抽象:委托给Consumer.Context.spring-doc.cadn.net.cn

以下示例演示了一个简单的@RabbitListener对于 RabbitMQ AMQP 1.0 与手动结算的交互:spring-doc.cadn.net.cn

@Bean(RabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME)
RabbitAmqpListenerContainerFactory rabbitAmqpListenerContainerFactory(AmqpConnectionFactory connectionFactory) {
    return new RabbitAmqpListenerContainerFactory(connectionFactory);
}

final List<String> received = Collections.synchronizedList(new ArrayList<>());

CountDownLatch consumeIsDone = new CountDownLatch(11);

@RabbitListener(queues = {"q1", "q2"},
        ackMode = "#{T(org.springframework.amqp.core.AcknowledgeMode).MANUAL}",
        concurrency = "2",
        id = "testAmqpListener")
void processQ1AndQ2Data(String data, AmqpAcknowledgment acknowledgment, Consumer.Context context) {
    try {
        if ("discard".equals(data)) {
            if (!this.received.contains(data)) {
                context.discard();
            }
            else {
                throw new MessageConversionException("Test message is rejected");
            }
        }
        else if ("requeue".equals(data) && !this.received.contains(data)) {
            acknowledgment.acknowledge(AmqpAcknowledgment.Status.REQUEUE);
        }
        else {
            acknowledgment.acknowledge();
        }
        this.received.add(data);
    }
    finally {
        this.consumeIsDone.countDown();
    }
}