与 Spring 框架和相关项目提供的许多其他高级抽象一样,Spring AMQP 提供了一个起核心作用的“模板”。 定义主要操作的接口称为 。 这些操作涵盖发送和接收消息的一般行为。 换言之,它们并非任何实现所独有,因此名称中的“AMQP”就是其中之一。 另一方面,该接口的实现与 AMQP 协议的实现相关联。 与 JMS 不同,JMS 本身就是一个接口级 API,而 AMQP 是一个线级协议。 该协议的实现提供自己的客户端库,因此模板接口的每个实现都依赖于特定的客户端库。 目前,只有一个实现:. 在下面的示例中,我们经常使用 . 但是,当您查看配置示例或实例化模板或调用 setter 的任何代码摘录时,您可以看到实现类型(例如,)。AmqpTemplateRabbitTemplateAmqpTemplateRabbitTemplateSpring中文文档

如前所述,该接口定义了发送和接收消息的所有基本操作。 我们将分别在发送消息接收消息中探讨消息发送和接收。AmqpTemplateSpring中文文档

添加重试功能

从版本 1.3 开始,您现在可以配置 to use a 来帮助处理代理连接问题。 有关完整信息,请参阅 spring-retry 项目。 以下只是一个使用指数回退策略和默认策略的示例,该策略在将异常抛给调用方之前进行三次尝试。RabbitTemplateRetryTemplateSimpleRetryPolicySpring中文文档

下面的示例使用 XML 命名空间:Spring中文文档

<rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="500" />
            <property name="multiplier" value="10.0" />
            <property name="maxInterval" value="10000" />
        </bean>
    </property>
</bean>

以下示例使用 Java 中的注释:@ConfigurationSpring中文文档

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    RetryTemplate retryTemplate = new RetryTemplate();
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(500);
    backOffPolicy.setMultiplier(10.0);
    backOffPolicy.setMaxInterval(10000);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    template.setRetryTemplate(retryTemplate);
    return template;
}

从版本 1.4 开始,除了属性之外,该选项还支持 . 它用作 . 的第二个参数。retryTemplaterecoveryCallbackRabbitTemplateRetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback)Spring中文文档

这在一定程度上是有限制的,因为重试上下文只包含该字段。 对于更复杂的用例,您应该使用外部,以便可以通过上下文的属性向 以下示例演示如何执行此操作:RecoveryCallbacklastThrowableRetryTemplateRecoveryCallback
retryTemplate.execute(
    new RetryCallback<Object, Exception>() {

        @Override
        public Object doWithRetry(RetryContext context) throws Exception {
            context.setAttribute("message", message);
            return rabbitTemplate.convertAndSend(exchange, routingKey, message);
        }

    }, new RecoveryCallback<Object>() {

        @Override
        public Object recover(RetryContext context) throws Exception {
            Object message = context.getAttribute("message");
            Throwable t = context.getLastThrowable();
            // Do something with message
            return null;
        }
    });
}

在这种情况下,您不会将 a 注入 .RetryTemplateRabbitTemplateSpring中文文档

这在一定程度上是有限制的,因为重试上下文只包含该字段。 对于更复杂的用例,您应该使用外部,以便可以通过上下文的属性向 以下示例演示如何执行此操作:RecoveryCallbacklastThrowableRetryTemplateRecoveryCallback

发布是异步的 — 如何检测成功和失败

发布消息是一种异步机制,默认情况下,无法路由的消息由 RabbitMQ 丢弃。 若要成功发布,可以收到异步确认,如相关发布者确认和返回中所述。 考虑两种故障场景:Spring中文文档

第一种情况由发布商退货涵盖,如相关发布商确认和退货中所述。Spring中文文档

对于第二种情况,消息将被丢弃,并且不会生成任何返回。 基础通道关闭,但出现异常。 默认情况下,会记录此异常,但您可以向 注册 以获取此类事件的通知。 以下示例演示如何添加:ChannelListenerCachingConnectionFactoryConnectionListenerSpring中文文档

this.connectionFactory.addConnectionListener(new ConnectionListener() {

    @Override
    public void onCreate(Connection connection) {
    }

    @Override
    public void onShutDown(ShutdownSignalException signal) {
        ...
    }

});

您可以检查信号的属性以确定发生的问题。reasonSpring中文文档

若要在发送线程上检测异常,可以在 上检测到异常。 但是,事务会严重影响性能,因此在仅为此用例启用事务之前,请仔细考虑这一点。setChannelTransacted(true)RabbitTemplatetxCommit()Spring中文文档

相关发布者确认并返回

支持发布者确认和返回的实现。RabbitTemplateAmqpTemplateSpring中文文档

对于返回的消息,模板的属性必须设置为或必须为特定消息计算。 此功能需要将其属性设置为 (请参阅发布者确认和返回) 。 返回通过调用注册客户端发送给客户端。 回调必须实现以下方法:mandatorytruemandatory-expressiontrueCachingConnectionFactorypublisherReturnstrueRabbitTemplate.ReturnsCallbacksetReturnsCallback(ReturnsCallback callback)Spring中文文档

void returnedMessage(ReturnedMessage returned);

具有以下属性:ReturnedMessageSpring中文文档

每个 . 另请参阅回复超时ReturnsCallbackRabbitTemplateSpring中文文档

对于发布者确认(也称为发布者确认),模板需要将其属性设置为 的 。 确认由客户端通过调用 . 回调必须实现以下方法:CachingConnectionFactorypublisherConfirmConfirmType.CORRELATEDRabbitTemplate.ConfirmCallbacksetConfirmCallback(ConfirmCallback callback)Spring中文文档

void confirm(CorrelationData correlationData, boolean ack, String cause);

是客户端在发送原始消息时提供的对象。 an 为 true,a 为 false 。 例如,原因可能包含 的原因,如果它在生成时可用。 例如,向不存在的交易所发送消息时。 在这种情况下,代理关闭通道。 关闭的原因包含在 中。 在版本 1.4 中添加了。CorrelationDataackacknacknacknacknackcausecauseSpring中文文档

只有一个受 .ConfirmCallbackRabbitTemplateSpring中文文档

当兔子模板发送操作完成时,通道将关闭。 当连接工厂缓存已满时(当缓存中有空间时,通道未物理关闭,返回和确认正常进行),这将阻止接收确认或返回。 当缓存已满时,框架会将关闭延迟最多 5 秒,以便有时间接收确认和返回。 使用确认时,当收到最后一次确认时,通道将关闭。 当仅使用返回时,通道将保持打开状态整整 5 秒钟。 我们通常建议将连接工厂的值设置为足够大的值,以便将发布消息的通道返回到缓存中,而不是关闭。 您可以使用 RabbitMQ 管理插件监控通道使用情况。 如果您看到通道快速打开和关闭,则应考虑增加缓存大小以减少服务器上的开销。channelCacheSize
在版本 2.1 之前,为发布者确认启用的频道在收到确认之前会返回到缓存中。 其他一些进程可以签出通道并执行一些导致通道关闭的操作,例如将消息发布到不存在的交换。 这可能会导致确认丢失。 版本 2.1 及更高版本在确认未完成时不再将通道返回到缓存。 每次操作后,在通道上执行逻辑操作。 通常,这意味着一个频道上一次只有一个确认未完成。RabbitTemplateclose()
从版本 2.2 开始,在连接工厂的某个线程上调用回调。 这是为了避免从回调中执行 Rabbit 操作时出现潜在的死锁。 在以前的版本中,回调是直接在连接 I/O 线程上调用的;如果执行某些 RPC 操作(例如打开新通道),则会出现死锁,因为 I/O 线程会阻塞等待结果,但结果需要由 I/O 线程本身处理。 对于这些版本,有必要将工作(例如发送消息)移交给回调中的另一个线程。 这不再是必需的,因为框架现在将回调调用移交给执行器。executoramqp-client
只要返回回调在 60 秒或更短时间内执行,在确认之前接收返回消息的保证仍然保持不变。 确认计划在返回回调退出后或 60 秒后(以先到者为准)交付。

该对象具有可用于获取结果的对象,而不是在模板上使用 。 以下示例演示如何配置实例:CorrelationDataCompletableFutureConfirmCallbackCorrelationDataSpring中文文档

CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());
ReturnedMessage = cd1.getReturn();
...

由于它是一个 ,因此可以在准备就绪时获得结果,也可以用于异步回调。 该对象是具有 2 个属性的简单 Bean:和 (例如实例)。 不会为代理生成的实例填充原因。 它为框架生成的实例填充(例如,在实例未完成时关闭连接)。CompletableFuture<Confirm>get()whenComplete()ConfirmackreasonnacknacknackackSpring中文文档

此外,当同时启用确认和返回时,如果无法将返回的消息路由到任何队列,则该属性将填充该属性。 保证在使用 . 返回 A with 属性:CorrelationDatareturnackCorrelationData.getReturn()ReturnMessageSpring中文文档

另请参阅作用域内操作,了解等待发布者确认的更简单机制。Spring中文文档

当兔子模板发送操作完成时,通道将关闭。 当连接工厂缓存已满时(当缓存中有空间时,通道未物理关闭,返回和确认正常进行),这将阻止接收确认或返回。 当缓存已满时,框架会将关闭延迟最多 5 秒,以便有时间接收确认和返回。 使用确认时,当收到最后一次确认时,通道将关闭。 当仅使用返回时,通道将保持打开状态整整 5 秒钟。 我们通常建议将连接工厂的值设置为足够大的值,以便将发布消息的通道返回到缓存中,而不是关闭。 您可以使用 RabbitMQ 管理插件监控通道使用情况。 如果您看到通道快速打开和关闭,则应考虑增加缓存大小以减少服务器上的开销。channelCacheSize
在版本 2.1 之前,为发布者确认启用的频道在收到确认之前会返回到缓存中。 其他一些进程可以签出通道并执行一些导致通道关闭的操作,例如将消息发布到不存在的交换。 这可能会导致确认丢失。 版本 2.1 及更高版本在确认未完成时不再将通道返回到缓存。 每次操作后,在通道上执行逻辑操作。 通常,这意味着一个频道上一次只有一个确认未完成。RabbitTemplateclose()
从版本 2.2 开始,在连接工厂的某个线程上调用回调。 这是为了避免从回调中执行 Rabbit 操作时出现潜在的死锁。 在以前的版本中,回调是直接在连接 I/O 线程上调用的;如果执行某些 RPC 操作(例如打开新通道),则会出现死锁,因为 I/O 线程会阻塞等待结果,但结果需要由 I/O 线程本身处理。 对于这些版本,有必要将工作(例如发送消息)移交给回调中的另一个线程。 这不再是必需的,因为框架现在将回调调用移交给执行器。executoramqp-client
只要返回回调在 60 秒或更短时间内执行,在确认之前接收返回消息的保证仍然保持不变。 确认计划在返回回调退出后或 60 秒后(以先到者为准)交付。

作用域内操作

通常,使用模板时,会从缓存中签出(或创建),用于操作,并返回到缓存中以供重用。 在多线程环境中,无法保证下一个操作使用相同的通道。 但是,有时您可能希望更好地控制通道的使用,并确保在同一通道上执行许多操作。ChannelSpring中文文档

从版本 2.0 开始,提供了一个名为 . 在回调范围内和提供的参数上执行的任何操作都使用相同的专用 ,该操作将在最后关闭(不会返回到缓存)。 如果通道是 ,则在收到所有确认后,它将返回到缓存中(请参阅相关发布者确认和返回)。invokeOperationsCallbackRabbitOperationsChannelPublisherCallbackChannelSpring中文文档

@FunctionalInterface
public interface OperationsCallback<T> {

    T doInRabbit(RabbitOperations operations);

}

您可能需要此操作的一个示例是,如果您希望在基础 . Spring API 以前没有公开此方法,因为如前所述,通道通常是缓存和共享的。 现在提供 和 ,它们委托给 范围内使用的专用通道。 出于显而易见的原因,这些方法不能在该范围之外使用。waitForConfirms()ChannelRabbitTemplatewaitForConfirms(long timeout)waitForConfirmsOrDie(long timeout)OperationsCallbackSpring中文文档

请注意,允许您将确认与请求相关联的更高级别的抽象在其他位置提供(请参阅关联发布者确认和返回)。 如果您只想等到代理确认交货,则可以使用以下示例中显示的技术:Spring中文文档

Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
    messages.forEach(m -> t.convertAndSend(ROUTE, m));
    t.waitForConfirmsOrDie(10_000);
    return true;
});

如果希望在 的范围内在同一通道上调用操作,则必须使用用于操作的相同通道构造管理员。RabbitAdminOperationsCallbackRabbitTemplateinvokeSpring中文文档

如果模板操作已在现有事务范围内执行,例如,在事务处理侦听器容器线程上运行并在事务处理模板上执行操作时,则上述讨论没有实际意义。 在这种情况下,操作将在该通道上执行,并在线程返回到容器时提交。 在这种情况下没有必要使用。invoke

以这种方式使用确认时,实际上并不需要为将确认与请求相关联而设置的大部分基础结构(除非还启用了返回)。 从版本 2.2 开始,连接工厂支持名为 的新属性。 当此值设置为 时,可以避免基础结构,并且确认处理可以更高效。publisherConfirmTypeConfirmType.SIMPLESpring中文文档

此外,在发送的消息中设置属性。 如果要检查(或记录或以其他方式使用)特定确认,可以使用重载方法执行此操作,如以下示例所示:RabbitTemplatepublisherSequenceNumberMessagePropertiesinvokeSpring中文文档

public <T> T invoke(OperationsCallback<T> action, com.rabbitmq.client.ConfirmCallback acks,
        com.rabbitmq.client.ConfirmCallback nacks);
这些对象(for 和 instances)是 Rabbit 客户端回调,而不是模板回调。ConfirmCallbackacknack

以下示例日志和实例:acknackSpring中文文档

Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
    messages.forEach(m -> t.convertAndSend(ROUTE, m));
    t.waitForConfirmsOrDie(10_000);
    return true;
}, (tag, multiple) -> {
        log.info("Ack: " + tag + ":" + multiple);
}, (tag, multiple) -> {
        log.info("Nack: " + tag + ":" + multiple);
}));
作用域内的操作绑定到线程。 有关多线程环境中的严格排序的讨论,请参阅多线程环境中的严格消息排序
如果模板操作已在现有事务范围内执行,例如,在事务处理侦听器容器线程上运行并在事务处理模板上执行操作时,则上述讨论没有实际意义。 在这种情况下,操作将在该通道上执行,并在线程返回到容器时提交。 在这种情况下没有必要使用。invoke
这些对象(for 和 instances)是 Rabbit 客户端回调,而不是模板回调。ConfirmCallbackacknack
作用域内的操作绑定到线程。 有关多线程环境中的严格排序的讨论,请参阅多线程环境中的严格消息排序

多线程环境中的严格消息排序

仅当在同一线程上执行操作时,作用域内操作中的讨论才适用。Spring中文文档

请考虑以下情况:Spring中文文档

由于 RabbitMQ 的异步特性和缓存通道的使用;不确定是否将使用相同的通道,因此无法保证消息到达队列的顺序。 (在大多数情况下,它们会按顺序到达,但无序交付的概率不为零)。 要解决此用例,您可以使用大小为 (与 一起)的有界通道缓存来确保消息始终在同一通道上发布,并且保证顺序。 为此,如果连接工厂有其他用途(如使用者),则应为模板使用专用连接工厂,或将模板配置为使用嵌入在主连接工厂中的发布者连接工厂(请参阅使用单独的连接)。1channelCheckoutTimeoutSpring中文文档

用一个简单的Spring Boot应用程序来最好地说明这一点:Spring中文文档

@SpringBootApplication
public class Application {

	private static final Logger log = LoggerFactory.getLogger(Application.class);

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Bean
	TaskExecutor exec() {
		ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
		exec.setCorePoolSize(10);
		return exec;
	}

	@Bean
	CachingConnectionFactory ccf() {
		CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
		CachingConnectionFactory publisherCF = (CachingConnectionFactory) ccf.getPublisherConnectionFactory();
		publisherCF.setChannelCacheSize(1);
		publisherCF.setChannelCheckoutTimeout(1000L);
		return ccf;
	}

	@RabbitListener(queues = "queue")
	void listen(String in) {
		log.info(in);
	}

	@Bean
	Queue queue() {
		return new Queue("queue");
	}


	@Bean
	public ApplicationRunner runner(Service service, TaskExecutor exec) {
		return args -> {
			exec.execute(() -> service.mainService("test"));
		};
	}

}

@Component
class Service {

	private static final Logger LOG = LoggerFactory.getLogger(Service.class);

	private final RabbitTemplate template;

	private final TaskExecutor exec;

	Service(RabbitTemplate template, TaskExecutor exec) {
		template.setUsePublisherConnection(true);
		this.template = template;
		this.exec = exec;
	}

	void mainService(String toSend) {
		LOG.info("Publishing from main service");
		this.template.convertAndSend("queue", toSend);
		this.exec.execute(() -> secondaryService(toSend.toUpperCase()));
	}

	void secondaryService(String toSend) {
		LOG.info("Publishing from secondary service");
		this.template.convertAndSend("queue", toSend);
	}

}

即使发布是在两个不同的线程上执行的,它们也将使用相同的通道,因为缓存上限为单个通道。Spring中文文档

从版本 2.3.7 开始,支持使用 和 方法将线程的通道传输到另一个线程。 第一个方法返回一个上下文,该上下文传递给调用第二个方法的第二个线程。 线程可以绑定一个非事务性通道,也可以绑定一个事务性通道(或两者之一);除非使用两个连接工厂,否则无法单独传输它们。 示例如下:ThreadChannelConnectionFactoryprepareContextSwitchswitchContextSpring中文文档

@SpringBootApplication
public class Application {

	private static final Logger log = LoggerFactory.getLogger(Application.class);

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Bean
	TaskExecutor exec() {
		ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
		exec.setCorePoolSize(10);
		return exec;
	}

	@Bean
	ThreadChannelConnectionFactory tccf() {
		ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
		rabbitConnectionFactory.setHost("localhost");
		return new ThreadChannelConnectionFactory(rabbitConnectionFactory);
	}

	@RabbitListener(queues = "queue")
	void listen(String in) {
		log.info(in);
	}

	@Bean
	Queue queue() {
		return new Queue("queue");
	}


	@Bean
	public ApplicationRunner runner(Service service, TaskExecutor exec) {
		return args -> {
			exec.execute(() -> service.mainService("test"));
		};
	}

}

@Component
class Service {

	private static final Logger LOG = LoggerFactory.getLogger(Service.class);

	private final RabbitTemplate template;

	private final TaskExecutor exec;

	private final ThreadChannelConnectionFactory connFactory;

	Service(RabbitTemplate template, TaskExecutor exec,
			ThreadChannelConnectionFactory tccf) {

		this.template = template;
		this.exec = exec;
		this.connFactory = tccf;
	}

	void mainService(String toSend) {
		LOG.info("Publishing from main service");
		this.template.convertAndSend("queue", toSend);
		Object context = this.connFactory.prepareSwitchContext();
		this.exec.execute(() -> secondaryService(toSend.toUpperCase(), context));
	}

	void secondaryService(String toSend, Object threadContext) {
		LOG.info("Publishing from secondary service");
		this.connFactory.switchContext(threadContext);
		this.template.convertAndSend("queue", toSend);
		this.connFactory.closeThreadChannel();
	}

}
调用后,如果当前线程执行任何其他操作,则将在新通道上执行这些操作。 当不再需要线程绑定通道时,关闭它很重要。prepareSwitchContext
调用后,如果当前线程执行任何其他操作,则将在新通道上执行这些操作。 当不再需要线程绑定通道时,关闭它很重要。prepareSwitchContext

消息传递集成

从版本 1.4 开始,(构建在 之上)提供了与 Spring Framework 消息传递抽象的集成,即 . 这允许您使用抽象发送和接收消息。 其他 Spring 项目(如 Spring Integration 和 Spring 的 STOMP 支持)也使用此抽象。 涉及两个消息转换器:一个用于在 spring-messaging 和 Spring AMQP 的抽象之间进行转换,另一个用于在 Spring AMQP 的抽象和底层 RabbitMQ 客户端库所需的格式之间进行转换。 默认情况下,消息负载由提供的实例的消息转换器转换。 或者,您可以将自定义注入一些其他有效负载转换器,如以下示例所示:RabbitMessagingTemplateRabbitTemplateorg.springframework.messaging.Messagespring-messagingMessage<?>Message<?>MessageMessageRabbitTemplateMessagingMessageConverterSpring中文文档

MessagingMessageConverter amqpMessageConverter = new MessagingMessageConverter();
amqpMessageConverter.setPayloadConverter(myPayloadConverter);
rabbitMessagingTemplate.setAmqpMessageConverter(amqpMessageConverter);

已验证的用户 ID

从版本 1.6 开始,模板现在支持(使用 Java 配置时)。 如果发送了消息,则在计算此表达式后设置用户 ID 属性(如果尚未设置)。 评估的根对象是要发送的消息。user-id-expressionuserIdExpressionSpring中文文档

以下示例演示如何使用该属性:user-id-expressionSpring中文文档

<rabbit:template ... user-id-expression="'guest'" />

<rabbit:template ... user-id-expression="@myConnectionFactory.username" />

第一个示例是文字表达式。 第二个从应用程序上下文中的连接工厂 Bean 获取属性。usernameSpring中文文档

使用单独的连接

从版本 2.0.2 开始,可以将该属性设置为尽可能使用与侦听器容器使用的连接不同的连接。 这是为了避免消费者在生产者因任何原因被阻止时被阻止。 为此,连接工厂维护了第二个内部连接工厂;默认情况下,它与主工厂的类型相同,但如果希望使用不同的工厂类型进行发布,则可以显式设置。 如果 rabbit 模板在侦听器容器启动的事务中运行,则无论此设置如何,都将使用容器的通道。usePublisherConnectiontrueSpring中文文档

通常,不应将 a 与此设置为 的模板一起使用 。 使用采用连接工厂的构造函数。 如果使用采用模板的其他构造函数,请确保模板的属性为 。 这是因为,管理员通常用于声明侦听器容器的队列。 使用属性设置为的模板意味着将在与侦听器容器使用的连接不同的连接上声明独占队列(如 )。 在这种情况下,容器不能使用队列。RabbitAdmintrueRabbitAdminfalsetrueAnonymousQueue
通常,不应将 a 与此设置为 的模板一起使用 。 使用采用连接工厂的构造函数。 如果使用采用模板的其他构造函数,请确保模板的属性为 。 这是因为,管理员通常用于声明侦听器容器的队列。 使用属性设置为的模板意味着将在与侦听器容器使用的连接不同的连接上声明独占队列(如 )。 在这种情况下,容器不能使用队列。RabbitAdmintrueRabbitAdminfalsetrueAnonymousQueue