对于最新的稳定版本,请使用 Spring AMQP 3.2.0spring-doc.cn

请求/回复消息

还提供了多种方法,这些方法接受前面描述的单向发送操作(、 和 )的相同参数选项。 这些方法对于请求-答复方案非常有用,因为它们在发送之前处理必要属性的配置,并且可以在内部为此目的创建的独占队列上侦听答复消息。AmqpTemplatesendAndReceiveexchangeroutingKeyMessagereply-tospring-doc.cn

类似的请求-回复方法也可用于同时应用于请求和回复的情况。 这些方法被命名为 . 有关更多详细信息,请参见 AmqpTemplate 的 JavadocMessageConverterconvertSendAndReceivespring-doc.cn

从版本 1.5.0 开始,每个方法变体都有一个重载版本,该版本采用 . 与正确配置的连接工厂一起,这允许接收操作发送端的发布者确认。 有关更多信息,请参见Correlated Publisher Confirms and ReturnsRabbitOperations的 JavadocsendAndReceiveCorrelationDataspring-doc.cn

从版本 2.0 开始,这些方法 () 有一些变体,它们需要一个额外的参数来转换复杂的返回类型。 模板必须配置有 . 有关更多信息,请参阅使用 RabbitTemplate消息转换convertSendAndReceiveAsTypeParameterizedTypeReferenceSmartMessageConverterspring-doc.cn

从版本 2.1 开始,您可以配置 with 选项来控制回复使用者的标志。 这是默认的。RabbitTemplatenoLocalReplyConsumernoLocalfalsespring-doc.cn

回复超时

默认情况下,发送和接收方法会在 5 秒后超时并返回 null。 您可以通过设置该属性来修改此行为。 从版本 1.5 开始,如果将属性设置为(或将特定消息的计算结果设置为),则如果无法将消息传送到队列,则会引发 an。 此异常具有 、 、 和 属性,以及用于发送的 和。replyTimeoutmandatorytruemandatory-expressiontrueAmqpMessageReturnedExceptionreturnedMessagereplyCodereplyTextexchangeroutingKeyspring-doc.cn

此功能使用发布者返回。 您可以通过在 上设置为 来启用它(请参阅 Publisher Confirms and Returns)。 此外,您不得在 .publisherReturnstrueCachingConnectionFactoryReturnCallbackRabbitTemplate

从版本 2.1.2 开始,添加了一个方法,让 subclasses 被通知超时,以便它们可以清理任何保留的状态。replyTimedOutspring-doc.cn

从版本 2.0.11 和 2.1.3 开始,当您使用 default 时,可以通过设置模板的属性来添加错误处理程序。 对于任何失败的投放,如延迟回复和在没有关联标头的情况下收到的消息,都会调用此错误处理程序。 传入的异常是 ,它具有属性。DirectReplyToMessageListenerContainerreplyErrorHandlerListenerExecutionFailedExceptionfailedMessagespring-doc.cn

RabbitMQ 直接回复

从版本 3.4.0 开始,RabbitMQ 服务器支持直接回复。 这消除了固定回复队列的主要原因(以避免需要为每个请求创建临时队列)。 从 Spring AMQP 版本 1.4.1 开始,默认情况下使用直接回复(如果服务器支持)而不是创建临时回复队列。 当 no 未提供(或设置为名称 )时,会自动检测是否支持直接回复,并使用它或回退到使用临时回复队列。 使用直接回复时,a 不是必需的,也不应配置。replyQueueamq.rabbitmq.reply-toRabbitTemplatereply-listener

命名队列(而不是 )仍支持回复侦听器,允许控制回复并发等。amq.rabbitmq.reply-tospring-doc.cn

从版本 1.6 开始,如果您希望为每个 reply 中,将属性设置为 。 如果将 .useTemporaryReplyQueuestruereplyAddressspring-doc.cn

您可以通过 subclassing 和 override 来更改指示是否使用 direct reply-to 的标准,以检查不同的标准。 该方法仅在发送第一个请求时调用一次。RabbitTemplateuseDirectReplyTo()spring-doc.cn

在 2.0 版本之前,为每个请求创建一个新的使用者,并在收到回复(或超时)时取消该使用者。 现在,模板改用 a,让使用者被重用。 该模板仍然负责关联回复,因此不存在延迟回复发送给其他发件人的危险。 如果要恢复到以前的行为,请将 ( when using XML configuration) 属性设置为 false。RabbitTemplateDirectReplyToMessageListenerContaineruseDirectReplyToContainerdirect-reply-to-containerspring-doc.cn

没有这样的选项。 当使用 direct reply-to 时,它总是使用 a for replies 。AsyncRabbitTemplateDirectReplyToContainerspring-doc.cn

从版本 2.3.7 开始,模板具有新属性 。 如果为 ,则服务器不必将相关 ID 从请求消息标头复制到回复消息。 相反,用于发送请求的通道用于将回复与请求相关联。useChannelForCorrelationtruespring-doc.cn

Message Correlation With A Reply Queue

使用固定回复队列(而不是 )时,必须提供关联数据,以便将回复与请求相关联。 请参阅 RabbitMQ 远程过程调用 (RPC)。 默认情况下,standard 属性用于保存关联数据。 但是,如果你希望使用自定义属性来保存相关数据,则可以在<rabbit-template/>上设置该属性。 显式设置 attribute 与省略 attribute 相同。 客户端和服务器必须对关联数据使用相同的标头。amq.rabbitmq.reply-tocorrelationIdcorrelation-keycorrelationIdspring-doc.cn

Spring AMQP 版本 1.1 使用了一个名为此数据的自定义属性。 如果您希望在当前版本中恢复此行为(可能是为了保持与使用 1.1 的其他应用程序的兼容性),则必须将该属性设置为 .spring_reply_correlationspring_reply_correlation

默认情况下,模板会生成自己的相关 ID(忽略任何用户提供的值)。 如果您希望使用自己的关联 ID,请将实例的属性设置为 。RabbitTemplateuserCorrelationIdtruespring-doc.cn

相关 ID 必须是唯一的,以避免为请求返回错误回复的可能性。

回复侦听器容器

当使用 3.4.0 之前的 RabbitMQ 版本时,每个回复都会使用一个新的临时队列。 但是,可以在模板上配置单个回复队列,这样可以更高效,并且还允许您在该队列上设置参数。 但是,在这种情况下,您还必须提供 <reply-listener/> 子元素。 此元素为回复队列提供侦听器容器,模板是侦听器。 元素上允许的所有 <listener-container/> 消息侦听器容器配置属性都允许,但 和 除外,它们是从模板的配置继承的。connection-factorymessage-converterspring-doc.cn

如果您运行应用程序的多个实例或使用多个实例,则必须为每个实例使用唯一的回复队列。 RabbitMQ 无法从队列中选择消息,因此,如果它们都使用相同的队列,则每个实例都将争夺回复,而不必收到自己的回复。RabbitTemplate

下面的示例定义了一个带有连接工厂的 rabbit 模板:spring-doc.cn

<rabbit:template id="amqpTemplate"
        connection-factory="connectionFactory"
        reply-queue="replies"
        reply-address="replyEx/routeReply">
    <rabbit:reply-listener/>
</rabbit:template>

虽然容器和模板共享一个连接工厂,但它们不共享一个通道。 因此,请求和回复不会在同一事务中执行(如果是事务性的)。spring-doc.cn

在版本 1.5.0 之前,该属性不可用。 回复始终使用默认 exchange 和 name 作为路由键进行路由。 这仍然是默认值,但您现在可以指定 new 属性。 该 可以包含具有格式的地址,并且回复将路由到指定的交换,并路由到与路由密钥绑定的队列。 的 优先于 。 当 only 正在使用时,必须将其配置为单独的组件。 and(或 上的属性)必须在逻辑上引用同一队列。reply-addressreply-queuereply-addressreply-address<exchange>/<routingKey>reply-addressreply-queuereply-address<reply-listener><listener-container>reply-addressreply-queuequeues<listener-container>

通过此配置,a 用于接收回复,其中 . 使用 namespace 元素定义模板时,如前面的示例所示,解析器将模板中的容器和连线定义为侦听器。SimpleListenerContainerRabbitTemplateMessageListener<rabbit:template/>spring-doc.cn

当模板不使用固定的(或使用直接回复 — 请参阅 RabbitMQ 直接回复)时,不需要侦听器容器。 Direct 是使用 RabbitMQ 3.4.0 或更高版本时的首选机制。replyQueuereply-to

如果您将 YOUR 定义为 OR 使用类将其定义为 OR 当您以编程方式创建模板时,您需要自己定义和连接回复侦听器容器。 如果您未能执行此操作,则模板永远不会收到回复,最终会超时并返回 null 作为对方法调用的回复。RabbitTemplate<bean/>@Configuration@BeansendAndReceivespring-doc.cn

从版本 1.5 开始,检测是否已 配置为 A 以接收回复。 否则,尝试使用回复地址发送和接收消息 fail 替换为 an (因为从未收到回复)。RabbitTemplateMessageListenerIllegalStateExceptionspring-doc.cn

此外,如果使用 simple (队列名称),则回复侦听器容器会验证它是否正在侦听 添加到具有相同名称的队列中。 如果回复地址是交换和路由密钥,并且写入了调试日志消息,则无法执行此检查。replyAddressspring-doc.cn

自己连接回复侦听器和模板时,请务必确保模板和容器的(或)属性引用同一队列。 该模板将回复地址插入到出站消息属性中。replyAddressqueuesqueueNamesreplyTo

下面的清单显示了如何手动连接 bean 的示例:spring-doc.cn

<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <constructor-arg ref="connectionFactory" />
    <property name="exchange" value="foo.exchange" />
    <property name="routingKey" value="foo" />
    <property name="replyQueue" ref="replyQ" />
    <property name="replyTimeout" value="600000" />
    <property name="useDirectReplyToContainer" value="false" />
</bean>

<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <constructor-arg ref="connectionFactory" />
    <property name="queues" ref="replyQ" />
    <property name="messageListener" ref="amqpTemplate" />
</bean>

<rabbit:queue id="replyQ" name="my.reply.queue" />
    @Bean
    public RabbitTemplate amqpTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(msgConv());
        rabbitTemplate.setReplyAddress(replyQueue().getName());
        rabbitTemplate.setReplyTimeout(60000);
        rabbitTemplate.setUseDirectReplyToContainer(false);
        return rabbitTemplate;
    }

    @Bean
    public SimpleMessageListenerContainer replyListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueues(replyQueue());
        container.setMessageListener(amqpTemplate());
        return container;
    }

    @Bean
    public Queue replyQueue() {
        return new Queue("my.reply.queue");
    }

此测试用例显示了一个具有固定回复队列的 wired 的完整示例,以及处理请求并返回回复的 “remote” 侦听器容器。RabbitTemplatespring-doc.cn

当回复超时 () 时,方法返回 null。replyTimeoutsendAndReceive()

在版本 1.3.6 之前,仅记录超时消息的延迟回复。 现在,如果收到延迟的回复,则会被拒绝(模板会引发 )。 如果回复队列配置为将被拒绝的消息发送到死信交换,则可以检索回复以供以后分析。 为此,请将队列绑定到配置的死信交换,其路由键等于回复队列的名称。AmqpRejectAndDontRequeueExceptionspring-doc.cn

有关配置死信的更多信息,请参阅 RabbitMQ 死信文档。 您还可以查看测试用例作为示例。FixedReplyQueueDeadLetterTestsspring-doc.cn

异步 Rabbit 模板

版本 1.6 引入了 . 这与 AmqpTemplate 上的方法类似(和)方法。 但是,它们不是阻塞,而是返回一个 .AsyncRabbitTemplatesendAndReceiveconvertSendAndReceiveCompletableFuturespring-doc.cn

这些方法返回一个 . 这些方法返回一个 .sendAndReceiveRabbitMessageFutureconvertSendAndReceiveRabbitConverterFuturespring-doc.cn

您可以稍后通过调用 future 来同步检索结果,也可以注册一个与结果异步调用的回调。 下面的清单显示了这两种方法:get()spring-doc.cn

@Autowired
private AsyncRabbitTemplate template;

...

public void doSomeWorkAndGetResultLater() {

    ...

    CompletableFuture<String> future = this.template.convertSendAndReceive("foo");

    // do some more work

    String reply = null;
    try {
        reply = future.get(10, TimeUnit.SECONDS);
    }
    catch (ExecutionException e) {
        ...
    }

    ...

}

public void doSomeWorkAndGetResultAsync() {

    ...

    RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            // success
        }
        else {
            // failure
        }
    });

    ...

}

如果设置了该消息,并且无法传递消息,则 future 会引发一个原因为 ,该消息封装了返回的消息和有关返回的信息。mandatoryExecutionExceptionAmqpMessageReturnedExceptionspring-doc.cn

如果设置,则 future 具有一个名为 的属性,该属性本身是 a,表示发布成功。 如果确认 future 为 ,则具有一个名为 的进一步属性,其中包含失败的原因(如果可用)。enableConfirmsconfirmCompletableFuture<Boolean>truefalseRabbitFuturenackCausespring-doc.cn

如果在回复后收到发布者确认,则会丢弃发布者确认,因为回复意味着发布成功。

您可以在模板上设置该属性以超时回复(默认为 - 30 秒)。 如果发生超时,则 future 将以 .receiveTimeout30000AmqpReplyTimeoutExceptionspring-doc.cn

该模板实现 . 在有待处理回复时停止模板会导致待处理实例被取消。SmartLifecycleFuturespring-doc.cn

从版本 2.0 开始,异步模板现在支持直接回复,而不是配置的回复队列。 要启用此功能,请使用以下构造函数之一:spring-doc.cn

public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)

public AsyncRabbitTemplate(RabbitTemplate template)

请参阅 RabbitMQ Direct reply-to 以将直接回复与同步 一起使用。RabbitTemplatespring-doc.cn

版本 2.0 引入了这些方法 () 的变体,这些方法采用额外的参数来转换复杂的返回类型。 您必须使用 . 有关更多信息,请参阅使用 RabbitTemplate消息转换convertSendAndReceiveAsTypeParameterizedTypeReferenceRabbitTemplateSmartMessageConverterspring-doc.cn

从版本 3.0 开始,这些方法现在返回 s 而不是 s。AsyncRabbitTemplateCompletableFutureListenableFuture

使用 AMQP 的 Spring 远程处理

Spring 远程处理不再受支持,因为该功能已从 Spring Framework 中删除。spring-doc.cn

使用 (client side) 和 代替。sendAndReceiveRabbitTemplate@RabbitListenerspring-doc.cn