4. 参考资料
4.1. 使用 Spring AMQP
本章探讨了使用 Spring AMQP 开发应用程序的基本组件的接口和类。
4.1.1. AMQP 抽象
Spring AMQP 由两个模块组成(每个模块在发行版中由一个 JAR 表示):spring-amqp
和spring-rabbit
.
'spring-amqp'模块包含org.springframework.amqp.core
包。
在该包中,您可以找到表示核心 AMQP“模型”的类。
我们的目的是提供不依赖于任何特定 AMQP 代理实现或客户端库的通用抽象。
最终用户代码可以更易于跨提供商实现,因为它只能针对抽象层进行开发。
然后,这些抽象由特定于代理的模块实现,例如“spring-rabbit”。
目前只有一个 RabbitMQ 实现。
但是,除了 RabbitMQ 之外,还使用 Apache Qpid 在 .NET 中验证了抽象。
由于 AMQP 在协议层面运行,原则上你可以将 RabbitMQ 客户端与任何支持相同协议版本的 Broker 一起使用,但我们目前不测试任何其他 Broker。
本概述假定您已经熟悉 AMQP 规范的基础知识。 如果没有,请查看其他资源中列出的资源
Message
0-9-1 AMQP 规范未定义Message
类或接口。
相反,当执行basicPublish()
,内容作为字节数组参数传递,其他属性作为单独的参数传递。
Spring AMQP 定义了一个Message
类作为更通用的 AMQP 域模型表示形式的一部分。
的目的Message
class 是将主体和属性封装在单个实例中,以便 API 反过来可以更简单。
以下示例显示了Message
类定义:
public class Message {
private final MessageProperties messageProperties;
private final byte[] body;
public Message(byte[] body, MessageProperties messageProperties) {
this.body = body;
this.messageProperties = messageProperties;
}
public byte[] getBody() {
return this.body;
}
public MessageProperties getMessageProperties() {
return this.messageProperties;
}
}
这MessageProperties
interface 定义了几个常见属性,例如 'messageId'、'timestamp'、'contentType' 等。
您还可以通过调用setHeader(String key, Object value)
方法。
从版本开始1.5.7 ,1.6.11 ,1.7.4 和2.0.0 ,如果消息正文是序列化的Serializable java 对象,则在执行toString() 作(例如在日志消息中)。
这是为了防止不安全的反序列化。
默认情况下,只有java.util 和java.lang 类被反序列化。
要恢复到以前的行为,您可以通过调用Message.addAllowedListPatterns(…) .
一个简单的 支持通配符,例如com.something. , *.MyClass .
无法反序列化的主体表示为byte[<size>] 在日志消息中。 |
交换
这Exchange
接口表示 AMQP 交换,这是消息生产者发送到的交换。
代理虚拟主机中的每个 Exchange 都有一个唯一的名称以及一些其他属性。
以下示例显示了Exchange
接口:
public interface Exchange {
String getName();
String getExchangeType();
boolean isDurable();
boolean isAutoDelete();
Map<String, Object> getArguments();
}
如您所见,一个Exchange
也有一个“类型”,由定义在ExchangeTypes
.
基本类型包括:direct
,topic
,fanout
和headers
.
在核心包中,您可以找到Exchange
接口。
这些行为各不相同Exchange
类型如何处理与队列的绑定。
例如,一个Direct
Exchange 允许队列由固定路由键(通常是队列的名称)绑定。
一个Topic
Exchange 支持具有路由模式的绑定,这些模式可能分别包括“恰好一”和“零或更多”的“*”和“#”通配符。
这Fanout
Exchange 发布到绑定到它的所有队列,而不考虑任何路由密钥。
有关这些和其他 Exchange 类型的详细信息,请参阅其他资源。
AMQP 规范还要求任何代理提供没有名称的“默认”直接交换。
所有声明的队列都绑定到该默认值Exchange 使用它们的名称作为路由键。
您可以在 Spring AMQP 中了解有关默认 Exchange 用法的更多信息,请访问AmqpTemplate . |
队列
这Queue
class 表示消息使用者从中接收消息的组件。
像各种Exchange
classes,我们的实现旨在作为此核心 AMQP 类型的抽象表示。
以下列表显示了Queue
类:
public class Queue {
private final String name;
private volatile boolean durable;
private volatile boolean exclusive;
private volatile boolean autoDelete;
private volatile Map<String, Object> arguments;
/**
* The queue is durable, non-exclusive and non auto-delete.
*
* @param name the name of the queue.
*/
public Queue(String name) {
this(name, true, false, false);
}
// Getters and Setters omitted for brevity
}
请注意,构造函数采用队列名称。 根据实现,管理模板可能会提供用于生成唯一命名队列的方法。 此类队列可用作“回复”地址或其他临时情况。 因此,自动生成队列的“exclusive”和“autoDelete”属性都将设置为“true”。
有关使用命名空间支持声明队列的信息,请参阅配置代理中有关队列的部分,包括队列参数。 |
捆绑
鉴于生产者向交换发送,而使用者从队列接收,将队列连接到交换的绑定对于通过消息传递连接这些生产者和使用者至关重要。
在 Spring AMQP 中,我们定义了一个Binding
类来表示这些连接。
本节回顾了将队列绑定到交换的基本选项。
您可以将队列绑定到DirectExchange
使用固定路由键,如以下示例所示:
new Binding(someQueue, someDirectExchange, "foo.bar");
您可以将队列绑定到TopicExchange
使用路由模式,如以下示例所示:
new Binding(someQueue, someTopicExchange, "foo.*");
您可以将队列绑定到FanoutExchange
没有路由密钥,如以下示例所示:
new Binding(someQueue, someFanoutExchange);
我们还提供BindingBuilder
以促进“流畅的 API”样式,如以下示例所示:
Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");
为清楚起见,前面的示例显示了BindingBuilder 类,但这种样式在对 'bind()' 方法使用静态导入时效果很好。 |
就其本身而言,一个实例Binding
class 仅保存有关连接的数据。
换句话说,它不是一个“主动”组件。
但是,正如您稍后将在配置代理中看到的那样,AmqpAdmin
类可以使用Binding
实例以实际触发代理上的绑定作。
此外,正如您在同一部分中看到的,您可以定义Binding
实例,使用 Spring 的@Bean
注释@Configuration
类。
还有一个方便的基类,它进一步简化了生成与 AMQP 相关的 bean 定义的方法,并识别队列、交换和绑定,以便在应用程序启动时在 AMQP 代理上声明它们。
这AmqpTemplate
也在核心包中定义。
作为实际 AMQP 消息传递中涉及的主要组件之一,它在自己的部分中进行了详细讨论(请参阅AmqpTemplate
).
4.1.2. 连接和资源管理
虽然我们在上一节中描述的 AMQP 模型是通用的,适用于所有实现,但当我们进入资源管理时,详细信息特定于代理实现。 因此,在本节中,我们将重点介绍仅存在于“spring-rabbit”模块中的代码,因为此时,RabbitMQ 是唯一受支持的实现。
管理与 RabbitMQ 代理的连接的核心组件是ConnectionFactory
接口。
的责任ConnectionFactory
实现是提供org.springframework.amqp.rabbit.connection.Connection
,它是com.rabbitmq.client.Connection
.
选择连接工厂
有三个连接工厂可供选择
-
PooledChannelConnectionFactory
-
ThreadChannelConnectionFactory
-
CachingConnectionFactory
前两个是在 2.3 版本中添加的。
对于大多数用例,CachingConnectionFactory
应该使用。
这ThreadChannelConnectionFactory
如果要确保严格的消息排序而无需使用作用域作,则可以使用。
这PooledChannelConnectionFactory
类似于CachingConnectionFactory
因为它使用单个连接和一个通道池。它的实现更简单,但不支持相关的发布者确认。
所有三个工厂都支持简单的发布者确认。
配置RabbitTemplate
若要使用单独的连接,现在可以从版本 2.3.2 开始,将发布连接工厂配置为不同的类型。默认情况下,发布工厂是相同的类型,并且在主工厂上设置的任何属性也会传播到发布工厂。
PooledChannelConnectionFactory
该工厂基于 Apache Pool2 管理单个连接和两个通道池。一个池用于事务通道,另一个用于非事务通道。这些池是GenericObjectPool
s 具有默认配置;提供回调来配置池;有关更多信息,请参阅 Apache 文档。
阿帕奇commons-pool2
jar 必须位于类路径上才能使用此工厂。
@Bean
PooledChannelConnectionFactory pcf() throws Exception {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
pcf.setPoolConfigurer((pool, tx) -> {
if (tx) {
// configure the transactional pool
}
else {
// configure the non-transactional pool
}
});
return pcf;
}
ThreadChannelConnectionFactory
该工厂管理一个连接和两个ThreadLocal
s,一个用于事务性通道,另一个用于非事务性通道。
该工厂确保同一线程上的所有作都使用相同的通道(只要它保持打开状态)。
这有助于严格的消息排序,而无需作用域作。
为避免内存泄漏,如果您的应用程序使用许多短期线程,则必须调用工厂的closeThreadChannel()
以释放通道资源。
从 2.3.7 版开始,一个线程可以将其通道转移到另一个线程。
有关更多信息,请参阅多线程环境中的严格消息排序。
CachingConnectionFactory
提供的第三个实现是CachingConnectionFactory
,默认情况下,它建立一个可以由应用程序共享的单个连接代理。共享连接是可能的,因为使用 AMQP 进行消息传递的“工作单元”实际上是一个“通道”(在某些方面,这类似于 JMS 中连接和会话之间的关系)。连接实例提供了一个createChannel
方法。 这CachingConnectionFactory
实现支持缓存这些通道,并根据通道是否为事务性通道维护单独的缓存。创建CachingConnectionFactory
,您可以通过构造函数提供“hostname”。您还应该提供“username”和“password”属性。要配置通道缓存的大小(默认值为 25),您可以调用setChannelCacheSize()
方法。
从 1.3 版开始,您可以配置CachingConnectionFactory
以缓存连接以及仅缓存通道。在这种情况下,每次调用createConnection()
创建一个新连接(或从缓存中检索空闲连接)。关闭连接会将其返回到缓存(如果尚未达到缓存大小)。在此类连接上创建的通道也会被缓存。在某些环境中,使用单独的连接可能很有用,例如从 HA 集群使用,在与负载均衡器结合使用,连接到不同的集群成员等。要缓存连接,请将cacheMode
自CacheMode.CONNECTION
.
这不限制连接数。相反,它指定允许多少个空闲打开的连接。 |
从 1.5.5 版本开始,一个名为connectionLimit
。设置此属性后,它会限制允许的连接总数。设置后,如果达到限制,则channelCheckoutTimeLimit
用于等待连接空闲。如果超过时间,则AmqpTimeoutException
被抛出。
当缓存模式为 此外,在撰写本文时, |
重要的是要了解缓存大小(默认情况下)不是限制,而只是可以缓存的通道数。缓存大小为 10 时,实际上可以使用任意数量的通道。如果使用超过 10 个通道并且它们都返回到缓存中,则 10 个通道进入缓存。其余部分物理关闭。
从 1.6 版开始,默认通道缓存大小已从 1 增加到 25。在高容量、多线程环境中,较小的缓存意味着通道的创建和关闭速度很高。增加默认缓存大小可以避免这种开销。您应该通过 RabbitMQ 管理 UI 监控正在使用的通道,如果您看到许多通道正在创建和关闭。缓存仅按需增长(以满足应用程序的并发要求),因此此更改不会影响现有的低容量应用程序。
从 1.4.2 版本开始,CachingConnectionFactory
有一个名为channelCheckoutTimeout
. 当此属性大于零时,channelCacheSize
成为对可以在连接上创建的通道数的限制。如果达到限制,则调用线程将阻塞,直到通道可用或达到此超时,在这种情况下,AmqpTimeoutException
被抛出。
框架内使用的通道(例如RabbitTemplate )可靠地返回到缓存。如果您在框架外部创建通道(例如,通过直接访问连接并调用createChannel() ),您必须可靠地返回它们(通过关闭),也许在finally 块,以避免通道耗尽。 |
以下示例演示如何创建新的connection
:
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.createConnection();
使用 XML 时,配置可能类似于以下示例:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
</bean>
还有一个SingleConnectionFactory 仅在框架的单元测试代码中可用的实现。它比CachingConnectionFactory ,因为它不缓存通道,但由于缺乏性能和弹性,它不适合简单测试之外的实际使用。如果您需要实现自己的ConnectionFactory 出于某种原因,AbstractConnectionFactory 基类可能提供一个很好的起点。 |
一个ConnectionFactory
可以使用 rabbit 命名空间快速方便地创建,如下所示:
<rabbit:connection-factory id="connectionFactory"/>
在大多数情况下,这种方法更可取,因为框架可以为您选择最佳默认值。
创建的实例是CachingConnectionFactory
.
请记住,通道的默认缓存大小为 25。
如果要缓存更多通道,请通过设置 'channelCacheSize' 属性来设置更大的值。
在 XML 中,它看起来如下:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="50"/>
</bean>
此外,使用命名空间,您可以添加 'channel-cache-size' 属性,如下所示:
<rabbit:connection-factory
id="connectionFactory" channel-cache-size="50"/>
默认缓存模式为CHANNEL
,但您可以将其配置为缓存连接。
在下面的示例中,我们使用connection-cache-size
:
<rabbit:connection-factory
id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>
您可以使用命名空间提供主机和端口属性,如下所示:
<rabbit:connection-factory
id="connectionFactory" host="somehost" port="5672"/>
或者,如果在集群环境中运行,则可以使用 addresses 属性,如下所示:
<rabbit:connection-factory
id="connectionFactory" addresses="host1:5672,host2:5672" address-shuffle-mode="RANDOM"/>
有关以下信息,请参阅连接到集群address-shuffle-mode
.
以下示例包含一个自定义线程工厂,该工厂在线程名称前加上rabbitmq-
:
<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567"
thread-factory="tf"
channel-cache-size="10" username="user" password="password" />
<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
<constructor-arg value="rabbitmq-" />
</bean>
命名连接
从 1.7 版开始,一个ConnectionNameStrategy
提供注入AbstractionConnectionFactory
.
生成的名称用于目标 RabbitMQ 连接的特定于应用程序的标识。
如果 RabbitMQ 服务器支持连接名称,则会在管理 UI 中显示连接名称。
此值不必是唯一的,也不能用作连接标识符,例如,在 HTTP API 请求中。
这个值应该是人类可读的,并且是ClientProperties
在connection_name
钥匙。
您可以使用简单的 Lambda,如下所示:
connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");
这ConnectionFactory
参数可用于通过某些逻辑区分目标连接名称。
默认情况下,beanName
的AbstractConnectionFactory
、表示对象的十六进制字符串和内部计数器用于生成connection_name
.
这<rabbit:connection-factory>
命名空间组件也随connection-name-strategy
属性。
实现SimplePropertyValueConnectionNameStrategy
将连接名称设置为应用程序属性。
您可以将其声明为@Bean
并将其注入到连接工厂中,如以下示例所示:
@Bean
public SimplePropertyValueConnectionNameStrategy cns() {
return new SimplePropertyValueConnectionNameStrategy("spring.application.name");
}
@Bean
public ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
...
connectionFactory.setConnectionNameStrategy(cns);
return connectionFactory;
}
该属性必须存在于应用程序上下文的Environment
.
使用 Spring Boot 及其自动配置的连接工厂时,只需声明ConnectionNameStrategy @Bean .
Boot 自动检测 bean 并将其连接到工厂。 |
阻止的连接和资源约束
对于与内存警报相对应的代理的交互,可能会阻止连接。
从 2.0 版开始,org.springframework.amqp.rabbit.connection.Connection
可提供com.rabbitmq.client.BlockedListener
要通知连接阻止和取消阻止事件的实例。
此外,AbstractConnectionFactory
发出ConnectionBlockedEvent
和ConnectionUnblockedEvent
,分别通过其内部BlockedListener
实现。
这些允许您提供应用程序逻辑以对代理上的问题做出适当的反应,并(例如)采取一些纠正措施。
当应用程序配置了单个CachingConnectionFactory ,就像默认情况下使用 Spring Boot 自动配置一样,当连接被 Broker 阻止时,应用程序将停止工作。
当它被经纪人阻止时,它的任何客户端都会停止工作。
如果我们在同一个应用程序中有生产者和消费者,那么当生产者阻止连接(因为 Broker 上不再有资源)并且消费者无法释放它们(因为连接被阻止)时,我们最终可能会陷入死锁。
为了缓解这个问题,我们建议再有一个单独的CachingConnectionFactory 实例具有相同的选项 - 一个用于生产者,一个用于消费者。一个单独的CachingConnectionFactory 对于在使用者线程上执行的事务生产者来说是不可能的,因为它们应该重用Channel 与消费者事务相关联。 |
从 2.0.2 版本开始,RabbitTemplate
具有自动使用第二个连接工厂的配置选项,除非正在使用事务。有关更多信息,请参阅使用单独的连接。 这ConnectionNameStrategy
对于发布者,连接与主策略相同,使用.publisher
附加到调用方法的结果。
从 1.7.7 版本开始,AmqpResourceNotAvailableException
提供,当SimpleConnection.createChannel()
无法创建Channel
(例如,因为channelMax
达到限制,并且缓存中没有可用通道)。您可以在RetryPolicy
在一些回退后恢复作。
配置底层客户端连接工厂
这CachingConnectionFactory
使用 Rabbit 客户端的实例ConnectionFactory
. 许多配置属性将传递 (host
,port
,userName
,password
,requestedHeartBeat
和connectionTimeout
例如)在CachingConnectionFactory
. 要设置其他属性 (clientProperties
,例如),您可以定义 Rabbit 工厂的实例,并使用CachingConnectionFactory
. 使用命名空间时(如前所述),您需要在connection-factory
属性。 为方便起见,提供了一个工厂 Bean 来帮助在 Spring 应用程序上下文中配置连接工厂,如下一节所述。
<rabbit:connection-factory
id="connectionFactory" connection-factory="rabbitConnectionFactory"/>
4.0.x 客户端默认启用自动恢复。虽然与此功能兼容,但 Spring AMQP 有自己的恢复机制,通常不需要客户端恢复功能。我们建议禁用amqp-client 自动恢复,避免获得AutoRecoverConnectionNotCurrentlyOpenException 代理可用但连接尚未恢复时。您可能会注意到此异常,例如,当RetryTemplate 在RabbitTemplate ,即使在故障转移到集群中的另一个代理时也是如此。由于自动恢复连接在计时器上恢复,因此使用 Spring AMQP 的恢复机制可以更快地恢复连接。从版本 1.7.1 开始,Spring AMQP 禁用amqp-client 自动恢复,除非您显式创建自己的 RabbitMQ 连接工厂并将其提供给CachingConnectionFactory .
兔子MQConnectionFactory 由RabbitConnectionFactoryBean 默认情况下,还禁用该选项。 |
RabbitConnectionFactoryBean
和配置 SSL
从 1.4 版本开始,一个方便的RabbitConnectionFactoryBean
是为了使用依赖注入在基础客户端连接工厂上方便地配置 SSL 属性。其他 setter 委托给底层工厂。以前,您必须以编程方式配置 SSL 选项。以下示例演示如何配置RabbitConnectionFactoryBean
:
@Bean
RabbitConnectionFactoryBean rabbitConnectionFactory() {
RabbitConnectionFactoryBean factoryBean = new RabbitConnectionFactoryBean();
factoryBean.setUseSSL(true);
factoryBean.setSslPropertiesLocation(new ClassPathResource("secrets/rabbitSSL.properties"));
return factoryBean;
}
@Bean
CachingConnectionFactory connectionFactory(ConnectionFactory rabbitConnectionFactory) {
CachingConnectionFactory ccf = new CachingConnectionFactory(rabbitConnectionFactory);
ccf.setHost("...");
// ...
return ccf;
}
spring.rabbitmq.ssl.enabled:true
spring.rabbitmq.ssl.keyStore=...
spring.rabbitmq.ssl.keyStoreType=jks
spring.rabbitmq.ssl.keyStorePassword=...
spring.rabbitmq.ssl.trustStore=...
spring.rabbitmq.ssl.trustStoreType=jks
spring.rabbitmq.ssl.trustStorePassword=...
spring.rabbitmq.host=...
...
<rabbit:connection-factory id="rabbitConnectionFactory"
connection-factory="clientConnectionFactory"
host="${host}"
port="${port}"
virtual-host="${vhost}"
username="${username}" password="${password}" />
<bean id="clientConnectionFactory"
class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
<property name="useSSL" value="true" />
<property name="sslPropertiesLocation" value="classpath:secrets/rabbitSSL.properties"/>
</bean>
有关配置 SSL 的信息,请参阅 RabbitMQ 文档。省略keyStore
和trustStore
配置为无需证书验证即可通过 SSL 进行连接。
下一个示例演示如何提供密钥和信任存储配置。
这sslPropertiesLocation
属性是弹簧Resource
指向包含以下键的属性文件:
keyStore=file:/secret/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secret
这keyStore
和truststore
是SpringResources
指向商店。
通常,此属性文件由具有读取访问权限的应用程序的作系统保护。
从 Spring AMQP 版本 1.5 开始,您可以直接在工厂 Bean 上设置这些属性。
如果离散属性和sslPropertiesLocation
,后者的属性会覆盖
离散值。
从 2.0 版开始,默认情况下会验证服务器证书,因为它更安全。
如果您出于某种原因希望跳过此验证,请将工厂 Bean 的skipServerCertificateValidation 属性设置为true .
从 2.1 版开始,RabbitConnectionFactoryBean 现在调用enableHostnameVerification() 默认情况下。
要恢复到以前的行为,请将enableHostnameVerification 属性设置为false . |
从 2.2.5 版开始,默认情况下,工厂 Bean 将始终使用 TLS v1.2;以前,它在某些情况下使用 v1.1,在其他情况下使用 v1.2(取决于其他属性)。
如果出于某种原因需要使用 v1.1,请将sslAlgorithm 财产:setSslAlgorithm("TLSv1.1") . |
连接到集群
要连接到集群,请配置addresses
属性CachingConnectionFactory
:
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
return ccf;
}
从 3.0 版开始,每当建立新连接时,底层连接工厂将尝试通过选择随机地址来连接到主机。
要恢复到之前尝试从第一个到最后一个连接的行为,请将addressShuffleMode
属性设置为AddressShuffleMode.NONE
.
从 2.3 版本开始,INORDER
添加了随机播放模式,这意味着在创建连接后,第一个地址将移动到末尾。
您可能希望将此模式与 RabbitMQ 分片插件一起使用,并使用CacheMode.CONNECTION
如果您希望从所有节点上的所有分片中消费,则可以使用合适的并发性。
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
ccf.setAddressShuffleMode(AddressShuffleMode.INORDER);
return ccf;
}
工艺路线连接工厂
从 1.3 版本开始,AbstractRoutingConnectionFactory
已被引入。
该工厂提供了一种机制来配置多个映射ConnectionFactories
并确定目标ConnectionFactory
由一些人lookupKey
在运行时。
通常,实现会检查线程绑定的上下文。
为方便起见,Spring AMQP 提供了SimpleRoutingConnectionFactory
,它获取当前线程绑定的lookupKey
从SimpleResourceHolder
.
以下示例演示如何配置SimpleRoutingConnectionFactory
在 XML 和 Java 中:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
<property name="targetConnectionFactories">
<map>
<entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
<entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
</map>
</property>
</bean>
<rabbit:template id="template" connection-factory="connectionFactory" />
public class MyService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void service(String vHost, String payload) {
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
rabbitTemplate.convertAndSend(payload);
SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
}
}
使用后解绑资源很重要。
有关更多信息,请参阅 JavaDocAbstractRoutingConnectionFactory
.
从 1.4 版本开始,RabbitTemplate
支持 SpELsendConnectionFactorySelectorExpression
和receiveConnectionFactorySelectorExpression
属性,这些属性在每个 AMQP 协议交互作 (send
,sendAndReceive
,receive
或receiveAndReply
),解析为lookupKey
提供的值AbstractRoutingConnectionFactory
.
您可以使用 bean 引用,例如@vHostResolver.getVHost(#root)
在表达式中。
为send
作时,要发送的消息是根求值对象。
为receive
作,queueName
是根评估对象。
路由算法如下:如果选择器表达式为null
或被评估为null
或提供的ConnectionFactory
不是AbstractRoutingConnectionFactory
,一切都像以前一样工作,依赖于提供的ConnectionFactory
实现。
如果评估结果不是null
,但没有目标ConnectionFactory
为此lookupKey
和AbstractRoutingConnectionFactory
配置为lenientFallback = true
.
在AbstractRoutingConnectionFactory
,它确实回退到其routing
基于determineCurrentLookupKey()
.
但是,如果lenientFallback = false
一IllegalStateException
被抛出。
命名空间支持还提供send-connection-factory-selector-expression
和receive-connection-factory-selector-expression
属性<rabbit:template>
元件。
此外,从版本 1.4 开始,您可以在侦听器容器中配置路由连接工厂。
在这种情况下,队列名称列表将用作查找键。
例如,如果将容器配置为setQueueNames("thing1", "thing2")
,查找键是[thing1,thing]"
(请注意,键中没有空格)。
从版本 1.6.9 开始,您可以使用以下命令向查找键添加限定符setLookupKeyQualifier
在侦听器容器上。
例如,这样做可以侦听具有相同名称但在不同虚拟主机中的队列(每个虚拟主机都有一个连接工厂)。
例如,使用 lookup 键限定符thing1
以及一个监听队列的容器thing2
,您可以注册目标连接工厂的查找键可以是thing1[thing2]
.
目标(和默认值,如果提供)连接工厂必须具有相同的发布者确认和返回设置。 请参阅发布商确认和退货。 |
从版本 2.4.4 开始,可以禁用此验证。如果您遇到确认和返回之间的值需要不相等的情况,您可以使用AbstractRoutingConnectionFactory#setConsistentConfirmsReturns
以转验证。请注意,第一个连接工厂添加到AbstractRoutingConnectionFactory
将确定confirms
和returns
.
如果您有需要检查的某些消息确认/返回而其他消息则不需要,这可能会很有用。 例如:
@Bean
public RabbitTemplate rabbitTemplate() {
final com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
cf.setHost("localhost");
cf.setPort(5672);
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(cf);
cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
PooledChannelConnectionFactory pooledChannelConnectionFactory = new PooledChannelConnectionFactory(cf);
final Map<Object, ConnectionFactory> connectionFactoryMap = new HashMap<>(2);
connectionFactoryMap.put("true", cachingConnectionFactory);
connectionFactoryMap.put("false", pooledChannelConnectionFactory);
final AbstractRoutingConnectionFactory routingConnectionFactory = new SimpleRoutingConnectionFactory();
routingConnectionFactory.setConsistentConfirmsReturns(false);
routingConnectionFactory.setDefaultTargetConnectionFactory(pooledChannelConnectionFactory);
routingConnectionFactory.setTargetConnectionFactories(connectionFactoryMap);
final RabbitTemplate rabbitTemplate = new RabbitTemplate(routingConnectionFactory);
final Expression sendExpression = new SpelExpressionParser().parseExpression(
"messageProperties.headers['x-use-publisher-confirms'] ?: false");
rabbitTemplate.setSendConnectionFactorySelectorExpression(sendExpression);
}
这样,带有x-use-publisher-confirms: true
将通过缓存连接发送,您可以确保邮件传递。有关确保邮件传递的更多信息,请参阅发布者确认和返回。
队列关联和LocalizedQueueConnectionFactory
在集群中使用 HA 队列时,为了获得最佳性能,您可能需要连接到主要队列所在的物理代理。 这CachingConnectionFactory
可以配置多个代理地址。这是为了故障转移,客户端尝试根据配置的AddressShuffleMode
次序。 这LocalizedQueueConnectionFactory
使用管理插件提供的 REST API 来确定哪个节点是队列的前导。然后它创建(或从缓存中检索)一个CachingConnectionFactory
仅连接到该节点。如果连接失败,则确定新的主节点,使用者连接到该节点。 这LocalizedQueueConnectionFactory
配置了默认的连接工厂,以防无法确定队列的物理位置,在这种情况下,它会正常连接到集群。
这LocalizedQueueConnectionFactory
是一个RoutingConnectionFactory
和SimpleMessageListenerContainer
使用队列名称作为查找键,如上面的路由连接工厂中所述。
出于这个原因(使用队列名称进行查找),该LocalizedQueueConnectionFactory 仅当容器配置为侦听单个队列时才能使用。 |
必须在每个节点上启用 RabbitMQ 管理插件。 |
此连接工厂适用于长期连接,例如SimpleMessageListenerContainer . 它不适用于短连接使用,例如与RabbitTemplate 因为在建立连接之前调用 REST API 的开销。此外,对于发布作,队列是未知的,并且消息无论如何都会发布到所有集群成员,因此查找节点的逻辑几乎没有价值。 |
以下示例配置显示了如何配置工厂:
@Autowired
private ConfigurationProperties props;
@Bean
public CachingConnectionFactory defaultConnectionFactory() {
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses(this.props.getAddresses());
cf.setUsername(this.props.getUsername());
cf.setPassword(this.props.getPassword());
cf.setVirtualHost(this.props.getVirtualHost());
return cf;
}
@Bean
public LocalizedQueueConnectionFactory queueAffinityCF(
@Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
return new LocalizedQueueConnectionFactory(defaultCF,
StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
StringUtils.commaDelimitedListToStringArray(this.props.getAdminUris()),
StringUtils.commaDelimitedListToStringArray(this.props.getNodes()),
this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
false, null);
}
请注意,前三个参数是addresses
,adminUris
和nodes
.
这些是位置性的,因为当容器尝试连接到队列时,它使用管理 API 来确定哪个节点是队列的前导,并连接到与该节点位于同一数组位置的地址。
从 3.0 版本开始,RabbitMQhttp-client 不再用于访问 Rest API。
相反,默认情况下,WebClient 如果spring-webflux 在类路径上;否则RestTemplate 被使用。 |
添加WebFlux
到类路径:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
compile 'org.springframework.amqp:spring-rabbit'
您还可以通过实现LocalizedQueueConnectionFactory.NodeLocator
并覆盖其createClient, ``restCall
,以及可选的close
方法。
lqcf.setNodeLocator(new NodeLocator<MyClient>() {
@Override
public MyClient createClient(String userName, String password) {
...
}
@Override
public HashMap<String, Object> restCall(MyClient client, URI uri) {
...
});
});
该框架提供了WebFluxNodeLocator
和RestTemplateNodeLocator
,默认值如上所述。
出版商确认并退货
通过设置CachingConnectionFactory
属性publisherConfirmType
自ConfirmType.CORRELATED
和publisherReturns
属性设置为“true”。
设置这些选项后,Channel
工厂创建的实例包装在PublisherCallbackChannel
,用于促进回调。
当获得这样的通道时,客户端可以注册一个PublisherCallbackChannel.Listener
使用Channel
.
这PublisherCallbackChannel
实现包含将确认或返回路由到相应侦听器的逻辑。
以下部分将进一步解释这些功能。
另请参阅相关发布者确认和退货和simplePublisherConfirms
在作用域作中。
有关更多背景信息,请参阅 RabbitMQ 团队的博客文章,标题为 Introducing Publisher Confirms。 |
连接和通道侦听器
连接工厂支持注册ConnectionListener
和ChannelListener
实现。
这允许您接收有关连接和频道相关事件的通知。
(一个ConnectionListener
由RabbitAdmin
在建立连接时执行声明 - 有关详细信息,请参阅自动声明交换、队列和绑定)。
以下列表显示了ConnectionListener
接口定义:
@FunctionalInterface
public interface ConnectionListener {
void onCreate(Connection connection);
default void onClose(Connection connection) {
}
default void onShutDown(ShutdownSignalException signal) {
}
}
从 2.0 版开始,org.springframework.amqp.rabbit.connection.Connection
对象可以提供com.rabbitmq.client.BlockedListener
要通知连接阻止和取消阻止事件的实例。
以下示例显示了 ChannelListener 接口定义:
@FunctionalInterface
public interface ChannelListener {
void onCreate(Channel channel, boolean transactional);
default void onShutDown(ShutdownSignalException signal) {
}
}
请参阅发布是异步的 — 如何检测成功和失败,了解您可能想要注册ChannelListener
.
记录通道关闭事件
版本 1.5 引入了一种机制,使用户能够控制日志记录级别。
这CachingConnectionFactory
使用默认策略来记录通道关闭,如下所示:
-
不记录正常通道关闭 (200 OK)。
-
如果通道由于被动队列声明失败而关闭,则会在调试级别记录该通道。
-
如果通道因
basic.consume
由于排他性消费者条件而被拒绝,则记录在 INFO 级别。 -
所有其他记录都记录在 ERROR 级别。
要修改此行为,您可以将自定义ConditionalExceptionLogger
进入CachingConnectionFactory
在其closeExceptionLogger
财产。
另请参阅消费者事件。
运行时缓存属性
盯着 1.6 版本,CachingConnectionFactory
现在通过getCacheProperties()
方法。
这些统计信息可用于调整缓存,以在生产中对其进行优化。
例如,可以使用高水位线来确定是否应增加缓存大小。
如果它等于缓存大小,您可能需要考虑进一步增加。
下表描述了CacheMode.CHANNEL
性能:
属性 | 意义 |
---|---|
connectionName |
由 |
channelCacheSize |
当前配置的允许空闲的最大通道数。 |
localPort |
连接的本地端口(如果可用)。 这可用于与 RabbitMQ 管理 UI 上的连接和通道相关联。 |
idleChannelsTx |
当前空闲(缓存)的事务通道数。 |
idleChannelsNotTx |
当前空闲(缓存)的非事务性通道数。 |
idleChannelsTxHighWater |
已同时空闲(缓存)的事务通道的最大数量。 |
idleChannelsNotTxHighWater |
非事务通道的最大数量已同时空闲(缓存)。 |
下表描述了CacheMode.CONNECTION
性能:
属性 | 意义 |
---|---|
connectionName:<localPort> |
由 |
openConnections |
表示与代理的连接的连接对象数。 |
channelCacheSize |
当前配置的允许空闲的最大通道数。 |
connectionCacheSize |
当前配置的允许空闲的最大连接数。 |
idleConnections |
当前空闲的连接数。 |
idleConnectionsHighWater |
已同时空闲的最大连接数。 |
idleChannelsTx:<localPort> |
此连接当前空闲(缓存)的事务通道数。
您可以使用 |
idleChannelsNotTx:<localPort> |
此连接当前处于空闲(缓存)状态的非事务性通道数。
这 |
idleChannelsTxHighWater:<localPort> |
已同时空闲(缓存)的事务通道的最大数量。 属性名称的 localPort 部分可用于与 RabbitMQ 管理 UI 上的连接和通道相关联。 |
idleChannelsNotTxHighWater:<localPort> |
非事务通道的最大数量已同时空闲(缓存)。
您可以使用 |
这cacheMode
属性 (CHANNEL
或CONNECTION
) 也包括在内。

RabbitMQ 自动连接/拓扑恢复
自 Spring AMQP 的第一个版本以来,该框架在代理发生故障时提供了自己的连接和通道恢复。
此外,如配置代理中所述,该RabbitAdmin
在重新建立连接时重新声明任何基础架构 Bean(队列和其他)。
因此,它不依赖于现在由amqp-client
图书馆。
这amqp-client
,默认启用自动恢复。
两种恢复机制之间存在一些不兼容,因此默认情况下,Spring 将automaticRecoveryEnabled
基础上的属性RabbitMQ connectionFactory
自false
.
即使该属性是true
,Spring 通过立即关闭任何恢复的连接来有效地禁用它。
默认情况下,只有定义为 bean 的元素(队列、交换、绑定)才会在连接失败后重新声明。 有关如何更改该行为,请参阅恢复自动删除声明。 |
4.1.3. 添加自定义客户端连接属性
这CachingConnectionFactory
现在允许您访问底层连接工厂以允许,例如,
设置自定义客户端属性。
以下示例显示了如何执行此作:
connectionFactory.getRabbitConnectionFactory().getClientProperties().put("thing1", "thing2");
查看连接时,这些属性会显示在 RabbitMQ 管理 UI 中。
4.1.4.AmqpTemplate
与 Spring Framework 和相关项目提供的许多其他高级抽象一样,Spring AMQP 提供了一个起着核心作用的“模板”。
定义主要作的接口称为AmqpTemplate
.
这些作涵盖了发送和接收消息的一般行为。
换句话说,它们对于任何实现都不是唯一的,因此名称中的“AMQP”。
另一方面,该接口的实现与 AMQP 协议的实现相关联。
与 JMS 本身是接口级 API 不同,AMQP 是线级协议。
该协议的实现提供自己的客户端库,因此模板接口的每个实现都依赖于特定的客户端库。
目前,只有一个实现:RabbitTemplate
.
在下面的示例中,我们经常使用AmqpTemplate
.
但是,当您查看实例化模板或调用 setter 的配置示例或任何代码摘录时,您可以看到实现类型(例如,RabbitTemplate
).
另请参阅异步兔子模板。
添加重试功能
从 1.3 版开始,您现在可以配置RabbitTemplate
使用RetryTemplate
以帮助处理代理连接问题。有关完整信息,请参阅 spring-retry 项目。以下只是一个使用指数退避策略和默认SimpleRetryPolicy
,这会在向调用方抛出异常之前进行三次尝试。
以下示例使用 XML 命名空间:
<rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/>
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500" />
<property name="multiplier" value="10.0" />
<property name="maxInterval" value="10000" />
</bean>
</property>
</bean>
以下示例使用@Configuration
Java 中的注释:
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
template.setRetryTemplate(retryTemplate);
return template;
}
从 1.4 版本开始,除了retryTemplate
属性,则recoveryCallback
选项支持RabbitTemplate
. 它用作RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback)
.
这RecoveryCallback 在某种程度上受到限制,因为重试上下文仅包含lastThrowable 田。 对于更复杂的用例,您应该使用RetryTemplate 以便您可以向RecoveryCallback 通过上下文的属性。以下示例显示了如何执行此作: |
retryTemplate.execute(
new RetryCallback<Object, Exception>() {
@Override
public Object doWithRetry(RetryContext context) throws Exception {
context.setAttribute("message", message);
return rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
}, new RecoveryCallback<Object>() {
@Override
public Object recover(RetryContext context) throws Exception {
Object message = context.getAttribute("message");
Throwable t = context.getLastThrowable();
// Do something with message
return null;
}
});
}
在这种情况下,您不会注入RetryTemplate
进入RabbitTemplate
.
发布是异步的 — 如何检测成功和失败
发布消息是一种异步机制,默认情况下,RabbitMQ 会删除无法路由的消息。为了成功发布,您可以收到异步确认,如相关发布者确认和返回中所述。考虑两种失败场景:
-
发布到交换,但没有匹配的目标队列。
-
发布到不存在的交易所。
第一种情况由发布者退货涵盖,如相关发布者确认和退货中所述。
对于第二种情况,消息被删除并且不会生成任何返回。底层通道关闭并出现异常。默认情况下,会记录此异常,但您可以注册一个ChannelListener
使用CachingConnectionFactory
以获取此类事件的通知。以下示例演示如何添加ConnectionListener
:
this.connectionFactory.addConnectionListener(new ConnectionListener() {
@Override
public void onCreate(Connection connection) {
}
@Override
public void onShutDown(ShutdownSignalException signal) {
...
}
});
您可以检查信号的reason
属性来确定发生的问题。
要检测发送线程上的异常,您可以setChannelTransacted(true)
在RabbitTemplate
并且在txCommit()
. 但是,事务会严重影响性能,因此在仅为这一用例启用事务之前请仔细考虑这一点。
相关发布者确认并返回
这RabbitTemplate
实现AmqpTemplate
支持发布者确认和退货。
对于返回的消息,模板的mandatory
属性必须设置为true
或mandatory-expression
必须评估为true
对于特定消息。
此功能需要CachingConnectionFactory
它有它的publisherReturns
属性设置为true
(请参阅出版商确认和退货)。
返回通过注册RabbitTemplate.ReturnsCallback
通过调用setReturnsCallback(ReturnsCallback callback)
.
回调必须实现以下方法:
void returnedMessage(ReturnedMessage returned);
这ReturnedMessage
具有以下属性:
-
message
- 返回的消息本身 -
replyCode
- 指示退货原因的代码 -
replyText
- 返回的文本原因 - 例如NO_ROUTE
-
exchange
- 发送消息的交换 -
routingKey
- 使用的路由密钥
只有一个ReturnsCallback
由每个RabbitTemplate
.
另请参阅回复超时。
对于发布者确认(也称为发布者确认),模板需要CachingConnectionFactory
它有它的publisherConfirm
属性设置为ConfirmType.CORRELATED
.
通过注册RabbitTemplate.ConfirmCallback
通过调用setConfirmCallback(ConfirmCallback callback)
.
回调必须实现以下方法:
void confirm(CorrelationData correlationData, boolean ack, String cause);
这CorrelationData
是客户端在发送原始消息时提供的对象。
这ack
对于ack
和 false 表示nack
.
为nack
实例中,原因可能包含nack
,如果当nack
生成。
例如,向不存在的交换发送消息时。
在这种情况下,代理关闭通道。
关闭的原因包含在cause
.
这cause
在 1.4 版本中添加。
只有一个ConfirmCallback
由RabbitTemplate
.
兔子模板发送作完成后,通道将关闭。
当连接工厂缓存已满时,这会阻止接收确认或返回(当缓存中有空间时,通道不会物理关闭,并且返回和确认正常进行)。
当缓存已满时,框架会将关闭时间推迟最多五秒钟,以便有时间接收确认和返回。
使用确认时,在收到最后一次确认时关闭通道。
仅使用返回时,通道将保持打开状态整整五秒钟。
我们通常建议将连接工厂的channelCacheSize 设置为足够大的值,以便发布消息的通道返回到缓存而不是关闭。
您可以使用 RabbitMQ 管理插件监控通道使用情况。
如果您看到通道快速打开和关闭,则应考虑增加缓存大小以减少服务器上的开销。 |
在 2.1 版之前,为发布者确认启用的通道在收到确认之前会返回到缓存中。
其他一些进程可以签出通道并执行一些导致通道关闭的作,例如将消息发布到不存在的交换。
这可能会导致确认丢失。
版本 2.1 及更高版本在确认未完成时不再将通道返回到缓存。
这RabbitTemplate 执行逻辑close() 每次作后在通道上。
通常,这意味着一次在一个通道上只有一个确认未完成。 |
从 2.2 版开始,回调是在连接工厂的executor 线程。
这是为了避免在回调中执行 Rabbit作时出现潜在的死锁。
在以前的版本中,回调是直接在amqp-client 连接I/O线程;如果执行某些 RPC作(例如打开新通道),则会死锁,因为 I/O 线程会阻止等待结果,但结果需要由 I/O 线程本身处理。
对于这些版本,有必要将工作(例如发送消息)移交给回调中的另一个线程。
这不再需要,因为框架现在将回调调用交给执行器。 |
只要返回回调在 60 秒或更短的时间内执行,就仍会保留在 ack 之前接收返回消息的保证。 确认计划在返回回调退出后或 60 秒后(以先到者为准)传递。 |
这CorrelationData
对象有一个CompletableFuture
您可以使用它来获取结果,而不是使用ConfirmCallback
在模板上。
以下示例演示如何配置CorrelationData
实例:
CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());
ReturnedMessage = cd1.getReturn();
...
由于它是CompletableFuture<Confirm>
,您可以get()
准备就绪或使用时的结果whenComplete()
用于异步回调。
这Confirm
object 是一个具有 2 个属性的简单 bean:ack
和reason
(对于nack
实例)。
未为代理生成的原因填充nack
实例。
它填充了nack
框架生成的实例(例如,关闭连接,同时ack
实例是杰出的)。
此外,当同时启用确认和返回时,CorrelationData
return
属性将填充返回的消息,如果无法路由到任何队列。
保证在使用ack
.CorrelationData.getReturn()
返回一个ReturnMessage
具有属性:
-
message(返回的消息)
-
回复代码
-
回复文本
-
交换
-
路由键
另请参阅作用域作,了解等待发布者确认的更简单机制。
作用域作
通常,在使用模板时,一个Channel
从缓存中检出(或创建),用于作,并返回到缓存中以供重用。
在多线程环境中,不能保证下一个作使用相同的通道。
但是,有时您可能希望更好地控制通道的使用,并确保多个作都在同一通道上执行。
从 2.0 版开始,一个名为invoke
提供,带有OperationsCallback
.
在回调范围内执行的任何作,并在提供的RabbitOperations
参数使用相同的专用Channel
,这将在最后关闭(不返回到缓存中)。
如果通道是PublisherCallbackChannel
,则在收到所有确认后将其返回到缓存中(请参阅相关发布者确认和返回)。
@FunctionalInterface
public interface OperationsCallback<T> {
T doInRabbit(RabbitOperations operations);
}
为什么您可能需要它的一个示例是,如果您希望使用waitForConfirms()
基础上的方法Channel
.
Spring API 以前没有公开此方法,因为如前所述,通道通常是缓存和共享的。
这RabbitTemplate
现在提供waitForConfirms(long timeout)
和waitForConfirmsOrDie(long timeout)
,它委托给在OperationsCallback
.
出于显而易见的原因,这些方法不能在该范围之外使用。
请注意,其他地方提供了允许您将确认与请求相关联的更高级别的抽象(请参阅相关的发布者确认和返回)。 如果您只想等到代理确认交付,则可以使用以下示例中所示的技术:
Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
messages.forEach(m -> t.convertAndSend(ROUTE, m));
t.waitForConfirmsOrDie(10_000);
return true;
});
如果您愿意RabbitAdmin
作,在OperationsCallback
,则必须使用相同的RabbitTemplate
用于invoke
操作。
如果模板作已在现有事务的范围内执行,则上述讨论是无意义的,例如,在事务侦听器容器线程上运行并在事务处理的模板上执行作时。
在这种情况下,作将在该通道上执行,并在线程返回到容器时提交。
没有必要使用invoke 在这种情况下。 |
以这种方式使用确认时,实际上并不需要为将确认与请求相关联而设置的许多基础设施(除非还启用了返回)。
从 2.2 版开始,连接工厂支持一个名为publisherConfirmType
.
当将其设置为ConfirmType.SIMPLE
,避免了基础设施,并且可以更有效地进行确认处理。
此外,RabbitTemplate
将publisherSequenceNumber
已发送消息中的属性MessageProperties
.
如果您希望检查(或记录或以其他方式使用)特定的确认,您可以使用重载的invoke
方法,如以下示例所示:
public <T> T invoke(OperationsCallback<T> action, com.rabbitmq.client.ConfirmCallback acks,
com.rabbitmq.client.ConfirmCallback nacks);
这些ConfirmCallback 对象(对于ack 和nack 实例)是 Rabbit 客户端回调,而不是模板回调。 |
以下示例日志ack
和nack
实例:
Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
messages.forEach(m -> t.convertAndSend(ROUTE, m));
t.waitForConfirmsOrDie(10_000);
return true;
}, (tag, multiple) -> {
log.info("Ack: " + tag + ":" + multiple);
}, (tag, multiple) -> {
log.info("Nack: " + tag + ":" + multiple);
}));
作用域作绑定到线程。 有关多线程环境中严格排序的讨论,请参阅多线程环境中的严格消息排序。 |
多线程环境中的严格消息排序
作用域作中的讨论仅适用于在同一线程上执行作时。
考虑以下情况:
-
thread-1
将消息发送到队列并将工作移交给thread-2
-
thread-2
将消息发送到同一队列
由于 RabbitMQ 的异步性质和缓存通道的使用;不确定是否会使用相同的通道,因此无法保证消息到达队列的顺序。
(在大多数情况下,它们会按顺序到达,但无序交付的概率不是零)。
要解决此用例,您可以使用大小1
(连同channelCheckoutTimeout
),以确保消息始终在同一渠道发布,并且有秩序有保证。
为此,如果连接工厂有其他用途(例如使用者),则应为模板使用专用连接工厂,或将模板配置为使用嵌入在主连接工厂中的发布者连接工厂(请参阅使用单独的连接)。
用一个简单的 Spring Boot 应用程序来说明这一点:
@SpringBootApplication
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
TaskExecutor exec() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(10);
return exec;
}
@Bean
CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
CachingConnectionFactory publisherCF = (CachingConnectionFactory) ccf.getPublisherConnectionFactory();
publisherCF.setChannelCacheSize(1);
publisherCF.setChannelCheckoutTimeout(1000L);
return ccf;
}
@RabbitListener(queues = "queue")
void listen(String in) {
log.info(in);
}
@Bean
Queue queue() {
return new Queue("queue");
}
@Bean
public ApplicationRunner runner(Service service, TaskExecutor exec) {
return args -> {
exec.execute(() -> service.mainService("test"));
};
}
}
@Component
class Service {
private static final Logger LOG = LoggerFactory.getLogger(Service.class);
private final RabbitTemplate template;
private final TaskExecutor exec;
Service(RabbitTemplate template, TaskExecutor exec) {
template.setUsePublisherConnection(true);
this.template = template;
this.exec = exec;
}
void mainService(String toSend) {
LOG.info("Publishing from main service");
this.template.convertAndSend("queue", toSend);
this.exec.execute(() -> secondaryService(toSend.toUpperCase()));
}
void secondaryService(String toSend) {
LOG.info("Publishing from secondary service");
this.template.convertAndSend("queue", toSend);
}
}
即使发布是在两个不同的线程上执行的,它们也会使用相同的通道,因为缓存的上限为单个通道。
从 2.3.7 版本开始,ThreadChannelConnectionFactory
支持将一个线程的通道转移到另一个线程,使用prepareContextSwitch
和switchContext
方法。
第一个方法返回一个上下文,该上下文传递给调用第二个方法的第二个线程。
线程可以绑定到它的非事务通道或事务通道(或每个通道之一);除非使用两个连接工厂,否则不能单独传输它们。
示例如下:
@SpringBootApplication
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
TaskExecutor exec() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(10);
return exec;
}
@Bean
ThreadChannelConnectionFactory tccf() {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
return new ThreadChannelConnectionFactory(rabbitConnectionFactory);
}
@RabbitListener(queues = "queue")
void listen(String in) {
log.info(in);
}
@Bean
Queue queue() {
return new Queue("queue");
}
@Bean
public ApplicationRunner runner(Service service, TaskExecutor exec) {
return args -> {
exec.execute(() -> service.mainService("test"));
};
}
}
@Component
class Service {
private static final Logger LOG = LoggerFactory.getLogger(Service.class);
private final RabbitTemplate template;
private final TaskExecutor exec;
private final ThreadChannelConnectionFactory connFactory;
Service(RabbitTemplate template, TaskExecutor exec,
ThreadChannelConnectionFactory tccf) {
this.template = template;
this.exec = exec;
this.connFactory = tccf;
}
void mainService(String toSend) {
LOG.info("Publishing from main service");
this.template.convertAndSend("queue", toSend);
Object context = this.connFactory.prepareSwitchContext();
this.exec.execute(() -> secondaryService(toSend.toUpperCase(), context));
}
void secondaryService(String toSend, Object threadContext) {
LOG.info("Publishing from secondary service");
this.connFactory.switchContext(threadContext);
this.template.convertAndSend("queue", toSend);
this.connFactory.closeThreadChannel();
}
}
一旦prepareSwitchContext 被调用时,如果当前线程执行更多作,它们将在新通道上执行。
当不再需要线程绑定通道时,关闭它很重要。 |
消息传递集成
从 1.4 版本开始,RabbitMessagingTemplate
(建立在RabbitTemplate
)提供了与 Spring Framework 消息传递抽象的集成——即,org.springframework.messaging.Message
.
这样,您就可以使用spring-messaging
Message<?>
抽象化。
其他 Spring 项目也使用此抽象,例如 Spring Integration 和 Spring 的 STOMP 支持。
涉及两个消息转换器:一个用于在 spring-messaging 之间进行转换Message<?>
和 Spring AMQP 的Message
抽象和一个在 Spring AMQP 之间转换的Message
抽象和底层 RabbitMQ 客户端库所需的格式。
默认情况下,消息有效负载由提供的RabbitTemplate
实例的消息转换器。
或者,您可以注入自定义MessagingMessageConverter
使用其他一些有效负载转换器,如以下示例所示:
MessagingMessageConverter amqpMessageConverter = new MessagingMessageConverter();
amqpMessageConverter.setPayloadConverter(myPayloadConverter);
rabbitMessagingTemplate.setAmqpMessageConverter(amqpMessageConverter);
已验证的用户 ID
从 1.6 版开始,模板现在支持user-id-expression
(userIdExpression
使用 Java 配置时)。
如果发送了消息,那么在计算此表达式后设置用户标识属性(如果尚未设置)。
评估的根对象是要发送的消息。
以下示例演示如何使用user-id-expression
属性:
<rabbit:template ... user-id-expression="'guest'" />
<rabbit:template ... user-id-expression="@myConnectionFactory.username" />
第一个示例是文字表达式。第二个示例获取username
应用程序上下文中连接工厂 bean 的属性。
使用单独的连接
从 2.0.2 版开始,您可以将usePublisherConnection
属性设置为true
尽可能使用与侦听器容器使用的连接不同的连接。这是为了避免在生产者因任何原因被阻止时被阻止消费者。为此,连接工厂维护第二个内部连接工厂;默认情况下,它与主工厂类型相同,但如果您希望使用不同的工厂类型进行发布,则可以显式设置。如果兔子模板在侦听器容器启动的事务中运行,则无论此设置如何,都会使用容器的通道。
通常,您不应该使用RabbitAdmin 将此设置为true .
使用RabbitAdmin 采用连接工厂的构造函数。
如果使用采用模板的另一个构造函数,请确保模板的属性为false .
这是因为,管理员通常用于声明侦听器容器的队列。
使用属性设置为true 意味着独占队列(例如AnonymousQueue ) 将在与侦听器容器使用的连接不同的连接上声明。
在这种情况下,容器无法使用队列。 |
4.1.5. 发送消息
发送消息时,可以使用以下任一方法:
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;
我们可以从前面列表中的最后一个方法开始讨论,因为它实际上是最明确的。
它允许在运行时提供 AMQP 交换名称(以及路由密钥)。
最后一个参数是负责实际创建消息实例的回调。
使用此方法发送消息的示例可能如下所示:
以下示例演示如何使用send
发送消息的方法:
amqpTemplate.send("marketData.topic", "quotes.nasdaq.THING1",
new Message("12.34".getBytes(), someProperties));
您可以将exchange
属性,如果您计划在大部分时间或所有时间使用该模板实例发送到同一交易所。
在这种情况下,可以使用前面列表中的第二种方法。
以下示例在功能上等同于前面的示例:
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));
如果同时exchange
和routingKey
属性,则可以使用仅接受Message
.
以下示例显示了如何执行此作:
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));
考虑交换和路由键属性的更好方法是显式方法参数始终覆盖模板的默认值。
事实上,即使您没有在模板上显式设置这些属性,也始终存在默认值。
在这两种情况下,默认值都是空的String
,但这实际上是一个明智的默认值。
就路由键而言,它并不总是一开始就必需的(例如,对于
一个Fanout
交换)。
此外,队列可以绑定到具有空String
.
这些都是依赖默认空的合法场景String
模板的路由键属性的值。
就交易所名称而言,空的String
常用,因为 AMQP 规范将“默认交换”定义为没有名称。
由于所有队列都自动绑定到该默认交换(即直接交换),因此使用其名称作为绑定值,因此前面列表中的第二种方法可用于通过默认交换向任何队列进行简单的点对点消息传递。
您可以将队列名称作为routingKey
,通过在运行时提供方法参数。
以下示例显示了如何执行此作:
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));
或者,您可以创建一个模板,该模板可用于主要或专门发布到单个队列。 以下示例显示了如何执行此作:
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue
template.send(new Message("Hello World".getBytes(), someProperties));
消息生成器 API
从 1.3 版开始,消息构建器 API 由MessageBuilder
和MessagePropertiesBuilder
.
这些方法提供了一种方便的“流畅”方法来创建消息或消息属性。
以下示例显示了 Fluent API 的实际应用:
Message message = MessageBuilder.withBody("foo".getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
MessageProperties props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
Message message = MessageBuilder.withBody("foo".getBytes())
.andProperties(props)
.build();
在MessageProperties
可以设置。
其他方法包括setHeader(String key, String value)
,removeHeader(String key)
,removeHeaders()
和copyProperties(MessageProperties properties)
.
每个属性设置方法都有一个set*IfAbsent()
变体。
在存在默认初始值的情况下,该方法名为set*IfAbsentOrDefault()
.
提供了五个静态方法来创建初始消息生成器:
public static MessageBuilder withBody(byte[] body) (1)
public static MessageBuilder withClonedBody(byte[] body) (2)
public static MessageBuilder withBody(byte[] body, int from, int to) (3)
public static MessageBuilder fromMessage(Message message) (4)
public static MessageBuilder fromClonedMessage(Message message) (5)
1 | 构建器创建的消息具有对参数的直接引用的正文。 |
2 | 构建器创建的消息有一个正文,该正文是一个新数组,其中包含参数中的字节副本。 |
3 | 构建器创建的消息有一个正文,该正文是一个新数组,其中包含参数中的字节范围。
看Arrays.copyOfRange() 了解更多详情。 |
4 | 构建器创建的消息具有对参数正文的直接引用的正文。
参数的属性被复制到新的MessageProperties 对象。 |
5 | 构建器创建的消息有一个正文,该正文是一个包含参数正文副本的新数组。
参数的属性被复制到新的MessageProperties 对象。 |
提供了三种静态方法来创建MessagePropertiesBuilder
实例:
public static MessagePropertiesBuilder newInstance() (1)
public static MessagePropertiesBuilder fromProperties(MessageProperties properties) (2)
public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) (3)
1 | 使用默认值初始化新的消息属性对象。 |
2 | 构建器初始化为 和build() 将返回提供的属性对象。 |
3 | 参数的属性被复制到新的MessageProperties 对象。 |
使用RabbitTemplate
实现AmqpTemplate
,每个send()
methods 有一个重载版本,它需要额外的CorrelationData
对象。
启用发布者确认后,此对象将在AmqpTemplate
.
这允许发件人关联确认 (ack
或nack
) 替换为已发送的消息。
从 1.6.7 版本开始,CorrelationAwareMessagePostProcessor
引入接口,允许在消息转换后修改相关数据。
以下示例演示如何使用它:
Message postProcessMessage(Message message, Correlation correlation);
在 2.0 版中,此接口已被弃用。
该方法已移至MessagePostProcessor
使用委托给postProcessMessage(Message message)
.
同样从 1.6.7 版本开始,一个名为CorrelationDataPostProcessor
被提供。
毕竟这是被调用的MessagePostProcessor
实例(在send()
方法以及setBeforePublishPostProcessors()
).
实现可以更新或替换send()
方法(如果有)。
这Message
和原创CorrelationData
(如果有)作为参数提供。
以下示例演示如何使用postProcess
方法:
CorrelationData postProcess(Message message, CorrelationData correlationData);
发布者退货
当模板的mandatory
属性是true
,返回的消息由AmqpTemplate
.
从 1.4 版开始,RabbitTemplate
支持 SpELmandatoryExpression
属性,该属性作为根评估对象针对每个请求消息进行评估,解析为boolean
价值。
Bean 引用,例如@myBean.isMandatory(#root)
,可用于表达式。
发布者返回也可以由RabbitTemplate
在发送和接收作中。
有关更多信息,请参阅回复超时。
配料
1.4.2 版引入了BatchingRabbitTemplate
.
这是RabbitTemplate
替换的send
根据BatchingStrategy
.
只有当批处理完成时,消息才会发送到 RabbitMQ。
以下列表显示了BatchingStrategy
接口定义:
public interface BatchingStrategy {
MessageBatch addToBatch(String exchange, String routingKey, Message message);
Date nextRelease();
Collection<MessageBatch> releaseBatches();
}
批处理数据保存在内存中。 如果发生系统故障,未发送的消息可能会丢失。 |
一个SimpleBatchingStrategy
被提供。
它支持将消息发送到单个交换或路由密钥。
它具有以下属性:
-
batchSize
:发送之前批处理中的消息数。 -
bufferLimit
:批处理消息的最大大小。 这会抢占batchSize
,如果超过,则会导致发送部分批次。 -
timeout
:在没有向批处理添加消息的新活动时发送部分批处理的时间。
这SimpleBatchingStrategy
通过在每个嵌入的消息前面加上 4 字节的二进制长度来格式化批处理。
通过设置springBatchFormat
message 属性设置为lengthHeader4
.
默认情况下,侦听器容器会自动取消批处理的消息(通过使用springBatchFormat message header) 的 Message 标头)。
拒绝批处理中的任何邮件会导致整个批处理被拒绝。 |
但是,有关详细信息,请参阅@RabbitListener使用批处理。
4.1.6. 接收消息
消息接收总是比发送复杂一些。
有两种方式可以接收Message
.
更简单的选择是轮询一个Message
一次使用轮询方法调用。
更复杂但更常见的方法是注册接收Messages
按需异步。
我们将在接下来的两个小节中考虑每种方法的示例。
轮询消费者
这AmqpTemplate
本身可用于轮询Message
接待。
默认情况下,如果没有可用的消息,null
立即返回。
没有阻塞。
从 1.5 版开始,您可以将receiveTimeout
,以毫秒为单位,接收方法阻塞最多该时间,等待消息。
小于零的值表示无限期阻止(或至少在与代理的连接丢失之前)。
1.6 版引入了receive
允许在每次调用时传入超时的方法。
由于接收作会创建一个新的QueueingConsumer 对于每条消息,此技术并不真正适合大容量环境。
考虑使用异步使用者或receiveTimeout 为零。 |
从版本 2.4.8 开始,当使用非零超时时,您可以指定传递给basicConsume
方法,用于将消费者与通道相关联。
例如:template.addConsumerArg("x-priority", 10)
.
有四个简单的receive
可用的方法。与Exchange
在发送端,有一个方法要求已设置默认队列属性直接在模板本身上,并且有一个方法在运行时接受队列参数。1.6 版引入了接受的变体timeoutMillis
覆盖receiveTimeout
基于每个请求。以下列表显示了四种方法的定义:
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;
与发送消息一样,该AmqpTemplate
有一些接收 POJO 的便捷方法,而不是Message
实例和实现提供了一种自定义MessageConverter
用于创建Object
返回:
以下列表显示了这些方法:
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
从 2.0 版开始,这些方法的变体需要额外的ParameterizedTypeReference
参数来转换复杂类型。
模板必须配置SmartMessageConverter
.
看从Message
跟RabbitTemplate
了解更多信息。
似sendAndReceive
方法,从 1.3 版开始,AmqpTemplate
有几个便利receiveAndReply
用于同步接收、处理和回复消息的方法。
以下列表显示了这些方法定义:
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback)
throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback)
throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
这AmqpTemplate
实现负责receive
和reply
阶段。
在大多数情况下,您应该只提供ReceiveAndReplyCallback
为接收到的消息执行一些业务逻辑,并根据需要构建回复对象或消息。
注意,一个ReceiveAndReplyCallback
可能会返回null
.
在这种情况下,不会发送回复,并且receiveAndReply
工作方式类似于receive
方法。
这样,同一队列就可以用于混合消息,其中一些消息可能不需要回复。
仅当提供的回调不是ReceiveAndReplyMessageCallback
,它提供原始消息交换协定。
这ReplyToAddressCallback
对于需要自定义逻辑来确定replyTo
运行时针对收到的消息的地址,并从ReceiveAndReplyCallback
.
默认情况下,replyTo
请求消息中的信息用于路由回复。
以下列表显示了基于 POJO 的接收和回复示例:
boolean received =
this.template.receiveAndReply(ROUTE, new ReceiveAndReplyCallback<Order, Invoice>() {
public Invoice handle(Order order) {
return processOrder(order);
}
});
if (received) {
log.info("We received an order!");
}
异步使用者
Spring AMQP 还通过使用@RabbitListener 注释,并提供一个开放的基础设施来以编程方式注册端点。
这是迄今为止设置异步消费者的最便捷方法。
有关更多详细信息,请参阅注释驱动的侦听器端点。 |
预取默认值过去为 1,这可能导致高效使用者的利用率不足。 从 2.0 版开始,默认预取值现在为 250,这应该会让消费者在大多数常见场景中保持忙碌,并且 从而提高吞吐量。 然而,在某些情况下,预取值应该很低:
此外,对于低容量消息传递和多个使用者(包括单个侦听器容器实例中的并发性),您可能希望减少预取,以便在使用者之间获得更均匀的消息分布。 请参阅消息侦听器容器配置。 有关预取的更多背景信息,请参阅这篇关于 RabbitMQ 中消费者利用率的文章和这篇关于排队理论的文章。 |
消息侦听器
对于异步Message
reception,一个专用组件(不是AmqpTemplate
)涉及。
该组件是Message
-consuming 回调。
我们将在本节后面讨论容器及其属性。
不过,首先,我们应该看看回调,因为这是您的应用程序代码与消息传递系统集成的地方。
回调有几个选项,首先是MessageListener
界面,如下表所示:
public interface MessageListener {
void onMessage(Message message);
}
如果您的回调逻辑出于任何原因依赖于 AMQP 通道实例,您可以改用ChannelAwareMessageListener
.
它看起来很相似,但有一个额外的参数。
以下列表显示了ChannelAwareMessageListener
接口定义:
public interface ChannelAwareMessageListener {
void onMessage(Message message, Channel channel) throws Exception;
}
在 2.1 版本中,此接口从 packageo.s.amqp.rabbit.core 自o.s.amqp.rabbit.listener.api . |
MessageListenerAdapter
如果您希望在应用程序逻辑和消息传递 API 之间保持更严格的分离,那么可以依赖框架提供的适配器实现。 这通常被称为“消息驱动的 POJO”支持。
1.5 版为 POJO 消息传递引入了一种更灵活的机制,即@RabbitListener 注解。
有关更多信息,请参阅注释驱动的侦听器端点。 |
使用适配器时,只需提供对适配器本身应调用的实例的引用。 以下示例显示了如何执行此作:
MessageListenerAdapter listener = new MessageListenerAdapter(somePojo);
listener.setDefaultListenerMethod("myMethod");
您可以对适配器进行子类化,并提供getListenerMethodName()
根据消息动态选择不同的方法。
此方法有两个参数,originalMessage
和extractedMessage
,后者是任何转换的结果。
默认情况下,一个SimpleMessageConverter
已配置。
看SimpleMessageConverter
有关其他可用转换器的更多信息和信息。
从 1.4.2 版开始,原始消息具有consumerQueue
和consumerTag
属性,可用于确定从中接收消息的队列。
从 1.5 版本开始,您可以配置消费者队列或标签到方法名称的映射,以动态选择要调用的方法。
如果映射中没有条目,我们回退到默认的监听器方法。
默认侦听器方法(如果未设置)为handleMessage
.
从 2.0 版本开始,方便的FunctionalInterface
已提供。
以下列表显示了FunctionalInterface
:
@FunctionalInterface
public interface ReplyingMessageListener<T, R> {
R handleMessage(T t);
}
此接口有助于使用 Java 8 lambda 方便地配置适配器,如以下示例所示:
new MessageListenerAdapter((ReplyingMessageListener<String, String>) data -> {
...
return result;
}));
从 2.2 版开始,buildListenerArguments(Object)
已被弃用和新增buildListenerArguments(Object, Channel, Message)
取而代之的是引入了一个。
新方法帮助听众获得Channel
和Message
参数来执行更多作,例如调用channel.basicReject(long, boolean)
在手动确认模式下。
以下列表显示了最基本的示例:
public class ExtendedListenerAdapter extends MessageListenerAdapter {
@Override
protected Object[] buildListenerArguments(Object extractedMessage, Channel channel, Message message) {
return new Object[]{extractedMessage, channel, message};
}
}
现在您可以配置ExtendedListenerAdapter
与MessageListenerAdapter
如果您需要接收“频道”和“消息”。
监听器的参数应设置为buildListenerArguments(Object, Channel, Message)
返回,如以下侦听器示例所示:
public void handleMessage(Object object, Channel channel, Message message) throws IOException {
...
}
容器
现在您已经了解了Message
-listen 回调,我们可以将注意力转向容器。
基本上,容器处理“主动”职责,以便侦听器回调可以保持被动。
容器是“生命周期”组件的一个示例。
它提供了启动和停止的方法。
配置容器时,您基本上弥合了 AMQP 队列和MessageListener
实例。
您必须提供对ConnectionFactory
以及该侦听器应从中使用消息的队列名称或队列实例。
在 2.0 版本之前,有一个侦听器容器,即SimpleMessageListenerContainer
.
现在有第二个容器,即DirectMessageListenerContainer
.
选择要使用的容器和条件之间的差异在选择容器中进行了描述。
以下列表显示了最基本的示例,它的工作原理是使用SimpleMessageListenerContainer
:
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory);
container.setQueueNames("some.queue");
container.setMessageListener(new MessageListenerAdapter(somePojo));
作为“活动”组件,最常见的是创建带有 bean 定义的侦听器容器,以便它可以在后台运行。 以下示例显示了使用 XML 执行此作的一种方法:
<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>
以下列表显示了使用 XML 执行此作的另一种方法:
<rabbit:listener-container connection-factory="rabbitConnectionFactory" type="direct">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>
前面的两个示例都创建了一个DirectMessageListenerContainer
(请注意type
属性 — 它默认为simple
).
或者,您可能更喜欢使用 Java 配置,它看起来类似于前面的代码片段:
@Configuration
public class ExampleAmqpConfiguration {
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}
@Bean
public CachingConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public MessageListener exampleListener() {
return new MessageListener() {
public void onMessage(Message message) {
System.out.println("received: " + message);
}
};
}
}
消费者优先
从 RabbitMQ 版本 3.2 开始,代理现在支持消费者优先级(请参阅将消费者优先级与 RabbitMQ 结合使用)。
这是通过设置x-priority
关于消费者的争论。
这SimpleMessageListenerContainer
现在支持设置使用者参数,如以下示例所示:
container.setConsumerArguments(Collections.
<String, Object> singletonMap("x-priority", Integer.valueOf(10)));
为方便起见,命名空间提供了priority
属性listener
元素,如以下示例所示:
<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10" />
</rabbit:listener-container>
从 1.3 版开始,您可以修改容器在运行时侦听的队列。 请参阅侦听器容器队列。
auto-delete
队列
当容器配置为侦听auto-delete
queues,队列有一个x-expires
选项,或者在 Broker 上配置了 Time-To-Live 策略,则当容器停止时(即,当最后一个使用者被取消时),Broker 将删除队列。
在 1.3 版本之前,由于缺少队列,无法重新启动容器。
这RabbitAdmin
仅在连接关闭或打开时自动重新声明队列等,当容器停止和启动时不会发生这种情况。
从 1.3 版开始,容器使用RabbitAdmin
在启动期间重新声明任何丢失的队列。
您还可以将条件声明(请参阅条件声明)与auto-startup="false"
admin 将队列声明推迟到容器启动。
以下示例显示了如何执行此作:
<rabbit:queue id="otherAnon" declared-by="containerAdmin" />
<rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin">
<rabbit:bindings>
<rabbit:binding queue="otherAnon" key="otherAnon" />
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:listener-container id="container2" auto-startup="false">
<rabbit:listener id="listener2" ref="foo" queues="otherAnon" admin="containerAdmin" />
</rabbit:listener-container>
<rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory"
auto-startup="false" />
在这种情况下,队列和交换由containerAdmin
,其中有auto-startup="false"
以便在上下文初始化期间不会声明元素。
此外,容器未启动的原因相同。
稍后启动容器时,它会将其引用用于containerAdmin
以声明元素。
批处理消息
批处理消息(由生产者创建)由侦听器容器自动取消批处理(使用springBatchFormat
message header) 的 Message 标头)。
拒绝批处理中的任何邮件会导致整个批处理被拒绝。
有关批处理的更多信息,请参阅批处理。
从 2.2 版开始,SimpleMessageListenerContainer
可用于在消费者端(生产者发送离散消息的地方)创建批处理。
设置容器属性consumerBatchEnabled
以启用此功能。deBatchingEnabled
还必须为 true,以便容器负责处理这两种类型的批次。
实现BatchMessageListener
或ChannelAwareBatchMessageListener
什么时候consumerBatchEnabled
是真的。
从 2.2.7 版开始,两个SimpleMessageListenerContainer
和DirectMessageListenerContainer
可以将生产者创建的批次取消批处理为List<Message>
.
请参阅 @RabbitListener 与批处理 有关将此功能与@RabbitListener
.
消费者活动
每当侦听器
(消费者)经历了某种失败。
活动ListenerContainerConsumerFailedEvent
具有以下属性:
-
container
:使用者遇到问题的侦听器容器。 -
reason
:失败的文本原因。 -
fatal
:指示故障是否致命的布尔值。 在非致命异常的情况下,容器会尝试重新启动使用者,根据recoveryInterval
或recoveryBackoff
(对于SimpleMessageListenerContainer
) 或monitorInterval
(对于DirectMessageListenerContainer
). -
throwable
:这Throwable
那被抓住了。
这些事件可以通过实现ApplicationListener<ListenerContainerConsumerFailedEvent>
.
系统范围的事件(例如连接失败)由所有使用者发布,当concurrentConsumers 大于 1。 |
如果使用者失败,因为默认情况下,如果其队列被独占使用,以及发布事件,则WARN
日志。
要更改此日志记录行为,请提供自定义ConditionalExceptionLogger
在SimpleMessageListenerContainer
实例的exclusiveConsumerExceptionLogger
财产。
另请参阅记录通道关闭事件。
致命错误始终记录在ERROR
水平。
这是不可修改的。
其他几个事件在容器生命周期的各个阶段发布:
-
AsyncConsumerStartedEvent
:当使用者启动时。 -
AsyncConsumerRestartedEvent
:当使用者在发生故障后重新启动时 -SimpleMessageListenerContainer
只。 -
AsyncConsumerTerminatedEvent
:当消费者正常停止时。 -
AsyncConsumerStoppedEvent
:当消费者停止时 -SimpleMessageListenerContainer
只。 -
ConsumeOkEvent
:当consumeOk
从代理接收,包含队列名称和consumerTag
-
ListenerContainerIdleEvent
:请参阅检测空闲异步使用者。 -
MissingQueueEvent
:检测到缺少队列时。
消费者标签
您可以提供生成消费者标签的策略。
默认情况下,消费者标签由代理生成。
以下列表显示了ConsumerTagStrategy
接口定义:
public interface ConsumerTagStrategy {
String createConsumerTag(String queue);
}
队列可用,以便可以(可选)在标记中使用。
请参阅消息侦听器容器配置。
注释驱动的侦听器端点
异步接收消息的最简单方法是使用带注释的侦听器端点基础结构。
简而言之,它允许您将托管 Bean 的方法公开为 Rabbit 侦听器端点。
以下示例演示如何使用@RabbitListener
注解:
@Component
public class MyService {
@RabbitListener(queues = "myQueue")
public void processOrder(String data) {
...
}
}
前面示例的思想是,每当名为myQueue
这processOrder
方法被相应地调用(在本例中,使用消息的有效负载)。
带注释的端点基础结构在后台为每个带注释的方法创建一个消息侦听器容器,方法是使用RabbitListenerContainerFactory
.
在前面的示例中,myQueue
必须已经存在并绑定到某种交换。
队列可以自动声明和绑定,只要RabbitAdmin
存在于应用程序上下文中。
属性占位符 (${some.property} ) 或 SpEL 表达式 (#{someExpression} ) 可以为注释属性 (queues 等等)。
请参阅监听多个队列,了解为什么可以使用 SpEL 而不是属性占位符。
以下列表显示了如何声明 Rabbit 侦听器的三个示例: |
@Component
public class MyService {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "myQueue", durable = "true"),
exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"),
key = "orderRoutingKey")
)
public void processOrder(Order order) {
...
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(value = "auto.exch"),
key = "invoiceRoutingKey")
)
public void processInvoice(Invoice invoice) {
...
}
@RabbitListener(queuesToDeclare = @Queue(name = "${my.queue}", durable = "true"))
public String handleWithSimpleDeclare(String data) {
...
}
}
在第一个示例中,队列myQueue
如果需要,与交换一起自动声明(持久),
并绑定到具有路由密钥的交换。
在第二个示例中,声明并绑定了一个匿名(独占、自动删除)队列;队列名称由框架使用Base64UrlNamingStrategy
.
您不能使用此技术声明代理命名的队列;它们需要声明为 bean 定义;请参阅容器和代理命名队列。
倍数QueueBinding
可以提供条目,让监听器监听多个队列。
在第三个示例中,名称为 “检索自属性my.queue
如有必要,使用队列名称作为路由键,使用默认绑定到默认交换进行声明。
从 2.0 版本开始,@Exchange
注释支持任何交换类型,包括自定义。
有关详细信息,请参阅 AMQP 概念。
您可以使用普通@Bean
定义,当您需要更高级的配置时。
通知ignoreDeclarationExceptions
在第一个示例中的交易所。
例如,这允许绑定到可能具有不同设置(例如internal
).
默认情况下,现有交换的属性必须匹配。
从版本 2.0 开始,您现在可以使用多个路由密钥将队列绑定到交换,如以下示例所示:
...
key = { "red", "yellow" }
...
您还可以在@QueueBinding
队列、交换、
和绑定,如以下示例所示:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "auto.headers", autoDelete = "true",
arguments = @Argument(name = "x-message-ttl", value = "10000",
type = "java.lang.Integer")),
exchange = @Exchange(value = "auto.headers", type = ExchangeTypes.HEADERS, autoDelete = "true"),
arguments = {
@Argument(name = "x-match", value = "all"),
@Argument(name = "thing1", value = "somevalue"),
@Argument(name = "thing2")
})
)
public String handleWithHeadersExchange(String foo) {
...
}
请注意,x-message-ttl
参数设置为队列的 10 秒。
由于参数类型不是String
,我们必须指定其类型——在本例中,Integer
.
与所有此类声明一样,如果队列已经存在,则参数必须与队列上的参数匹配。
对于标头交换,我们设置绑定参数以匹配具有thing1
header 设置为somevalue
和
这thing2
header 必须与任何值一起存在。
这x-match
参数表示必须满足这两个条件。
参数名称、值和类型可以是属性占位符 (${…}
) 或 SpEL 表达式 (#{…}
).
这name
必须解析为String
.
的表达式type
必须解析为Class
或类的完全限定名称。
这value
必须解析为可以通过DefaultConversionService
设置为类型(例如x-message-ttl
在前面的示例中)。
如果名称解析为null
或空的String
那@Argument
被忽略。
元注释
有时您可能希望对多个侦听器使用相同的配置。 要减少样板配置,您可以使用元注释来创建自己的侦听器注释。 以下示例显示了如何执行此作:
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(value = "metaFanout", type = ExchangeTypes.FANOUT)))
public @interface MyAnonFanoutListener {
}
public class MetaListener {
@MyAnonFanoutListener
public void handle1(String foo) {
...
}
@MyAnonFanoutListener
public void handle2(String foo) {
...
}
}
在前面的示例中,由@MyAnonFanoutListener
注释绑定匿名的自动删除
排队到扇出交换,metaFanout
.
从 2.2.3 版本开始,@AliasFor
支持覆盖元注释注释上的属性。
此外,用户注释现在可以@Repeatable
,允许为一个方法创建多个容器。
@Component
static class MetaAnnotationTestBean {
@MyListener("queue1")
@MyListener("queue2")
public void handleIt(String body) {
}
}
@RabbitListener
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Repeatable(MyListeners.class)
static @interface MyListener {
@AliasFor(annotation = RabbitListener.class, attribute = "queues")
String[] value() default {};
}
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
static @interface MyListeners {
MyListener[] value();
}
启用侦听器端点注释
启用对@RabbitListener
注释,您可以添加@EnableRabbit
给你的一个@Configuration
类。
以下示例显示了如何执行此作:
@Configuration
@EnableRabbit
public class AppConfig {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
factory.setContainerCustomizer(container -> /* customize the container */);
return factory;
}
}
从 2.0 版本开始,一个DirectMessageListenerContainerFactory
也可用。
它创造了DirectMessageListenerContainer
实例。
有关帮助您在SimpleRabbitListenerContainerFactory 和DirectRabbitListenerContainerFactory ,请参阅选择容器。 |
从 2.2.2 版开始,您可以提供ContainerCustomizer
实现(如上图所示)。
这可用于在创建和配置容器后进一步配置容器;例如,您可以使用它来设置容器工厂未公开的属性。
版本 2.4.8 提供了CompositeContainerCustomizer
适用于您希望应用多个定制器的情况。
默认情况下,基础架构会查找名为rabbitListenerContainerFactory
作为工厂用于创建消息侦听器容器的源。
在这种情况下,忽略 RabbitMQ 基础设施设置,processOrder
可以使用三个线程的核心轮询大小和 10 个线程的最大池大小来调用方法。
您可以自定义要用于每个注解的监听器容器工厂,也可以通过实现RabbitListenerConfigurer
接口。
仅当注册了至少一个端点而没有特定的容器工厂时,才需要默认值。
有关完整的详细信息和示例,请参阅 Javadoc。
容器工厂提供了添加MessagePostProcessor
在接收消息后(在调用侦听器之前)和发送回复之前应用的实例。
有关回复的信息,请参阅回复管理。
从 2.0.6 版本开始,您可以添加RetryTemplate
和RecoveryCallback
到侦听器容器工厂。
在发送回复时使用它。
这RecoveryCallback
在重试用尽时调用。
您可以使用SendRetryContextAccessor
从上下文中获取信息。
以下示例显示了如何执行此作:
factory.setRetryTemplate(retryTemplate);
factory.setReplyRecoveryCallback(ctx -> {
Message failed = SendRetryContextAccessor.getMessage(ctx);
Address replyTo = SendRetryContextAccessor.getAddress(ctx);
Throwable t = ctx.getLastThrowable();
...
return null;
});
如果您更喜欢 XML 配置,可以使用<rabbit:annotation-driven>
元素。
任何用@RabbitListener
被检测到。
为SimpleRabbitListenerContainer
实例中,您可以使用类似于以下内容的 XML:
<rabbit:annotation-driven/>
<bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="concurrentConsumers" value="3"/>
<property name="maxConcurrentConsumers" value="10"/>
</bean>
为DirectMessageListenerContainer
实例中,您可以使用类似于以下内容的 XML:
<rabbit:annotation-driven/>
<bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="consumersPerQueue" value="3"/>
</bean>
从 2.0 版开始,@RabbitListener
注释有一个concurrency
财产。
它支持 SpEL 表达式 (#{…}
) 和属性占位符 (${…}
).
其含义和允许的值取决于容器类型,如下所示:
-
对于
DirectMessageListenerContainer
,则该值必须是单个整数值,该值将consumersPerQueue
容器上的属性。 -
对于
SimpleRabbitListenerContainer
,该值可以是单个整数值,该值将concurrentConsumers
属性,或者它可以具有m-n
哪里m
是concurrentConsumers
property 和n
是maxConcurrentConsumers
财产。
无论哪种情况,此设置都会覆盖出厂设置。 以前,如果您的侦听器需要不同的并发性,则必须定义不同的容器工厂。
注释还允许覆盖工厂autoStartup
和taskExecutor
属性通过autoStartup
和executor
(自 2.2 起)注释属性。
为每个执行器使用不同的执行器可能有助于识别日志和线程转储中与每个侦听器关联的线程。
2.2 版还添加了ackMode
属性,它允许您覆盖容器工厂的acknowledgeMode
财产。
@RabbitListener(id = "manual.acks.1", queues = "manual.acks.1", ackMode = "MANUAL")
public void manual1(String in, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
...
channel.basicAck(tag, false);
}
带注释的方法的消息转换
在调用侦听器之前,管道中有两个转换步骤。
第一步使用MessageConverter
转换传入的 Spring AMQPMessage
到 Spring-messagingMessage
.
调用目标方法时,如有必要,消息有效负载将转换为方法参数类型。
默认值MessageConverter
第一步是 Spring AMQPSimpleMessageConverter
处理转换为String
和java.io.Serializable
对象。
所有其他byte[]
.
在下面的讨论中,我们称之为“消息转换器”。
第二步的默认转换器是GenericMessageConverter
,它委托给转换服务
(一个实例DefaultFormattingConversionService
).
在下面的讨论中,我们将其称为“方法参数转换器”。
要更改消息转换器,您可以将其作为属性添加到容器工厂 Bean。 以下示例显示了如何执行此作:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
...
factory.setMessageConverter(new Jackson2JsonMessageConverter());
...
return factory;
}
这将配置一个 Jackson2 转换器,该转换器需要存在标头信息来指导转换。
您还可以使用ContentTypeDelegatingMessageConverter
,可以处理不同内容类型的转换。
从 2.3 版开始,您可以通过在messageConverter
财产。
@Bean
public Jackson2JsonMessageConverter jsonConverter() {
return new Jackson2JsonMessageConverter();
}
@RabbitListener(..., messageConverter = "jsonConverter")
public void listen(String in) {
...
}
这避免了为了更改转换器而必须声明不同的容器工厂。
在大多数情况下,没有必要自定义方法参数转换器,除非,例如,您想使用
一个习惯ConversionService
.
在 1.6 之前的版本中,必须在消息头中提供转换 JSON 的类型信息,或者 习惯ClassMapper
是必需的。从 1.6 版本开始,如果没有类型信息标头,则可以从目标method 参数推断出类型。
此类型推断仅适用于@RabbitListener 在方法层面。 |
有关详细信息,请参阅 Jackson2JsonMessageConverter。
如果要自定义方法参数转换器,可以按如下方式执行:
@Configuration
@EnableRabbit
public class AppConfig implements RabbitListenerConfigurer {
...
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(new GenericMessageConverter(myConversionService()));
return factory;
}
@Bean
public DefaultConversionService myConversionService() {
DefaultConversionService conv = new DefaultConversionService();
conv.addConverter(mySpecialConverter());
return conv;
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}
...
}
对于多方法侦听器(请参阅多方法侦听器),方法选择基于消息转换后消息的有效负载。 仅在选择方法后才调用方法参数转换器。 |
添加自定义HandlerMethodArgumentResolver
@RabbitListener
从 2.3.7 版开始,您可以添加自己的HandlerMethodArgumentResolver
并解析自定义方法参数。
您所需要做的就是实现RabbitListenerConfigurer
和使用方法setCustomMethodArgumentResolvers()
从课堂上RabbitListenerEndpointRegistrar
.
@Configuration
class CustomRabbitConfig implements RabbitListenerConfigurer {
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setCustomMethodArgumentResolvers(
new HandlerMethodArgumentResolver() {
@Override
public boolean supportsParameter(MethodParameter parameter) {
return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
}
@Override
public Object resolveArgument(MethodParameter parameter, org.springframework.messaging.Message<?> message) {
return new CustomMethodArgument(
(String) message.getPayload(),
message.getHeaders().get("customHeader", String.class)
);
}
}
);
}
}
编程端点注册
RabbitListenerEndpoint
提供 Rabbit 端点的模型,并负责为该模型配置容器。
该基础结构允许您以编程方式配置端点,以及由RabbitListener
注解。
以下示例显示了如何执行此作:
@Configuration
@EnableRabbit
public class AppConfig implements RabbitListenerConfigurer {
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
endpoint.setQueueNames("anotherQueue");
endpoint.setMessageListener(message -> {
// processing
});
registrar.registerEndpoint(endpoint);
}
}
在前面的示例中,我们使用SimpleRabbitListenerEndpoint
,它提供了实际的MessageListener
调用,但您也可以构建自己的端点变体来描述自定义调用机制。
应该注意的是,您也可以跳过使用@RabbitListener
并通过编程方式注册您的端点RabbitListenerConfigurer
.
带注释的端点方法签名
到目前为止,我们一直在注入一个简单的String
在我们的端点中,但它实际上可以有一个非常灵活的方法签名。
以下示例重写它以注入Order
使用自定义标头:
@Component
public class MyService {
@RabbitListener(queues = "myQueue")
public void processOrder(Order order, @Header("order_type") String orderType) {
...
}
}
以下列表显示了可用于与侦听器终结点中的参数匹配的参数:
-
原始的
org.springframework.amqp.core.Message
. -
这
MessageProperties
从原始Message
. -
这
com.rabbitmq.client.Channel
收到消息的。 -
这
org.springframework.messaging.Message
从传入的 AMQP 消息转换而来。 -
@Header
-annotated 方法参数来提取特定的标头值,包括标准 AMQP 标头。 -
@Headers
-带注释的参数,也必须可以分配给java.util.Map
用于访问所有标头。 -
转换后的有效载荷
非受支持类型之一的非注释元素(即Message
,MessageProperties
,Message<?>
和Channel
) 与有效负载匹配。
您可以通过使用@Payload
.
您还可以通过添加额外的@Valid
.
注入 Spring 消息抽象的能力特别有用,可以从存储在特定于传输的消息中的所有信息中受益,而无需依赖特定于传输的 API。 以下示例显示了如何执行此作:
@RabbitListener(queues = "myQueue")
public void processOrder(Message<Order> order) { ...
}
方法参数的处理由DefaultMessageHandlerMethodFactory
,您可以进一步自定义它以支持其他方法参数。
转换和验证支持也可以在那里定制。
例如,如果我们想确保我们的Order
在处理之前有效,我们可以用@Valid
并配置必要的验证器,如下所示:
@Configuration
@EnableRabbit
public class AppConfig implements RabbitListenerConfigurer {
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
}
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setValidator(myValidator());
return factory;
}
}
@RabbitListener @Payload验证
从 2.3.7 版本开始,现在可以更轻松地添加Validator
验证@RabbitListener
和@RabbitHandler
@Payload
参数。
现在,您可以简单地将验证器添加到注册商本身。
@Configuration
@EnableRabbit
public class Config implements RabbitListenerConfigurer {
...
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setValidator(new MyValidator());
}
}
当将 Spring Boot 与验证Starters一起使用时,一个LocalValidatorFactoryBean 是自动配置的: |
@Configuration
@EnableRabbit
public class Config implements RabbitListenerConfigurer {
@Autowired
private LocalValidatorFactoryBean validator;
...
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setValidator(this.validator);
}
}
要验证:
public static class ValidatedClass {
@Max(10)
private int bar;
public int getBar() {
return this.bar;
}
public void setBar(int bar) {
this.bar = bar;
}
}
和
@RabbitListener(id="validated", queues = "queue1", errorHandler = "validationErrorHandler",
containerFactory = "jsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
...
}
@Bean
public RabbitListenerErrorHandler validationErrorHandler() {
return (m, e) -> {
...
};
}
监听多个队列
当您使用queues
属性,您可以指定关联的容器可以监听多个队列。
您可以使用@Header
注释,使从中接收消息的队列名称可供 POJO 使用
方法。
以下示例显示了如何执行此作:
@Component
public class MyService {
@RabbitListener(queues = { "queue1", "queue2" } )
public void processOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
...
}
}
从 1.5 版开始,您可以使用属性占位符和 SpEL 将队列名称外部化。 以下示例显示了如何执行此作:
@Component
public class MyService {
@RabbitListener(queues = "#{'${property.with.comma.delimited.queue.names}'.split(',')}" )
public void processOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
...
}
}
在 1.5 版之前,只能以这种方式指定单个队列。 每个队列都需要一个单独的属性。
回复管理
现有支持MessageListenerAdapter
已经让你的方法具有非 void 返回类型。
在这种情况下,调用的结果将封装在发送到ReplyToAddress
标头,或侦听器上配置的默认地址。
您可以使用@SendTo
消息传递抽象的注释。
假设我们的processOrder
方法现在应该返回一个OrderStatus
,我们可以写成这样,自动发送回复:
@RabbitListener(destination = "myQueue")
@SendTo("status")
public OrderStatus processOrder(Order order) {
// order processing
return status;
}
如果需要以与传输无关的方式设置其他标头,则可以返回Message
相反,类似于以下内容:
@RabbitListener(destination = "myQueue")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
// order processing
return MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
}
或者,您可以使用MessagePostProcessor
在beforeSendReplyMessagePostProcessors
container factory 属性来添加更多标头。
从版本 2.2.3 开始,被调用的 bean/method 在回复消息中可用,它可以在消息后处理器中使用,以将信息传达给调用者:
factory.setBeforeSendReplyPostProcessors(msg -> {
msg.getMessageProperties().setHeader("calledBean",
msg.getMessageProperties().getTargetBean().getClass().getSimpleName());
msg.getMessageProperties().setHeader("calledMethod",
msg.getMessageProperties().getTargetMethod().getName());
return m;
});
从 2.2.5 版开始,您可以配置ReplyPostProcessor
在发送回复消息之前对其进行修改;它以correlationId
标头已设置为匹配请求。
@RabbitListener(queues = "test.header", group = "testGroup", replyPostProcessor = "echoCustomHeader")
public String capitalizeWithHeader(String in) {
return in.toUpperCase();
}
@Bean
public ReplyPostProcessor echoCustomHeader() {
return (req, resp) -> {
resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
return resp;
};
}
从 3.0 版开始,您可以在容器工厂而不是注释上配置后处理器。
factory.setReplyPostProcessorProvider(id -> (req, resp) -> {
resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
return resp;
});
这id
参数是监听器 ID。
注释上的设置将取代出厂设置。
这@SendTo
value 假定为回复exchange
和routingKey
对后面的exchange/routingKey
模式
其中一个部分可以省略。
有效值如下:
-
thing1/thing2
:这replyTo
exchange 和routingKey
.thing1/
:这replyTo
交换和默认值(空)routingKey
.thing2
或/thing2
:这replyTo
routingKey
和默认(空)交易所。 或空:/
replyTo
默认交换和默认routingKey
.
此外,您还可以使用@SendTo
没有value
属性。
这种情况等于空的sendTo
模式。@SendTo
仅当入站消息没有replyToAddress
财产。
从 1.5 版开始,@SendTo
value 可以是 bean 初始化 SpEL 表达式,如以下示例所示:
@RabbitListener(queues = "test.sendTo.spel")
@SendTo("#{spelReplyTo}")
public String capitalizeWithSendToSpel(String foo) {
return foo.toUpperCase();
}
...
@Bean
public String spelReplyTo() {
return "test.sendTo.reply.spel";
}
表达式的计算结果必须为String
,它可以是简单的队列名称(发送到默认交换)或使用
形式exchange/routingKey
如前面示例之前所述。
这#{…} expression 在初始化期间计算一次。 |
对于动态回复路由,邮件发送方应包含reply_to
message 属性或使用 alternate
runtime SpEL 表达式(在下一个示例之后描述)。
从 1.6 版本开始,@SendTo
可以是在运行时针对请求进行评估的 SpEL 表达式
并回复,如以下示例所示:
@RabbitListener(queues = "test.sendTo.spel")
@SendTo("!{'some.reply.queue.with.' + result.queueName}")
public Bar capitalizeWithSendToSpel(Foo foo) {
return processTheFooAndReturnABar(foo);
}
SpEL 表达式的运行时性质用!{…}
分隔符。
评估上下文#root
object 具有三个属性:
-
request
:这o.s.amqp.core.Message
request 对象。 -
source
:这o.s.messaging.Message<?>
转换后。 -
result
:方法结果。
上下文有一个映射属性访问器、一个标准类型转换器和一个 bean 解析器,它允许
上下文(例如,@someBeanName.determineReplyQ(request, result)
).
综上所述,#{…}
在初始化期间评估一次,使用#root
object 是应用程序上下文。
豆子由其名称引用。!{…}
在运行时为每条消息进行评估,根对象具有前面列出的属性。
Bean 用它们的名称引用,前缀为 .@
从 2.1 版开始,还支持简单的属性占位符(例如${some.reply.to}
).
对于早期版本,可以使用以下方法作为解决方法,如以下示例所示:
@RabbitListener(queues = "foo")
@SendTo("#{environment['my.send.to']}")
public String listen(Message in) {
...
return ...
}
回复 ContentType
如果您使用的是复杂的消息转换器,例如ContentTypeDelegatingMessageConverter
,您可以通过设置replyContentType
属性。
这允许转换器为回复选择适当的委托转换器。
@RabbitListener(queues = "q1", messageConverter = "delegating",
replyContentType = "application/json")
public Thing2 listen(Thing1 in) {
...
}
默认情况下,为了向后兼容,转换器设置的任何内容类型属性在转换后都将被此值覆盖。
转换器,例如SimpleMessageConverter
使用回复类型而不是内容类型来确定所需的转换,并在回复消息中适当设置内容类型。
这可能不是所需的作,可以通过设置converterWinsContentType
属性设置为false
.
例如,如果您返回String
包含 JSON,则SimpleMessageConverter
会将回复中的内容类型设置为text/plain
.
以下配置将确保内容类型设置正确,即使SimpleMessageConverter
被使用。
@RabbitListener(queues = "q1", replyContentType = "application/json",
converterWinsContentType = "false")
public String listen(Thing in) {
...
return someJsonString;
}
这些属性 (replyContentType
和converterWinsContentType
) 不适用当返回类型为 Spring AMQP 时Message
或 Spring MessagingMessage<?>
.
在第一种情况下,不涉及转换;只需将contentType
message 属性。
在第二种情况下,使用消息头控制行为:
@RabbitListener(queues = "q1", messageConverter = "delegating")
@SendTo("q2")
public Message<String> listen(String in) {
...
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(MessageHeaders.CONTENT_TYPE, "application/xml")
.build();
}
此内容类型将在MessageProperties
到转换器。
默认情况下,为了向后兼容,转换器设置的任何内容类型属性在转换后都将被此值覆盖。
如果您希望覆盖该行为,请同时将AmqpHeaders.CONTENT_TYPE_CONVERTER_WINS
自true
转换器设置的任何值都将被保留。
多方法侦听器
从 1.5.0 版开始,您可以指定@RabbitListener
类级别的注释。
与新的@RabbitHandler
注释,这允许单个侦听器调用不同的方法,基于
传入消息的有效负载类型。
最好用一个例子来描述这一点:
@RabbitListener(id="multi", queues = "someQueue")
@SendTo("my.reply.queue")
public class MultiListenerBean {
@RabbitHandler
public String thing2(Thing2 thing2) {
...
}
@RabbitHandler
public String cat(Cat cat) {
...
}
@RabbitHandler
public String hat(@Header("amqp_receivedRoutingKey") String rk, @Payload Hat hat) {
...
}
@RabbitHandler(isDefault = true)
public String defaultMethod(Object object) {
...
}
}
在这种情况下,个人@RabbitHandler
如果转换后的有效负载是Thing2
一个Cat
或Hat
.
您应该了解,系统必须能够根据有效负载类型识别唯一的方法。
检查类型是否可分配给没有注释或使用@Payload
注解。
请注意,相同的方法签名适用,如方法级@RabbitListener
(前面描述)。
从 2.0.3 版开始,@RabbitHandler
method 可以指定为默认方法,如果其他方法没有匹配项,则调用该方法。
最多只能指定一种方法。
@RabbitHandler 仅用于在转换后处理消息有效负载,如果您希望接收未转换的原始数据Message 对象,您必须使用@RabbitListener 在方法上,而不是类上。 |
@Repeatable
@RabbitListener
从 1.6 版本开始,@RabbitListener
注释标记为@Repeatable
.
这意味着注释可以多次出现在同一个带注释的元素(方法或类)上。
在这种情况下,将为每个注解创建一个单独的侦听器容器,每个注解都调用相同的侦听器@Bean
.
可重复的注释可以与 Java 8 或更高版本一起使用。
代理@RabbitListener
和泛型
如果您的服务旨在代理(例如,在@Transactional
),您应该记住一些注意事项
该接口具有通用参数。
请考虑以下示例:
interface TxService<P> {
String handle(P payload, String header);
}
static class TxServiceImpl implements TxService<Foo> {
@Override
@RabbitListener(...)
public String handle(Thing thing, String rk) {
...
}
}
使用通用接口和特定实现时,您被迫切换到 CGLIB 目标类代理,因为该接口的实际实现handle
方法是一种桥接方法。
在事务管理的情况下,CGLIB 的使用是通过使用
注释选项:@EnableTransactionManagement(proxyTargetClass = true)
.
在这种情况下,必须在实现中的目标方法上声明所有注释,如以下示例所示:
static class TxServiceImpl implements TxService<Foo> {
@Override
@Transactional
@RabbitListener(...)
public String handle(@Payload Foo foo, @Header("amqp_receivedRoutingKey") String rk) {
...
}
}
处理异常
默认情况下,如果带注释的侦听器方法抛出异常,则会将其抛出到容器中,并且消息将重新排队并重新传递、丢弃或路由到死信交换,具体取决于容器和代理配置。 不会向发件人返回任何内容。
从 2.0 版开始,@RabbitListener
注释有两个新属性:errorHandler
和returnExceptions
.
默认情况下,这些选项未配置。
您可以使用errorHandler
提供RabbitListenerErrorHandler
实现。 该函数接口有一个方法,如下所示:
@FunctionalInterface
public interface RabbitListenerErrorHandler {
Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message,
ListenerExecutionFailedException exception) throws Exception;
}
如您所见,您可以访问从容器 spring-messaging 接收的原始消息Message<?>
消息转换器生成的对象,以及侦听器抛出的异常(包装在ListenerExecutionFailedException
). 错误处理程序可以返回一些结果(作为回复发送)或抛出原始异常或新异常(抛出到容器或返回给发送方,具体取决于returnExceptions
设置)。
这returnExceptions
属性,当true
,导致异常返回给发送方。
异常包装在RemoteInvocationResult
对象。
在发送方方面,有一个可用的RemoteInvocationAwareMessageConverterAdapter
,如果配置为RabbitTemplate
,重新抛出服务器端异常,包装在AmqpRemoteException
.
服务器异常的堆栈跟踪是通过合并服务器和客户端堆栈跟踪来合成的。
此机制通常仅适用于默认的SimpleMessageConverter ,它使用 Java 序列化。
异常通常不是“Jackson友好的”,不能序列化为 JSON。
如果您使用 JSON,请考虑使用errorHandler 返回其他一些对Jackson友好的Error 对象。 |
在 2.1 版本中,此接口从 packageo.s.amqp.rabbit.listener 自o.s.amqp.rabbit.listener.api . |
从 2.1.7 版本开始,Channel
在消息传递消息头中可用;这允许您在使用AcknowledgeMode.MANUAL
:
public Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message,
ListenerExecutionFailedException exception) {
...
message.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class)
.basicReject(message.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class),
true);
}
从 2.2.18 版本开始,如果抛出消息转换异常,将调用错误处理程序,并null
在message
论点。
这允许应用程序向调用方发送一些结果,指示收到格式错误的消息。
以前,此类错误是由容器抛出和处理的。
容器管理
为注释创建的容器不会在应用程序上下文中注册。
您可以通过调用getListenerContainers()
在RabbitListenerEndpointRegistry
豆。
然后,您可以循环访问此集合,例如,停止或启动所有容器或调用Lifecycle
方法
在注册表本身上,这将调用每个容器上的作。
您还可以使用其id
用getListenerContainer(String id)
— 用于
例registry.getListenerContainer("multi")
对于上面代码段创建的容器。
从 1.5.2 版本开始,您可以获得id
已注册容器的值与getListenerContainerIds()
.
从 1.5 版开始,您现在可以分配一个group
到RabbitListener
端点。
这提供了一种机制来获取对容器子集的引用。
添加一个group
属性导致类型为Collection<MessageListenerContainer>
以组名称向上下文注册。
默认情况下,停止容器将取消使用者并在停止之前处理所有预提取的消息。 从版本 2.4.14、3.0.6 开始,可以将 [forceStop] 容器属性设置为 true,以便在处理当前消息后立即停止,从而导致任何预提取的消息重新排队。 例如,如果使用独占或单活动使用者,这很有用。
@RabbitListener批处理
当收到一批消息时,通常由容器执行取消批处理,并且一次调用一个消息。
从 2.2 版本开始,您可以将侦听器容器工厂和侦听器配置为在一次调用中接收整个批次,只需将工厂的batchListener
属性,并将方法有效负载参数设置为List
或Collection
:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setBatchListener(true);
return factory;
}
@RabbitListener(queues = "batch.1")
public void listen1(List<Thing> in) {
...
}
// or
@RabbitListener(queues = "batch.2")
public void listen2(List<Message<Thing>> in) {
...
}
设置batchListener
属性设置为 true 会自动关闭deBatchingEnabled
container 属性(除非consumerBatchEnabled
是true
- 见下文)。实际上,取消批处理从容器移动到侦听器适配器,适配器创建传递给侦听器的列表。
启用批处理的工厂不能与多方法侦听器一起使用。
同样从 2.2 版本开始。一次接收一条批处理消息时,最后一条消息包含设置为true
.
可以通过添加@Header(AmqpHeaders.LAST_IN_BATCH)
boolean last' 参数添加到您的侦听器方法。
标头映射自MessageProperties.isLastInBatch()
.
另外AmqpHeaders.BATCH_SIZE
填充了每个消息片段中的批处理大小。
此外,还有一个新房产consumerBatchEnabled
已添加到SimpleMessageListenerContainer
.
当此值为 true 时,容器将创建一批消息,最多batchSize
;如果出现以下情况,则交付部分批次receiveTimeout
已过,没有新消息到达。
如果收到生产者创建的批次,则将其取消批处理并添加到使用者端批次中;因此,实际传递的消息数可能会超过batchSize
,表示从代理接收的消息数。deBatchingEnabled
当consumerBatchEnabled
是真的;集装箱工厂将强制执行此要求。
@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setConsumerTagStrategy(consumerTagStrategy());
factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
factory.setBatchSize(2);
factory.setConsumerBatchEnabled(true);
return factory;
}
使用时consumerBatchEnabled
跟@RabbitListener
:
@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List<Message> amqpMessages) {
...
}
@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
...
}
@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List<Invoice> strings) {
...
}
-
第一个是用原始的、未转换的
org.springframework.amqp.core.Message
s 收到。 -
第二个是使用
org.springframework.messaging.Message<?>
s 具有转换后的有效负载和映射的标头/属性。 -
第三个是使用转换后的有效负载调用的,无法访问标头/属性。
您还可以添加Channel
参数,在使用MANUAL
ack 模式。
这对于第三个示例不是很有用,因为您无权访问delivery_tag
财产。
Spring Boot 提供了一个配置属性consumerBatchEnabled
和batchSize
,但不是batchListener
.
从 3.0 版开始,将consumerBatchEnabled
自true
在集装箱工厂也设置batchListener
自true
.
什么时候consumerBatchEnabled
是true
,则侦听器必须是批处理侦听器。
从 3.0 版开始,侦听器方法可以使用Collection<?>
或List<?>
.
使用容器工厂
引入了侦听器容器工厂来支持@RabbitListener
并向RabbitListenerEndpointRegistry
,如编程终结点注册中所述。
从 2.1 版开始,它们可用于创建任何侦听器容器——甚至是没有侦听器的容器(例如在 Spring Integration 中使用)。 当然,在容器启动之前必须添加侦听器。
有两种方法可以创建此类容器:
-
使用 SimpleRabbitListenerEndpoint
-
创建后添加监听器
以下示例显示如何使用SimpleRabbitListenerEndpoint
要创建侦听器容器:
@Bean
public SimpleMessageListenerContainer factoryCreatedContainerSimpleListener(
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory) {
SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
endpoint.setQueueNames("queue.1");
endpoint.setMessageListener(message -> {
...
});
return rabbitListenerContainerFactory.createListenerContainer(endpoint);
}
以下示例展示了如何在创建后添加监听器:
@Bean
public SimpleMessageListenerContainer factoryCreatedContainerNoListener(
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory) {
SimpleMessageListenerContainer container = rabbitListenerContainerFactory.createListenerContainer();
container.setMessageListener(message -> {
...
});
container.setQueueNames("test.no.listener.yet");
return container;
}
无论哪种情况,监听器也可以是ChannelAwareMessageListener
,因为它现在是MessageListener
.
如果您希望创建多个具有相似属性的容器或使用预配置的容器工厂(例如 Spring Boot 自动配置或两者提供的容器工厂),这些技术非常有用。
以这种方式创建的容器是正常的@Bean 实例,并且未在RabbitListenerEndpointRegistry . |
异步@RabbitListener
返回类型
@RabbitListener
(和@RabbitHandler
) 方法可以使用异步返回类型指定CompletableFuture<?>
和Mono<?>
,让异步发送回复。ListenableFuture<?>
不再受支持;它已被 Spring Framework 弃用。
侦听器容器工厂必须配置AcknowledgeMode.MANUAL 这样消费者线程就不会确认消息;相反,异步完成将在异步作完成时确认或 nack 消息。
当异步结果完成并出现错误时,消息是否重新排队取决于抛出的异常类型、容器配置和容器错误处理程序。
默认情况下,消息将重新排队,除非容器的defaultRequeueRejected 属性设置为false (这是true 默认情况下)。
如果异步结果使用AmqpRejectAndDontRequeueException ,则不会将消息重新排队。
如果容器的defaultRequeueRejected 属性是false ,您可以通过将 future 的异常设置为ImmediateRequeueException 并且消息将被重新排队。
如果侦听器方法中发生某些异常,导致创建异步结果对象,则必须捕获该异常并返回适当的返回对象,该对象将导致消息被确认或重新排队。 |
从版本 2.2.21、2.3.13、2.4.1 开始,AcknowledgeMode
将自动将MANUAL
检测到异步返回类型时。
此外,具有致命例外的传入消息将被单独否定确认,以前任何先前未确认的消息也会被否定确认。
从 3.0.5 版开始,@RabbitListener
(和@RabbitHandler
) 方法可以用 Kotlin 标记suspend
整个处理过程和回复生成(可选)发生在各自的 Kotlin 协程上。
所有提到的规则AcknowledgeMode.MANUAL
仍然适用。
这org.jetbrains.kotlinx:kotlinx-coroutines-reactor
类路径中必须存在依赖项才能允许suspend
函数调用。
同样从版本 3.0.5 开始,如果RabbitListenerErrorHandler
在具有异步返回类型(包括 Kotlin 挂起函数)的监听器上配置,则在失败后调用错误处理程序。
有关此错误处理程序及其用途的更多信息,请参阅处理异常。
线程和异步使用者
异步使用者涉及许多不同的线程。
来自TaskExecutor
在SimpleMessageListenerContainer
用于调用MessageListener
当新消息由RabbitMQ Client
.
如果未配置,则SimpleAsyncTaskExecutor
被使用。
如果使用池执行器,则需要确保池大小足以处理配置的并发。
使用DirectMessageListenerContainer
这MessageListener
直接在RabbitMQ Client
线。
在这种情况下,taskExecutor
用于监视使用者的任务。
使用默认值SimpleAsyncTaskExecutor ,对于调用侦听器的线程,侦听器容器beanName 用于threadNamePrefix .
这对于日志分析很有用。
我们通常建议始终在日志记录附加程序配置中包含线程名称。
当TaskExecutor 通过taskExecutor 属性,则按原样使用,无需修改。
建议使用类似的技术来命名自定义TaskExecutor bean 定义,以帮助日志消息中的线程识别。 |
这Executor
在CachingConnectionFactory
被传递到RabbitMQ Client
创建连接时,其线程用于将新消息传递到侦听器容器。
如果未配置此作,则客户端将使用内部线程池执行器,其池大小(在编写时)为Runtime.getRuntime().availableProcessors() * 2
对于每个连接。
如果您有大量工厂或正在使用CacheMode.CONNECTION
,不妨考虑使用共享的ThreadPoolTaskExecutor
有足够的线程来满足您的工作负载。
使用DirectMessageListenerContainer ,您需要确保连接工厂配置了一个任务执行器,该执行器具有足够的线程,以支持使用该工厂的所有侦听器容器之间的所需并发性。
默认池大小(在撰写本文时)为Runtime.getRuntime().availableProcessors() * 2 . |
这RabbitMQ client
使用ThreadFactory
为低级 I/O(套接字)作创建线程。
要修改此工厂,您需要配置底层 RabbitMQConnectionFactory
,如配置基础客户端连接工厂中所述。
选择容器
2.0 版引入了DirectMessageListenerContainer
(DMLC)。
以前,只有SimpleMessageListenerContainer
(SMLC) 可用。
SMLC 为每个使用者使用内部队列和专用线程。
如果将容器配置为侦听多个队列,则使用同一个使用者线程来处理所有队列。
并发由concurrentConsumers
和其他属性。
当消息从 RabbitMQ 客户端到达时,客户端线程通过队列将它们移交给使用者线程。
需要这种架构,因为在早期版本的 RabbitMQ 客户端中,无法进行多个并发交付。
较新版本的客户端具有修订的线程模型,现在可以支持并发。
这允许引入 DMLC,其中侦听器现在直接在 RabbitMQ 客户端线程上调用。
因此,它的架构实际上比 SMLC “更简单”。
但是,这种方法存在一些局限性,并且 SMLC 的某些功能不适用于 DMLC。
此外,并发由consumersPerQueue
(以及客户端库的线程池)。 这concurrentConsumers
和关联的属性不适用于此容器。
SMLC 提供以下功能,但 DMLC 不提供:
-
batchSize
:使用 SMLC,您可以设置此设置以控制事务中传递的消息数或减少 ack 数,但这可能会导致失败后重复传递的数量增加。(DMLC 确实有messagesPerAck
,您可以使用它来减少 acks,与batchSize
和 SMLC,但它不能与事务一起使用——每条消息都在单独的事务中传递和确认)。 -
consumerBatchEnabled
:启用消费者中离散消息的批处理;有关更多信息,请参阅消息侦听器容器配置。 -
maxConcurrentConsumers
和消费者缩放间隔或触发器——DMLC 中没有自动缩放。但是,它确实允许您以编程方式更改consumersPerQueue
财产和消费者都相应地进行了调整。
但是,与 SMLC 相比,DMLC 具有以下优势:
-
在运行时添加和删除队列效率更高。使用 SMLC,将重新启动整个使用者线程(取消并重新创建所有使用者)。使用 DMLC,不会取消未受影响的使用者。
-
避免了 RabbitMQ 客户端线程和消费者线程之间的上下文切换。
-
线程在使用者之间共享,而不是为 SMLC 中的每个使用者提供专用线程。但是,请参阅线程和异步使用者中有关连接工厂配置的重要说明。
有关哪些配置属性适用于每个容器的信息,请参阅消息侦听器容器配置。
检测空闲异步使用者
虽然高效,但异步消费者的一个问题是检测它们何时空闲——用户可能希望 如果一段时间内没有消息到达,则执行一些作。
从 1.6 版开始,现在可以配置侦听器容器以发布ListenerContainerIdleEvent
当一段时间过去没有消息传递时。
当容器处于空余状态时,每隔一次就会发布一个事件idleEventInterval
毫秒。
要配置此功能,请将idleEventInterval
在容器上。
以下示例显示了如何在 XML 和 Java 中执行此作(对于SimpleMessageListenerContainer
和SimpleRabbitListenerContainerFactory
):
<rabbit:listener-container connection-factory="connectionFactory"
...
idle-event-interval="60000"
...
>
<rabbit:listener id="container1" queue-names="foo" ref="myListener" method="handle" />
</rabbit:listener-container>
@Bean
public SimpleMessageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
...
container.setIdleEventInterval(60000L);
...
return container;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setIdleEventInterval(60000L);
...
return factory;
}
在每种情况下,当容器处于空闲状态时,每分钟发布一次事件。
事件消耗
您可以通过实现ApplicationListener
——要么是普通听众,要么是缩小到仅听众
接收此特定事件。
您还可以使用@EventListener
,在 Spring Framework 4.2 中引入。
以下示例将@RabbitListener
和@EventListener
到单个类中。
您需要了解应用程序侦听器获取所有容器的事件,因此您可能需要
如果要根据哪个容器处于空闲状态执行特定作,请检查侦听器 ID。
您还可以使用@EventListener
condition
为此目的。
事件有四个属性:
-
source
:侦听器容器实例 -
id
:侦听器 ID(或容器 Bean 名称) -
idleTime
:发布事件时容器处于空闲状态的时间 -
queueNames
:容器侦听的队列的名称
以下示例演示了如何使用@RabbitListener
和@EventListener
附注:
public class Listener {
@RabbitListener(id="someId", queues="#{queue.name}")
public String listen(String foo) {
return foo.toUpperCase();
}
@EventListener(condition = "event.listenerId == 'someId'")
public void onApplicationEvent(ListenerContainerIdleEvent event) {
...
}
}
事件侦听器可以看到所有容器的事件。 因此,在前面的示例中,我们根据侦听器 ID 缩小了接收到的事件范围。 |
如果您希望使用 idle 事件来停止列表器容器,则不应调用container.stop() 在调用侦听器的线程上。
这样做总是会导致延迟和不必要的日志消息。
相反,您应该将事件移交给可以停止容器的其他线程。 |
监控侦听器性能
从 2.2 版开始,侦听器容器将自动创建和更新 MicrometerTimer
s 表示侦听器,如果Micrometer
在类路径上检测到单个MeterRegistry
存在于应用程序上下文中(或者恰好有一个被注释@Primary
,例如在使用 Spring Boot 时)。
可以通过设置容器属性来禁用计时器micrometerEnabled
自false
.
维护两个计时器 - 一个用于成功调用侦听器,一个用于失败。
通过简单的MessageListener
,每个配置的队列都有一对计时器。
计时器被命名为spring.rabbitmq.listener
并具有以下标签:
-
listenerId
:(侦听器 ID 或容器 Bean 名称) -
queue
:(简单侦听器的队列名称或配置的队列名称列表,当consumerBatchEnabled
是true
- 因为批处理可能包含来自多个队列的消息) -
result
:success
或failure
-
exception
:none
或ListenerExecutionFailedException
您可以使用micrometerTags
container 属性。
另见千分尺观察。
千分尺观察
从 3.0 版开始,现在支持使用千分尺进行观察,用于RabbitTemplate
和侦听器容器。
设置observationEnabled
在每个组件上进行观察;这将禁用千分尺计时器,因为计时器现在将通过每次观测进行管理。
使用带注释的监听器时,将observationEnabled
在集装箱工厂。
有关更多信息,请参阅千分尺追踪。
要向计时器/跟踪添加标记,请配置自定义RabbitTemplateObservationConvention
或RabbitListenerObservationConvention
分别添加到模板或侦听器容器。
默认实现将name
标签用于模板观察,以及listener.id
标签。
您可以将子类化DefaultRabbitTemplateObservationConvention
或DefaultRabbitListenerObservationConvention
或提供全新的实现。
有关更多详细信息,请参阅千分尺观察文档。
4.1.7. 容器和代理命名队列
虽然最好使用AnonymousQueue
实例作为自动删除队列,从 2.1 版本开始,您可以将 Broker 命名队列与监听器容器一起使用。
以下示例显示了如何执行此作:
@Bean
public Queue queue() {
return new Queue("", false, true, true);
}
@Bean
public SimpleMessageListenerContainer container() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf());
container.setQueues(queue());
container.setMessageListener(m -> {
...
});
container.setMissingQueuesFatal(false);
return container;
}
注意空的String
为名称。
当RabbitAdmin
声明队列,它会更新Queue.actualName
属性,其名称由代理返回。
您必须使用setQueues()
当您将容器配置为使其正常工作时,以便容器可以在运行时访问声明的名称。
仅仅设置名称是不够的。
您无法在容器运行时将代理命名的队列添加到容器中。 |
重置连接并建立新连接时,新队列将获得新名称。
由于容器重新启动和重新声明队列之间存在竞争条件,因此将容器的missingQueuesFatal 属性设置为false ,因为容器最初可能会尝试重新连接到旧队列。 |
4.1.8. 消息转换器
这AmqpTemplate
还定义了几种用于发送和接收委托给MessageConverter
.
这MessageConverter
为每个方向提供一个方法:一个用于转换为Message
另一个用于从Message
.
请注意,当转换为Message
,除了对象之外,您还可以提供属性。
这object
参数通常对应于 Message 正文。
以下列表显示了MessageConverter
接口定义:
public interface MessageConverter {
Message toMessage(Object object, MessageProperties messageProperties)
throws MessageConversionException;
Object fromMessage(Message message) throws MessageConversionException;
}
相关Message
-sending 方法AmqpTemplate
比我们之前讨论的方法更简单,因为它们不需要Message
实例。
相反,该MessageConverter
负责“创造”每个Message
通过将提供的对象转换为Message
body ,然后添加任何提供的MessageProperties
.
以下列表显示了各种方法的定义:
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message)
throws AmqpException;
void convertAndSend(Object message, MessagePostProcessor messagePostProcessor)
throws AmqpException;
void convertAndSend(String routingKey, Object message,
MessagePostProcessor messagePostProcessor) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message,
MessagePostProcessor messagePostProcessor) throws AmqpException;
在接收端,只有两种方法:一种接受队列名称,另一种依赖于模板的“queue”属性已设置。 以下列表显示了这两种方法的定义:
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
这MessageListenerAdapter 中提到的异步消费者也使用MessageConverter . |
SimpleMessageConverter
默认实现的MessageConverter
策略称为SimpleMessageConverter
.
这是RabbitTemplate
如果未显式配置备选方案。
它处理基于文本的内容、序列化的 Java 对象和字节数组。
从Message
如果输入的内容类型Message
以“text”开头(例如,
“text/plain”),它还检查 content-encoding 属性以确定在转换Message
body 字节数组转换为 JavaString
.
如果未在输入上设置内容编码属性Message
,它默认使用 UTF-8 字符集。
如果需要覆盖该默认设置,可以配置SimpleMessageConverter
,设置其defaultCharset
属性,并将其注入到RabbitTemplate
实例。
如果输入的 content-type 属性值Message
设置为“application/x-java-serialized-object”,则SimpleMessageConverter
尝试将字节数组反序列化(再冻结)为 Java 对象。
虽然这对于简单的原型设计可能很有用,但我们不建议依赖 Java 序列化,因为它会导致生产者和消费者之间的紧密耦合。
当然,它也排除了在任何一方使用非 Java 系统的可能性。
由于 AMQP 是一种线级协议,因此由于此类限制而失去大部分优势将是不幸的。
在接下来的两节中,我们将探讨一些在不依赖 Java 序列化的情况下传递丰富领域对象内容的替代方法。
对于所有其他内容类型,SimpleMessageConverter
返回Message
正文内容直接作为字节数组。
有关重要信息,请参阅 Java 反序列化。
SerializerMessageConverter
此转换器类似于SimpleMessageConverter
除了它可以与其他 Spring Framework 一起配置Serializer
和Deserializer
实现application/x-java-serialized-object
转换。
有关重要信息,请参阅 Java 反序列化。
Jackson2JsonMessageConverter
本节介绍使用Jackson2JsonMessageConverter
与Message
. 它有以下部分:
转换为Message
如上一节所述,通常不建议依赖 Java 序列化。
JSON 是一种相当常见的替代方案,它更灵活、更易于跨不同语言和平台移植
(JavaScript 对象表示法)。
转换器可以配置在任何RabbitTemplate
实例来覆盖其对SimpleMessageConverter
违约。
这Jackson2JsonMessageConverter
使用com.fasterxml.jackson
2.x 库。
以下示例配置Jackson2JsonMessageConverter
:
<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter">
<!-- if necessary, override the DefaultClassMapper -->
<property name="classMapper" ref="customClassMapper"/>
</bean>
</property>
</bean>
如上图所示,Jackson2JsonMessageConverter
使用DefaultClassMapper
默认情况下。
类型信息被添加到(并从中检索)MessageProperties
.
如果入站消息不包含MessageProperties
,但你知道预期的类型,你
可以使用defaultType
属性,如以下示例所示:
<bean id="jsonConverterWithDefaultType"
class="o.s.amqp.support.converter.Jackson2JsonMessageConverter">
<property name="classMapper">
<bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
<property name="defaultType" value="thing1.PurchaseOrder"/>
</bean>
</property>
</bean>
此外,您可以从TypeId
页眉。
以下示例显示了如何执行此作:
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
jsonConverter.setClassMapper(classMapper());
return jsonConverter;
}
@Bean
public DefaultClassMapper classMapper() {
DefaultClassMapper classMapper = new DefaultClassMapper();
Map<String, Class<?>> idClassMapping = new HashMap<>();
idClassMapping.put("thing1", Thing1.class);
idClassMapping.put("thing2", Thing2.class);
classMapper.setIdClassMapping(idClassMapping);
return classMapper;
}
现在,如果发送系统将标头设置为thing1
,转换器会创建一个Thing1
对象,依此类推。
有关从非 Spring 应用程序转换消息的完整讨论,请参阅从非 Spring 应用程序接收 JSON 示例应用程序。
从 2.4.3 版开始,转换器将不会添加contentEncoding
message 属性,如果supportedMediaType
有一个charset
参数;这也用于编码。
一种新方法setSupportedMediaType
已添加:
String utf16 = "application/json; charset=utf-16";
converter.setSupportedContentType(MimeTypeUtils.parseMimeType(utf16));
从Message
入站消息根据发送系统添加到标头的类型信息转换为对象。
从 2.4.3 版本开始,如果没有contentEncoding
message 属性,转换器将尝试检测charset
参数在contentType
message 属性并使用它。
如果两者都不存在,如果supportedMediaType
有一个charset
参数,它将用于解码,并最终回退到defaultCharset
财产。
一种新方法setSupportedMediaType
已添加:
String utf16 = "application/json; charset=utf-16";
converter.setSupportedContentType(MimeTypeUtils.parseMimeType(utf16));
在 1.6 之前的版本中,如果类型信息不存在,则转换将失败。 从 1.6 版开始,如果缺少类型信息,转换器将使用 Jackson 默认值(通常是映射)转换 JSON。
此外,从 1.6 版开始,当您使用@RabbitListener
注释(在方法上),推断的类型信息将添加到MessageProperties
.
这允许转换器转换为目标方法的参数类型。
仅当有一个参数没有注释或单个参数具有@Payload
注解。
类型参数Message
在分析过程中被忽略。
默认情况下,推断的类型信息将覆盖入站TypeId 和创建的相关标头
由发送系统。
这允许接收系统自动转换为不同的域对象。
仅当参数类型是具体的(不是抽象或接口)或来自java.util 包。
在所有其他情况下,TypeId 和相关的标头。
在某些情况下,您可能希望覆盖默认行为并始终使用TypeId 信息。
例如,假设您有一个@RabbitListener 这需要一个Thing1 参数,但消息包含Thing2 那
是Thing1 (这是具体的)。
推断的类型将不正确。
要处理这种情况,请将TypePrecedence 属性Jackson2JsonMessageConverter 自TYPE_ID 相反
的默认值INFERRED .
(该属性实际上位于转换器的DefaultJackson2JavaTypeMapper ,但转换器上提供了一个 setter
为方便起见。
如果注入自定义类型映射器,则应改为在映射器上设置属性。 |
从Message ,传入MessageProperties.getContentType() 必须符合 JSON (contentType.contains("json") 用于检查)。
从 2.2 版本开始,application/json 如果没有contentType 属性,或者它具有默认值application/octet-stream .
要恢复到以前的行为(返回未转换的byte[] ),将转换器的assumeSupportedContentType 属性设置为false .
如果不支持内容类型,则WARN 日志消息Could not convert incoming message with content-type […] ,发出,并且message.getBody() 按原样返回 — 作为byte[] .
因此,为了满足Jackson2JsonMessageConverter 要求,生产者必须添加contentType message 属性 — 例如,作为application/json 或text/x-json 或使用Jackson2JsonMessageConverter ,它会自动设置标题。
以下列表显示了许多转换器调用: |
@RabbitListener
public void thing1(Thing1 thing1) {...}
@RabbitListener
public void thing1(@Payload Thing1 thing1, @Header("amqp_consumerQueue") String queue) {...}
@RabbitListener
public void thing1(Thing1 thing1, o.s.amqp.core.Message message) {...}
@RabbitListener
public void thing1(Thing1 thing1, o.s.messaging.Message<Foo> message) {...}
@RabbitListener
public void thing1(Thing1 thing1, String bar) {...}
@RabbitListener
public void thing1(Thing1 thing1, o.s.messaging.Message<?> message) {...}
在前面列表中的前四种情况下,转换器会尝试转换为Thing1
类型。
第五个示例无效,因为我们无法确定哪个参数应该接收消息有效负载。
在第六个示例中,Jackson 默认值适用,因为泛型类型是WildcardType
.
但是,您可以创建一个自定义转换器并使用targetMethod
message 属性来决定要转换的类型
JSON 到。
只有当@RabbitListener 注释是在方法级别声明的。
与类级@RabbitListener ,转换后的类型用于选择哪个@RabbitHandler 方法调用。
因此,基础架构提供了targetObject message 属性,您可以在自定义
转换器来确定类型。 |
从 1.6.11 版本开始,Jackson2JsonMessageConverter 因此,DefaultJackson2JavaTypeMapper (DefaultClassMapper ) 提供trustedPackages 克服序列化小工具漏洞的选项。
默认情况下,为了向后兼容,该Jackson2JsonMessageConverter 信任所有包 — 也就是说,它用于选项。* |
从 2.4.7 版开始,转换器可以配置为Optional.empty()
如果Jackson回归null
反序列化消息正文后。
这有利于@RabbitListener
s 以两种方式接收空有效负载:
@RabbitListener(queues = "op.1")
void listen(@Payload(required = false) Thing payload) {
handleOptional(payload); // payload might be null
}
@RabbitListener(queues = "op.2")
void listen(Optional<Thing> optional) {
handleOptional(optional.orElse(this.emptyThing));
}
要启用此功能,请将setNullAsOptionalEmpty
自true
;什么时候false
(默认值),转换器回退到原始消息正文 (byte[]
).
@Bean
Jackson2JsonMessageConverter converter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
converter.setNullAsOptionalEmpty(true);
return converter;
}
反序列化抽象类
在 2.2.8 版本之前,如果推断的类型@RabbitListener
是一个抽象类(包括接口),转换器将回退到在标头中查找类型信息,如果存在,则使用该信息;如果不存在,它将尝试创建抽象类。
当自定义ObjectMapper
使用配置了自定义反序列化程序来处理抽象类,但传入消息具有无效的类型标头。
从 2.2.8 版开始,默认情况下保留以前的行为。如果你有这样的习俗ObjectMapper
如果要忽略类型标头,并始终使用推断的类型进行转换,请将alwaysConvertToInferredType
自true
.
这是向后兼容性所必需的,并避免尝试转换失败时的开销(使用标准ObjectMapper
).
使用 Spring 数据投影接口
从版本 2.2 开始,您可以将 JSON 转换为 Spring Data Projection 接口而不是具体类型。 这允许对数据进行非常有选择性和低耦合的绑定,包括从 JSON 文档中的多个位置查找值。 例如,可以将以下接口定义为消息有效负载类型:
interface SomeSample {
@JsonPath({ "$.username", "$.user.name" })
String getUsername();
}
@RabbitListener(queues = "projection")
public void projection(SomeSample in) {
String username = in.getUsername();
...
}
默认情况下,访问器方法将用于将属性名称查找为接收的 JSON 文档中的字段。
这@JsonPath
expression 允许自定义值查找,甚至可以定义多个 JSON 路径表达式,从多个位置查找值,直到表达式返回实际值。
要启用此功能,请将useProjectionForInterfaces
自true
在消息转换器上。
您还必须添加spring-data:spring-data-commons
和com.jayway.jsonpath:json-path
到类路径。
当用作参数时@RabbitListener
方法,接口类型会正常自动传递给转换器。
从Message
跟RabbitTemplate
如前所述,类型信息在消息头中传达,以帮助转换器从消息转换。
这在大多数情况下效果很好。
但是,当使用泛型类型时,它只能转换简单对象和已知的“容器”对象(列表、数组和映射)。
从 2.0 版开始,Jackson2JsonMessageConverter
实现SmartMessageConverter
,这允许它与新的RabbitTemplate
采用ParameterizedTypeReference
论点。 这允许转换复杂的泛型类型,如以下示例所示:
Thing1<Thing2<Cat, Hat>> thing1 =
rabbitTemplate.receiveAndConvert(new ParameterizedTypeReference<Thing1<Thing2<Cat, Hat>>>() { });
从 2.1 版开始,AbstractJsonMessageConverter 类已被删除。
它不再是Jackson2JsonMessageConverter .
它已被AbstractJackson2MessageConverter . |
MarshallingMessageConverter
另一种选择是MarshallingMessageConverter
. 它委托给 Spring OXM 库的Marshaller
和Unmarshaller
策略接口。
您可以在此处阅读有关该库的更多信息。
在配置方面,最常见的是仅提供构造函数参数,因为大多数Marshaller
还实现Unmarshaller
. 以下示例演示如何配置MarshallingMessageConverter
:
<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.amqp.support.converter.MarshallingMessageConverter">
<constructor-arg ref="someImplemenationOfMarshallerAndUnmarshaller"/>
</bean>
</property>
</bean>
Jackson2XmlMessageConverter
此类是在 2.1 版中引入的,可用于将消息从 XML 转换或转换为 XML。
双Jackson2XmlMessageConverter
和Jackson2JsonMessageConverter
具有相同的基类:AbstractJackson2MessageConverter
.
这AbstractJackson2MessageConverter 引入 class 来替换已删除的类:AbstractJsonMessageConverter . |
这Jackson2XmlMessageConverter
使用com.fasterxml.jackson
2.x 库。
你可以用同样的方式使用它Jackson2JsonMessageConverter
,只是它支持 XML 而不是 JSON。以下示例配置了Jackson2JsonMessageConverter
:
<bean id="xmlConverterWithDefaultType"
class="org.springframework.amqp.support.converter.Jackson2XmlMessageConverter">
<property name="classMapper">
<bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
<property name="defaultType" value="foo.PurchaseOrder"/>
</bean>
</property>
</bean>
有关详细信息,请参阅 Jackson2JsonMessageConverter。
从 2.2 版本开始,application/xml 如果没有contentType 属性,或者它具有默认值application/octet-stream .
要恢复到以前的行为(返回未转换的byte[] ),将转换器的assumeSupportedContentType 属性设置为false . |
ContentTypeDelegatingMessageConverter
此类是在版本 1.4.2 中引入的,允许委托给特定的MessageConverter
基于MessageProperties
. 默认情况下,它委托给SimpleMessageConverter
如果没有contentType
属性,或者存在与配置的任何转换器都不匹配的值。以下示例配置了ContentTypeDelegatingMessageConverter
:
<bean id="contentTypeConverter" class="ContentTypeDelegatingMessageConverter">
<property name="delegates">
<map>
<entry key="application/json" value-ref="jsonMessageConverter" />
<entry key="application/xml" value-ref="xmlMessageConverter" />
</map>
</property>
</bean>
Java 反序列化
本节介绍如何反序列化 Java 对象。
从不受信任的来源反序列化 Java 对象时可能存在漏洞。 如果您接受来自不受信任来源的邮件,则使用 默认情况下,允许的列表为空,这意味着不会反序列化任何类。 您可以设置模式列表,例如 将按顺序检查模式,直到找到匹配项。
如果没有匹配项,则 您可以使用 |
消息属性转换器
这MessagePropertiesConverter
策略接口用于在 Rabbit 客户端之间进行转换BasicProperties
和 Spring AMQPMessageProperties
.
默认实现 (DefaultMessagePropertiesConverter
)通常足以满足大多数目的,但如果需要,您可以实现自己的。
默认属性转换器将BasicProperties
类型元素LongString
自String
当大小不大于1024
字节。
较大LongString
实例不会转换(请参阅下一段)。
可以使用构造函数参数覆盖此限制。
从 1.6 版开始,长于长字符串限制(默认:1024)的标头现在保留为LongString
实例默认情况下由DefaultMessagePropertiesConverter
.
您可以通过getBytes[]
,toString()
或getStream()
方法。
以前,DefaultMessagePropertiesConverter
将此类标头“转换为DataInputStream
(实际上它只是引用了LongString
实例的DataInputStream
).
在输出时,此标头未被转换(除了转换为 String — 例如java.io.DataInputStream@1d057a39
通过调用toString()
在直播中)。
大额进货LongString
标头现在也会在输出时正确“转换”(默认)。
提供了一个新的构造函数,可让您将转换器配置为像以前一样工作。 以下列表显示了该方法的 Javadoc 注释和声明:
/**
* Construct an instance where LongStrings will be returned
* unconverted or as a java.io.DataInputStream when longer than this limit.
* Use this constructor with 'true' to restore pre-1.6 behavior.
* @param longStringLimit the limit.
* @param convertLongLongStrings LongString when false,
* DataInputStream when true.
* @since 1.6
*/
public DefaultMessagePropertiesConverter(int longStringLimit, boolean convertLongLongStrings) { ... }
同样从版本 1.6 开始,一个名为correlationIdString
已添加到MessageProperties
.
以前,在与BasicProperties
RabbitMQ 客户端使用的,不必要的byte[] <→ String
执行转换是因为MessageProperties.correlationId
是一个byte[]
但BasicProperties
使用String
.
(最终,RabbitMQ 客户端使用 UTF-8 将String
到字节以放入协议消息中)。
为了提供最大的向后兼容性,名为correlationIdPolicy
已添加到DefaultMessagePropertiesConverter
.
这需要一个DefaultMessagePropertiesConverter.CorrelationIdPolicy
enum 参数。
默认情况下,它设置为BYTES
,它复制了以前的行为。
对于入站邮件:
-
STRING
:只有correlationIdString
属性映射 -
BYTES
:只有correlationId
属性映射 -
BOTH
:映射了两个属性
对于出站邮件:
-
STRING
:只有correlationIdString
属性映射 -
BYTES
:只有correlationId
属性映射 -
BOTH
:考虑这两个属性,使用String
属性优先
同样从版本 1.6 开始,入站deliveryMode
属性不再映射到MessageProperties.deliveryMode
. 它映射到MessageProperties.receivedDeliveryMode
相反。 此外,入站userId
属性不再映射到MessageProperties.userId
. 它映射到MessageProperties.receivedUserId
相反。 这些更改是为了避免这些属性在相同时意外传播MessageProperties
对象用于出站消息。
从 2.2 版开始,DefaultMessagePropertiesConverter
转换任何具有类型Class<?>
用getName()
而不是toString()
; 这避免了使用应用程序必须从toString()
表示法。 对于滚动升级,您可能需要更改使用者以了解这两种格式,直到所有生产者都升级完毕。
4.1.9. 修改消息 - 压缩等
存在许多扩展点。它们允许您在消息发送到 RabbitMQ 之前或在收到消息后立即对消息执行一些处理。
正如在消息转换器中看到的那样,一个这样的扩展点位于AmqpTemplate
convertAndReceive
operations,您可以在其中提供MessagePostProcessor
.
例如,在转换 POJO 后,MessagePostProcessor
允许您在Message
.
从 1.4.2 版开始,在RabbitTemplate
- setBeforePublishPostProcessors()
和setAfterReceivePostProcessors()
. 第一个使后处理器能够在发送到 RabbitMQ 之前立即运行。使用批处理(请参阅批处理)时,在组装批处理之后和发送批处理之前调用此作。第二个在收到消息后立即调用。
这些扩展点用于压缩等功能,为此,多个MessagePostProcessor
提供了实现。GZipPostProcessor
,ZipPostProcessor
和DeflaterPostProcessor
在发送前压缩邮件,以及GUnzipPostProcessor
,UnzipPostProcessor
和InflaterPostProcessor
解压缩收到的消息。
从 2.1.5 版本开始,GZipPostProcessor 可以使用copyProperties = true 选项以复制原始邮件属性。
默认情况下,出于性能原因,这些属性会重复使用,并使用压缩内容编码和可选的MessageProperties.SPRING_AUTO_DECOMPRESS 页眉。
如果保留对原始出站消息的引用,则其属性也会更改。
因此,如果您的应用程序保留了带有这些消息后处理器的出站消息的副本,请考虑将copyProperties 选项。 |
从版本 2.2.12 开始,您可以配置压缩后处理器在内容编码元素之间使用的分隔符。
在 2.2.11 及更早版本中,这被硬编码为: ,现在设置为, ` by default.
The decompressors will work with both delimiters.
However, if you publish messages with 2.3 or later and consume with 2.2.11 or earlier, you MUST set the `encodingDelimiter 压缩器上的属性设置为: .
当您的消费者升级到 2.2.11 或更高版本时,您可以恢复为默认值 ', '。 |
同样,SimpleMessageListenerContainer
还有一个setAfterReceivePostProcessors()
方法,让容器收到消息后执行解压缩。
从 2.1.4 版本开始,addBeforePublishPostProcessors()
和addAfterReceivePostProcessors()
已添加到RabbitTemplate
以允许将新的后处理器分别附加到 Before Publish 和 After Receive 后处理器的列表中。
此外,还提供了一些方法来删除后处理器。
同样地AbstractMessageListenerContainer
还有addAfterReceivePostProcessors()
和removeAfterReceivePostProcessor()
添加了方法。
请参阅 Java 文档RabbitTemplate
和AbstractMessageListenerContainer
了解更多详情。
4.1.10. 请求/回复消息传递
这AmqpTemplate
还提供了多种sendAndReceive
接受前面描述的单向发送作 (exchange
,routingKey
和Message
).
这些方法对于请求-回复方案非常有用,因为它们处理了必要reply-to
属性,并且可以在内部为此目的创建的独占队列上侦听回复消息。
类似的请求-回复方法也可用,其中MessageConverter
应用于请求和回复。
这些方法被命名为convertSendAndReceive
.
请参阅Javadoc 的AmqpTemplate
了解更多详情。
从 1.5.0 版本开始,每个sendAndReceive
方法 variants 有一个重载版本,它采用CorrelationData
.
与正确配置的连接工厂一起,这可以接收作的发送端的发布者确认。
请参阅相关发布者确认和退货和Javadoc 的RabbitOperations
了解更多信息。
从 2.0 版开始,这些方法有变体 (convertSendAndReceiveAsType
) 需要额外的ParameterizedTypeReference
参数来转换复杂的返回类型。
模板必须配置SmartMessageConverter
.
看从Message
跟RabbitTemplate
了解更多信息。
从 2.1 版开始,您可以配置RabbitTemplate
使用noLocalReplyConsumer
控制noLocal
标记回复消费者。
这是false
默认情况下。
回复超时
默认情况下,发送和接收方法在五秒后超时并返回 null。
您可以通过将replyTimeout
财产。
从 1.5 版开始,如果您将mandatory
属性设置为true
(或mandatory-expression
评估为true
对于特定消息),如果消息无法传递到队列,则AmqpMessageReturnedException
被抛出。
此异常具有returnedMessage
,replyCode
和replyText
属性,以及exchange
和routingKey
用于发送。
此功能使用发布者退货。
您可以通过设置publisherReturns 自true 在CachingConnectionFactory (请参阅出版商确认和退货)。
此外,您不得注册自己的ReturnCallback 使用RabbitTemplate . |
从 2.1.2 版开始,一个replyTimedOut
方法,让子类被告知超时,以便它们可以清理任何保留的状态。
从版本 2.0.11 和 2.1.3 开始,当您使用默认的DirectReplyToMessageListenerContainer
,您可以通过设置模板的replyErrorHandler
财产。 对于任何失败的传递,例如延迟回复和收到没有相关标头的消息,都会调用此错误处理程序。传入的异常是ListenerExecutionFailedException
,它有一个failedMessage
财产。
RabbitMQ 直接回复
从版本 3.4.0 开始,RabbitMQ 服务器支持直接回复。这消除了固定回复队列的主要原因(以避免需要为每个请求创建临时队列)。从 Spring AMQP 版本 1.4.1 开始,默认使用直接回复(如果服务器支持),而不是创建临时回复队列。当没有replyQueue (或设置为amq.rabbitmq.reply-to )、RabbitTemplate 自动检测是否支持直接回复,并使用它或回退到使用临时回复队列。使用直接回复时,reply-listener 不是必需的,也不应配置。 |
命名队列仍支持应答侦听器(除了amq.rabbitmq.reply-to
),允许控制回复并发等。
从 1.6 版开始,如果您希望对每个
reply,将useTemporaryReplyQueues
属性设置为true
.
如果将replyAddress
.
您可以通过子类化来更改指示是否使用直接回复的条件RabbitTemplate
和覆盖useDirectReplyTo()
以检查不同的标准。
发送第一个请求时,该方法仅被调用一次。
在 2.0 版之前,RabbitTemplate
为每个请求创建一个新的使用者,并在收到回复(或超时)时取消使用者。
现在,模板使用DirectReplyToMessageListenerContainer
相反,让消费者被重复使用。
模板仍然负责关联回复,因此不存在延迟回复给其他发件人的危险。
如果要恢复到以前的行为,请将useDirectReplyToContainer
(direct-reply-to-container
使用 XML 配置时)属性设置为 false。
这AsyncRabbitTemplate
没有这样的选择。
它总是使用DirectReplyToContainer
用于使用直接回复时的回复。
从版本 2.3.7 开始,模板有一个新属性useChannelForCorrelation
.
当这是true
,则服务器不必将相关 ID 从请求消息头复制到回复消息。
相反,用于发送请求的通道用于将回复与请求相关联。
消息与应答队列的关联
使用固定应答队列时(amq.rabbitmq.reply-to
),您必须提供相关数据,以便回复可以与请求相关联。
请参阅 RabbitMQ 远程过程调用 (RPC)。
默认情况下,标准correlationId
属性用于保存相关数据。但是,如果您希望使用自定义属性来保存相关数据,则可以将correlation-key
><属性。显式将属性设置为correlationId
与省略属性相同。客户端和服务器必须对关联数据使用相同的标头。
Spring AMQP 版本 1.1 使用了一个名为spring_reply_correlation 。如果您希望在当前版本中恢复到此行为(可能是为了保持与使用 1.1 的另一个应用程序的兼容性),则必须将属性设置为spring_reply_correlation . |
默认情况下,模板会生成自己的相关 ID(忽略任何用户提供的值)。
如果您希望使用自己的相关 ID,请将RabbitTemplate
实例的userCorrelationId
属性设置为true
.
关联标识必须是唯一的,以避免为请求返回错误的回复。 |
回复侦听器容器
当使用 3.4.0 之前的 RabbitMQ 版本时,每个回复都会使用一个新的临时队列。但是,可以在模板上配置单个回复队列,这可以更高效,并且还允许您在该队列上设置参数。但是,在这种情况下,您还必须提供 <reply-listener/> 子元素。此元素为回复队列提供侦听器容器,模板是侦听器。该元素允许在 <listener-container/> 上允许的所有消息侦听器容器配置属性,但connection-factory
和message-converter
,这些都是从模板的配置继承而来的。
如果您运行应用程序的多个实例或使用RabbitTemplate 实例,您必须为每个实例使用唯一的回复队列。
RabbitMQ 无法从队列中选择消息,因此,如果它们都使用相同的队列,则每个实例都会争夺回复,而不一定收到自己的回复。 |
以下示例定义了具有连接工厂的兔子模板:
<rabbit:template id="amqpTemplate"
connection-factory="connectionFactory"
reply-queue="replies"
reply-address="replyEx/routeReply">
<rabbit:reply-listener/>
</rabbit:template>
虽然容器和模板共享连接工厂,但它们不共享通道。 因此,请求和回复不会在同一事务中执行(如果是事务)。
在 1.5.0 版本之前,reply-address 属性不可用。
回复始终使用默认交换和reply-queue name 作为路由键。
这仍然是默认设置,但您现在可以指定新的reply-address 属性。
这reply-address 可以包含具有<exchange>/<routingKey> 并且应答将路由到指定的交换,并路由到与路由键绑定的队列。
这reply-address 优先于reply-queue .
仅当reply-address 正在使用中,则<reply-listener> 必须配置为单独的<listener-container> 元件。
这reply-address 和reply-queue (或queues 属性<listener-container> )必须在逻辑上引用同一个队列。 |
使用此配置,一个SimpleListenerContainer
用于接收回复,其中RabbitTemplate
作为MessageListener
.
使用<rabbit:template/>
namespace 元素,如前面的示例所示,解析器将模板中的容器和线定义为监听器。
当模板不使用固定的replyQueue (或使用直接回复 - 参见 RabbitMQ 直接回复),则不需要侦听器容器。
直接reply-to 是使用 RabbitMQ 3.4.0 或更高版本时的首选机制。 |
如果您定义了RabbitTemplate
作为<bean/>
或使用@Configuration
类将其定义为@Bean
或者,以编程方式创建模板时,需要自行定义和连接回复侦听器容器。
如果不执行此作,模板将永远不会收到回复,最终会超时并返回 null 作为对sendAndReceive
方法。
从 1.5 版开始,RabbitTemplate
检测它是否已经
配置为MessageListener
接收回复。
如果没有,则尝试发送和接收带有回复地址的邮件
fail 时IllegalStateException
(因为永远不会收到回复)。
此外,如果简单的replyAddress
(队列名称),则应答侦听器容器验证它是否正在侦听到具有相同名称的队列。如果应答地址是交换和路由密钥并写入了调试日志消息,则无法执行此检查。
在自己连接回复侦听器和模板时,重要的是要确保模板的replyAddress 和容器的queues (或queueNames ) 属性引用同一队列。模板将回复地址插入到出站消息中replyTo 财产。 |
以下列表显示了如何手动连接 Bean 的示例:
<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<constructor-arg ref="connectionFactory" />
<property name="exchange" value="foo.exchange" />
<property name="routingKey" value="foo" />
<property name="replyQueue" ref="replyQ" />
<property name="replyTimeout" value="600000" />
<property name="useDirectReplyToContainer" value="false" />
</bean>
<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<constructor-arg ref="connectionFactory" />
<property name="queues" ref="replyQ" />
<property name="messageListener" ref="amqpTemplate" />
</bean>
<rabbit:queue id="replyQ" name="my.reply.queue" />
@Bean
public RabbitTemplate amqpTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(msgConv());
rabbitTemplate.setReplyAddress(replyQueue().getName());
rabbitTemplate.setReplyTimeout(60000);
rabbitTemplate.setUseDirectReplyToContainer(false);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueues(replyQueue());
container.setMessageListener(amqpTemplate());
return container;
}
@Bean
public Queue replyQueue() {
return new Queue("my.reply.queue");
}
一个完整的示例RabbitTemplate
与固定的应答队列连接,以及处理请求并返回应答的“远程”侦听器容器,在此测试用例中显示。
当回复超时 (replyTimeout )、sendAndReceive() 方法返回 null。 |
在 1.3.6 版之前,仅记录超时消息的延迟回复。
现在,如果收到延迟回复,则会拒绝它(模板会抛出AmqpRejectAndDontRequeueException
).
如果回复队列配置为将被拒绝的邮件发送到死信交换,则可以检索回复以供以后分析。
为此,请使用与回复队列名称相等的路由密钥将队列绑定到配置的死信交换。
有关配置死信的更多信息,请参阅 RabbitMQ 死信文档。
您还可以查看FixedReplyQueueDeadLetterTests
测试用例作为示例。
异步兔子模板
1.6 版引入了AsyncRabbitTemplate
.
这有类似的sendAndReceive
(和convertSendAndReceive
) 方法添加到AmqpTemplate
.
但是,它们不是阻塞,而是返回CompletableFuture
.
这sendAndReceive
方法返回一个RabbitMessageFuture
.
这convertSendAndReceive
方法返回一个RabbitConverterFuture
.
您可以稍后通过调用get()
,或者您可以注册一个与结果异步调用的回调。
以下列表显示了这两种方法:
@Autowired
private AsyncRabbitTemplate template;
...
public void doSomeWorkAndGetResultLater() {
...
CompletableFuture<String> future = this.template.convertSendAndReceive("foo");
// do some more work
String reply = null;
try {
reply = future.get(10, TimeUnit.SECONDS);
}
catch (ExecutionException e) {
...
}
...
}
public void doSomeWorkAndGetResultAsync() {
...
RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
future.whenComplete((result, ex) -> {
if (ex == null) {
// success
}
else {
// failure
}
});
...
}
如果mandatory
设置了消息,并且无法传递消息,则 future 会抛出一个ExecutionException
原因为AmqpMessageReturnedException
,它封装了返回的消息和有关返回的信息。
如果enableConfirms
设置时,未来有一个名为confirm
,它本身就是一个CompletableFuture<Boolean>
跟true
表示发布成功。
如果确认未来是false
这RabbitFuture
还有一个名为nackCause
,其中包含失败的原因(如果可用)。
如果在回复后收到发布者确认,则该确认将被丢弃,因为该回复意味着成功发布。 |
您可以将receiveTimeout
属性以超时回复(默认为30000
- 30 秒)。
如果发生超时,则未来将以AmqpReplyTimeoutException
.
模板实现SmartLifecycle
.
在有待处理的回复时停止模板会导致Future
要取消的实例。
从 2.0 版开始,异步模板现在支持直接回复,而不是配置的回复队列。 若要启用此功能,请使用以下构造函数之一:
public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)
public AsyncRabbitTemplate(RabbitTemplate template)
请参阅 RabbitMQ 直接回复,将直接回复与同步RabbitTemplate
.
2.0 版引入了这些方法的变体 (convertSendAndReceiveAsType
) 需要额外的ParameterizedTypeReference
参数来转换复杂的返回类型。
您必须配置基础RabbitTemplate
使用SmartMessageConverter
.
看从Message
跟RabbitTemplate
了解更多信息。
从 3.0 版开始,AsyncRabbitTemplate 方法现在返回CompletableFuture s 而不是ListenableFuture s. |
4.1.11. 配置代理
AMQP 规范描述了如何使用该协议在代理上配置队列、交换和绑定。
这些作(从 0.8 规范及更高版本开始可移植)存在于AmqpAdmin
接口中的org.springframework.amqp.core
包。
该类的 RabbitMQ 实现是RabbitAdmin
位于org.springframework.amqp.rabbit.core
包。
这AmqpAdmin
接口基于使用 Spring AMQP 域抽象,如以下列表所示:
public interface AmqpAdmin {
// Exchange Operations
void declareExchange(Exchange exchange);
void deleteExchange(String exchangeName);
// Queue Operations
Queue declareQueue();
String declareQueue(Queue queue);
void deleteQueue(String queueName);
void deleteQueue(String queueName, boolean unused, boolean empty);
void purgeQueue(String queueName, boolean noWait);
// Binding Operations
void declareBinding(Binding binding);
void removeBinding(Binding binding);
Properties getQueueProperties(String queueName);
}
另请参阅作用域作。
这getQueueProperties()
方法返回有关队列的一些有限信息(消息计数和使用者计数)。
返回的属性的键可作为RabbitTemplate
(QUEUE_NAME
,QUEUE_MESSAGE_COUNT
和QUEUE_CONSUMER_COUNT
).
RabbitMQ REST API 在QueueInfo
对象。
无参数declareQueue()
方法在代理上定义一个队列,其名称为自动生成。
此自动生成队列的其他属性包括exclusive=true
,autoDelete=true
和durable=false
.
这declareQueue(Queue queue)
方法采用Queue
对象,并返回声明队列的名称。
如果name
提供的属性Queue
是一个空的String
,代理使用生成的名称声明队列。
该名称将返回给调用方。
该名称也会添加到actualName
属性的Queue
.
只能通过调用RabbitAdmin
径直。
在应用程序上下文中以声明方式定义队列时,管理员使用自动声明时,可以将 name 属性设置为(空字符串)。
然后,代理创建名称。
从 2.1 版开始,侦听器容器可以使用这种类型的队列。
有关更多信息,请参阅容器和代理命名队列。""
这与AnonymousQueue
其中框架生成一个唯一的 (UUID
) 名称和集合durable
自false
和exclusive
,autoDelete
自true
.
一个<rabbit:queue/>
带有空(或缺失)name
属性总是创建一个AnonymousQueue
.
看AnonymousQueue
了解原因AnonymousQueue
优于代理生成的队列名称以及
如何控制名称的格式。
从版本 2.1 开始,匿名队列使用 argumentQueue.X_QUEUE_LEADER_LOCATOR
设置为client-local
默认情况下。
这可确保在应用程序连接到的节点上声明队列。
声明性队列必须具有固定名称,因为它们可能会在上下文中的其他地方被引用,例如在
侦听器,如以下示例所示:
<rabbit:listener-container>
<rabbit:listener ref="listener" queue-names="#{someQueue.name}" />
</rabbit:listener-container>
请参阅交换、队列和绑定的自动声明。
这个接口的RabbitMQ实现是RabbitAdmin
,当使用 Spring XML 进行配置时,类似于以下示例:
<rabbit:connection-factory id="connectionFactory"/>
<rabbit:admin id="amqpAdmin" connection-factory="connectionFactory"/>
当CachingConnectionFactory
缓存模式为CHANNEL
(默认值)、RabbitAdmin
实现自动延迟声明在同一个中声明的队列、交换和绑定ApplicationContext
.
这些组件在Connection
向经纪人开放。
有一些命名空间功能使这变得非常方便——例如,
在 Stocks 示例应用程序中,我们有以下内容:
<rabbit:queue id="tradeQueue"/>
<rabbit:queue id="marketDataQueue"/>
<fanout-exchange name="broadcast.responses"
xmlns="http://www.springframework.org/schema/rabbit">
<bindings>
<binding queue="tradeQueue"/>
</bindings>
</fanout-exchange>
<topic-exchange name="app.stock.marketdata"
xmlns="http://www.springframework.org/schema/rabbit">
<bindings>
<binding queue="marketDataQueue" pattern="${stocks.quote.pattern}"/>
</bindings>
</topic-exchange>
在前面的示例中,我们使用匿名队列(实际上,在内部,只是名称由框架而不是代理生成的队列)并通过 ID 引用它们。 我们还可以使用显式名称声明队列,这些队列也用作上下文中其 bean 定义的标识符。 以下示例配置具有显式名称的队列:
<rabbit:queue name="stocks.trade.queue"/>
您可以同时提供id 和name 属性。 这允许您通过独立于队列名称的 ID 来引用队列(例如,在绑定中)。它还允许标准 Spring 功能(例如队列名称的属性占位符和 SpEL 表达式)。当您使用名称作为 bean 标识符时,这些功能不可用。 |
可以使用其他参数配置队列 — 例如x-message-ttl
. 当您使用命名空间支持时,它们以Map
的参数名称/参数值对,这些对是通过使用<rabbit:queue-arguments>
元素。 以下示例显示了如何执行此作:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="myDLX"/>
<entry key="x-dead-letter-routing-key" value="dlqRK"/>
</rabbit:queue-arguments>
</rabbit:queue>
默认情况下,参数假定为字符串。 对于其他类型的参数,必须提供类型。 以下示例演示如何指定类型:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments value-type="java.lang.Long">
<entry key="x-message-ttl" value="100"/>
</rabbit:queue-arguments>
</rabbit:queue>
提供混合类型的参数时,必须为每个条目元素提供类型。 以下示例显示了如何执行此作:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-message-ttl">
<value type="java.lang.Long">100</value>
</entry>
<entry key="x-dead-letter-exchange" value="myDLX"/>
<entry key="x-dead-letter-routing-key" value="dlqRK"/>
</rabbit:queue-arguments>
</rabbit:queue>
在 Spring Framework 3.2 及更高版本中,可以更简洁地声明这一点,如下所示:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="100" value-type="java.lang.Long"/>
<entry key="x-ha-policy" value="all"/>
</rabbit:queue-arguments>
</rabbit:queue>
当您使用 Java 配置时,Queue.X_QUEUE_LEADER_LOCATOR
参数通过setLeaderLocator()
方法Queue
类。
从版本 2.1 开始,匿名队列的声明将此属性设置为client-local
默认情况下。
这可确保在应用程序连接到的节点上声明队列。
RabbitMQ 代理不允许声明参数不匹配的队列。
例如,如果queue 已经存在,没有time to live 参数,并且您尝试使用(例如)key="x-message-ttl" value="100" ,则引发异常。 |
默认情况下,RabbitAdmin
发生任何异常时立即停止处理所有声明。
这可能会导致下游问题,例如侦听器容器无法初始化,因为未声明另一个队列(在错误队列之后定义)。
可以通过设置ignore-declaration-exceptions
属性设置为true
在RabbitAdmin
实例。
此选项指示RabbitAdmin
以记录异常并继续声明其他元素。
配置RabbitAdmin
使用 Java,此属性称为ignoreDeclarationExceptions
.
这是适用于所有元素的全局设置。
队列、交换和绑定具有仅适用于这些元素的类似属性。
在 1.6 版之前,此属性仅在IOException
发生在通道上,例如当前属性与所需属性不匹配时。
现在,此属性对任何异常生效,包括TimeoutException
和其他人。
此外,任何声明异常都会导致发布DeclarationExceptionEvent
,这是一个ApplicationEvent
可以被任何ApplicationListener
在上下文中。
该事件包含对管理员的引用、正在声明的元素以及Throwable
.
标头交换
从 1.3 版开始,您可以配置HeadersExchange
在多个标头上匹配。
您还可以指定是否必须匹配任何或所有标头。
以下示例显示了如何执行此作:
<rabbit:headers-exchange name="headers-test">
<rabbit:bindings>
<rabbit:binding queue="bucket">
<rabbit:binding-arguments>
<entry key="foo" value="bar"/>
<entry key="baz" value="qux"/>
<entry key="x-match" value="all"/>
</rabbit:binding-arguments>
</rabbit:binding>
</rabbit:bindings>
</rabbit:headers-exchange>
从 1.6 版开始,您可以配置Exchanges
使用internal
标志(默认为false
)和这样的Exchange
通过RabbitAdmin
(如果应用程序上下文中存在)。
如果internal
flag 是true
对于交换,RabbitMQ 不允许客户端使用该交换。
这对于死信交换或交换到交换绑定很有用,您不希望使用交换
直接由出版商提供。
要了解如何使用 Java 配置 AMQP 基础结构,请查看 Stock 示例应用程序
如果有@Configuration
类AbstractStockRabbitConfiguration
,这反过来又有RabbitClientConfiguration
和RabbitServerConfiguration
子。
以下列表显示了AbstractStockRabbitConfiguration
:
@Configuration
public abstract class AbstractStockAppRabbitConfiguration {
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(jsonMessageConverter());
configureRabbitTemplate(template);
return template;
}
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public TopicExchange marketDataExchange() {
return new TopicExchange("app.stock.marketdata");
}
// additional code omitted for brevity
}
在 Stock 应用程序中,服务器是使用以下命令配置的@Configuration
类:
@Configuration
public class RabbitServerConfiguration extends AbstractStockAppRabbitConfiguration {
@Bean
public Queue stockRequestQueue() {
return new Queue("app.stock.request");
}
}
这是整个传承链的终结@Configuration
类。
最终结果是TopicExchange
和Queue
在应用程序启动时向代理声明。
没有TopicExchange
到服务器配置中的队列,就像在客户端应用程序中完成的那样。
但是,股票请求队列会自动绑定到 AMQP 默认交易所。
此行为由规范定义。
客户端@Configuration
班级更有趣一点。
其声明如下:
@Configuration
public class RabbitClientConfiguration extends AbstractStockAppRabbitConfiguration {
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;
@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}
/**
* Binds to the market data exchange.
* Interested in any stock quotes
* that match its routing key.
*/
@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
// additional code omitted for brevity
}
客户端通过declareQueue()
方法AmqpAdmin
.
它使用属性文件中外部化的路由模式将该队列绑定到市场数据交换。
用于队列和交换的构建器 API
1.6 版引入了一个方便的流畅 API 来配置Queue
和Exchange
使用 Java 配置时的对象。
以下示例演示如何使用它:
@Bean
public Queue queue() {
return QueueBuilder.nonDurable("foo")
.autoDelete()
.exclusive()
.withArgument("foo", "bar")
.build();
}
@Bean
public Exchange exchange() {
return ExchangeBuilder.directExchange("foo")
.autoDelete()
.internal()
.withArgument("foo", "bar")
.build();
}
请参阅 Javadocorg.springframework.amqp.core.QueueBuilder
和org.springframework.amqp.core.ExchangeBuilder
了解更多信息。
从 2.0 版开始,ExchangeBuilder
现在默认创建持久交换,以与单个AbstractExchange
类。
要与构建器进行非持久交换,请使用.durable(false)
在调用之前.build()
.
这durable()
不再提供没有参数的方法。
2.2 版引入了流畅的 API 来添加“众所周知”的交换和队列参数......
@Bean
public Queue allArgs1() {
return QueueBuilder.nonDurable("all.args.1")
.ttl(1000)
.expires(200_000)
.maxLength(42)
.maxLengthBytes(10_000)
.overflow(Overflow.rejectPublish)
.deadLetterExchange("dlx")
.deadLetterRoutingKey("dlrk")
.maxPriority(4)
.lazy()
.leaderLocator(LeaderLocator.minLeaders)
.singleActiveConsumer()
.build();
}
@Bean
public DirectExchange ex() {
return ExchangeBuilder.directExchange("ex.with.alternate")
.durable(true)
.alternate("alternate")
.build();
}
声明交换、队列和绑定的集合
您可以包装Declarable
对象 (Queue
,Exchange
和Binding
) 在Declarables
对象。
这RabbitAdmin
检测此类 bean(以及离散的Declarable
beans),并在建立连接时(最初和连接失败后)在代理上声明包含的对象。
以下示例显示了如何执行此作:
@Configuration
public static class Config {
@Bean
public CachingConnectionFactory cf() {
return new CachingConnectionFactory("localhost");
}
@Bean
public RabbitAdmin admin(ConnectionFactory cf) {
return new RabbitAdmin(cf);
}
@Bean
public DirectExchange e1() {
return new DirectExchange("e1", false, true);
}
@Bean
public Queue q1() {
return new Queue("q1", false, false, true);
}
@Bean
public Binding b1() {
return BindingBuilder.bind(q1()).to(e1()).with("k1");
}
@Bean
public Declarables es() {
return new Declarables(
new DirectExchange("e2", false, true),
new DirectExchange("e3", false, true));
}
@Bean
public Declarables qs() {
return new Declarables(
new Queue("q2", false, false, true),
new Queue("q3", false, false, true));
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public Declarables prototypes() {
return new Declarables(new Queue(this.prototypeQueueName, false, false, true));
}
@Bean
public Declarables bs() {
return new Declarables(
new Binding("q2", DestinationType.QUEUE, "e2", "k2", null),
new Binding("q3", DestinationType.QUEUE, "e3", "k3", null));
}
@Bean
public Declarables ds() {
return new Declarables(
new DirectExchange("e4", false, true),
new Queue("q4", false, false, true),
new Binding("q4", DestinationType.QUEUE, "e4", "k4", null));
}
}
在 2.1 之前的版本中,您可以声明多个Declarable 实例,通过定义类型为Collection<Declarable> .
在某些情况下,这可能会导致不良副作用,因为管理员必须遍历所有Collection<?> 豆。 |
2.2 版添加了getDeclarablesByType
method 设置为Declarables
;这可以作为方便使用,例如,在声明侦听器容器 bean 时。
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
Declarables mixedDeclarables, MessageListener listener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(mixedDeclarables.getDeclarablesByType(Queue.class).toArray(new Queue[0]));
container.setMessageListener(listener);
return container;
}
有条件声明
默认情况下,所有队列、交换和绑定都由RabbitAdmin
实例(假设它们有auto-startup="true"
) 在应用程序上下文中。
从 2.1.9 版本开始,RabbitAdmin
有一个新属性explicitDeclarationsOnly
(即false
默认情况下);当将其设置为true
,管理员将仅声明显式配置为由该管理员声明的 Bean。
从 1.2 版本开始,您可以有条件地声明这些元素。 当应用程序连接到多个代理并需要指定应使用哪些代理声明特定元素时,这特别有用。 |
表示这些元素的类实现Declarable
,它有两种方法:shouldDeclare()
和getDeclaringAdmins()
.
这RabbitAdmin
使用这些方法来确定特定实例是否应该实际处理其Connection
.
这些属性在命名空间中作为属性提供,如以下示例所示:
<rabbit:admin id="admin1" connection-factory="CF1" />
<rabbit:admin id="admin2" connection-factory="CF2" />
<rabbit:admin id="admin3" connection-factory="CF3" explicit-declarations-only="true" />
<rabbit:queue id="declaredByAdmin1AndAdmin2Implicitly" />
<rabbit:queue id="declaredByAdmin1AndAdmin2" declared-by="admin1, admin2" />
<rabbit:queue id="declaredByAdmin1Only" declared-by="admin1" />
<rabbit:queue id="notDeclaredByAllExceptAdmin3" auto-declare="false" />
<rabbit:direct-exchange name="direct" declared-by="admin1, admin2">
<rabbit:bindings>
<rabbit:binding key="foo" queue="bar"/>
</rabbit:bindings>
</rabbit:direct-exchange>
默认情况下,auto-declare 属性为true 并且,如果declared-by 未提供(或为空),则所有RabbitAdmin 实例声明对象(只要管理员的auto-startup 属性为true 、默认值和管理员的explicit-declarations-only 属性为 false)。 |
同样,您可以使用基于 Java 的@Configuration
以达到相同的效果。
在以下示例中,组件由admin1
但不是admin2
:
@Bean
public RabbitAdmin admin1() {
return new RabbitAdmin(cf1());
}
@Bean
public RabbitAdmin admin2() {
return new RabbitAdmin(cf2());
}
@Bean
public Queue queue() {
Queue queue = new Queue("foo");
queue.setAdminsThatShouldDeclare(admin1());
return queue;
}
@Bean
public Exchange exchange() {
DirectExchange exchange = new DirectExchange("bar");
exchange.setAdminsThatShouldDeclare(admin1());
return exchange;
}
@Bean
public Binding binding() {
Binding binding = new Binding("foo", DestinationType.QUEUE, exchange().getName(), "foo", null);
binding.setAdminsThatShouldDeclare(admin1());
return binding;
}
关于id
和name
属性
这name
属性<rabbit:queue/>
和<rabbit:exchange/>
元素反映代理中实体的名称。
对于队列,如果name
被省略,则创建一个匿名队列(请参阅AnonymousQueue
).
在 2.0 之前的版本中,name
也注册为 bean 名称别名(类似于name
上<bean/>
元素)。
这导致了两个问题:
-
它阻止了队列的声明和同名交换。
-
如果别名包含 SpEL 表达式 (
#{…}
).
从 2.0 版开始,如果您声明其中一个元素同时使用id
和name
属性,则该名称不再声明为 Bean 名称别名。
如果您希望声明队列并与相同的队列进行交换name
,您必须提供id
.
如果元素只有name
属性。
bean 仍然可以被name
——例如,在具有约束力的声明中。
但是,如果名称包含 SpEL,您仍然无法引用它 — 您必须提供id
仅供参考。
AnonymousQueue
通常,当您需要一个唯一命名的独占自动删除队列时,我们建议您使用AnonymousQueue
而不是代理定义的队列名称(用作""
Queue
name 导致代理生成队列
名称)。
这是因为:
-
队列实际上是在建立与代理的连接时声明的。 这是在创建 bean 并连接在一起很久之后。 使用队列的 Bean 需要知道它的名称。 事实上,在应用程序启动时,代理甚至可能没有运行。
-
如果由于某种原因与代理的连接丢失,管理员会重新声明
AnonymousQueue
同名。 如果我们使用代理声明的队列,队列名称将发生变化。
您可以控制AnonymousQueue
实例。
默认情况下,队列名称的前缀为spring.gen-
后跟 base64 表示UUID
— 例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g
.
您可以提供AnonymousQueue.NamingStrategy
构造函数参数中的实现。
以下示例显示了如何执行此作:
@Bean
public Queue anon1() {
return new AnonymousQueue();
}
@Bean
public Queue anon2() {
return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("something-"));
}
@Bean
public Queue anon3() {
return new AnonymousQueue(AnonymousQueue.UUIDNamingStrategy.DEFAULT);
}
第一个 bean 生成一个队列名称,前缀为spring.gen-
后跟 base64 表示UUID
— 用于
例:spring.gen-MRBv9sqISkuCiPfOYfpo4g
.
第二个 bean 生成一个队列名称,前缀为something-
后跟 base64 表示UUID
. 第三个 bean 仅使用 UUID(无 base64 转换)生成名称,例如f20c818a-006b-4416-bf91-643590fedb0e
.
base64 编码使用 RFC 4648 中的“URL 和文件名安全字母表”。尾随填充字符 () 被删除。=
您可以提供自己的命名策略,从而可以在队列名称中包含其他信息(例如应用程序名称或客户端主机)。
使用 XML 配置时,可以指定命名策略。 这naming-strategy
属性存在于<rabbit:queue>
元素 对于实现AnonymousQueue.NamingStrategy
. 以下示例演示如何以各种方式指定命名策略:
<rabbit:queue id="uuidAnon" />
<rabbit:queue id="springAnon" naming-strategy="uuidNamer" />
<rabbit:queue id="customAnon" naming-strategy="customNamer" />
<bean id="uuidNamer" class="org.springframework.amqp.core.AnonymousQueue.UUIDNamingStrategy" />
<bean id="customNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy">
<constructor-arg value="custom.gen-" />
</bean>
第一个示例创建的名称如下spring.gen-MRBv9sqISkuCiPfOYfpo4g
.
第二个示例使用 UUID 的 String 表示形式创建名称。
第三个示例创建的名称如下custom.gen-MRBv9sqISkuCiPfOYfpo4g
.
您还可以提供自己的命名策略 bean。
从版本 2.1 开始,匿名队列使用 argumentQueue.X_QUEUE_LEADER_LOCATOR
设置为client-local
默认情况下。
这可确保在应用程序连接到的节点上声明队列。
您可以通过调用queue.setLeaderLocator(null)
在构造实例后。
恢复自动删除声明
通常,RabbitAdmin
(s) 仅恢复在应用程序上下文中声明为 bean 的队列/交换/绑定;如果任何此类声明是自动删除的,则在连接丢失时代理将删除它们。
重新建立连接后,管理员将重新声明实体。
通常,通过调用admin.declareQueue(…)
,admin.declareExchange(…)
和admin.declareBinding(…)
将无法恢复。
从 2.4 版开始,管理员有一个新属性redeclareManualDeclarations
;什么时候true
,除了应用程序上下文中的 bean 之外,管理员还将恢复这些实体。
如果出现以下情况,将不会恢复个人声明deleteQueue(…)
,deleteExchange(…)
或removeBinding(…)
被称为。
删除队列和交换时,将从可恢复实体中删除关联的绑定。
最后,调用resetAllManualDeclarations()
将阻止恢复任何先前申报的实体。
4.1.12. 代理事件监听器
启用 Event Exchange 插件时,如果添加类型为BrokerEventListener
到应用程序上下文,它将选定的代理事件发布为BrokerEvent
实例,可以与普通 Spring 一起使用ApplicationListener
或@EventListener
方法。
事件由代理发布到主题交换amq.rabbitmq.event
为每种事件类型使用不同的路由键。
侦听器使用事件键,用于绑定AnonymousQueue
到交换,以便侦听器仅接收选定的事件。
由于它是一个主题交换,因此可以使用通配符(以及显式请求特定事件),如以下示例所示:
@Bean
public BrokerEventListener eventListener() {
return new BrokerEventListener(connectionFactory(), "user.deleted", "channel.#", "queue.#");
}
您可以使用普通的 Spring 技术进一步缩小单个事件侦听器中接收到的事件范围,如以下示例所示:
@EventListener(condition = "event.eventType == 'queue.created'")
public void listener(BrokerEvent event) {
...
}
4.1.13. 延迟消息交换
1.6 版引入了对延迟消息交换插件的支持
该插件目前被标记为实验性插件,但已经可用一年多(在撰写本文时)。如果对插件的更改是必要的,我们计划尽快添加对此类更改的支持。因此,Spring AMQP 中的这种支持也应该被视为实验性的。此功能在 RabbitMQ 3.6.0 和插件的 0.0.1 版本中进行了测试。 |
要使用RabbitAdmin
要将交换声明为延迟,您可以将delayed
交换 bean 上的属性设置为true
.
这RabbitAdmin
使用交换类型 (Direct
,Fanout
,依此类推)将x-delayed-type
参数和声明具有类型x-delayed-message
.
这delayed
属性(默认:false
) 在使用 XML 配置交换 Bean 时也可用。
以下示例演示如何使用它:
<rabbit:topic-exchange name="topic" delayed="true" />
要发送延迟消息,您可以将x-delay
header 通过MessageProperties
,如以下示例所示:
MessageProperties properties = new MessageProperties();
properties.setDelay(15000);
template.send(exchange, routingKey,
MessageBuilder.withBody("foo".getBytes()).andProperties(properties).build());
rabbitTemplate.convertAndSend(exchange, routingKey, "foo", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(15000);
return message;
}
});
要检查消息是否延迟,请使用getReceivedDelay()
方法MessageProperties
.
它是一个单独的属性,用于避免意外传播到从输入消息生成的输出消息。
4.1.14. RabbitMQ REST API
启用管理插件后,RabbitMQ 服务器会公开一个 REST API 来监控和配置代理。
现在提供了 API 的 Java 绑定。
这com.rabbitmq.http.client.Client
是一个标准的、即时的、因此是阻塞的 API。
它基于 Spring Web 模块及其RestTemplate
实现。
另一方面,com.rabbitmq.http.client.ReactorNettyClient
是一个基于 Reactor Netty 项目的响应式、非阻塞实现。
跃点依赖项 (com.rabbitmq:http-client
)现在也是optional
.
有关更多信息,请参阅他们的 Javadoc。
4.1.15. 异常处理
RabbitMQ Java 客户端的许多作可能会抛出已检查的异常。
例如,有很多情况下IOException
实例可能会被抛出。
这RabbitTemplate
,SimpleMessageListenerContainer
,其他 Spring AMQP 组件捕获这些异常并将它们转换为AmqpException
等级制度。
这些是在 'org.springframework.amqp' 包中定义的,并且AmqpException
是层次结构的基础。
当侦听器抛出异常时,它会包装在ListenerExecutionFailedException
.
通常,消息会被代理拒绝并重新排队。
设置defaultRequeueRejected
自false
导致消息被丢弃(或路由到死信交换)。
如消息侦听器和异步情况中所述,侦听器可以抛出AmqpRejectAndDontRequeueException
(或ImmediateRequeueAmqpException
) 有条件地控制此行为。
但是,有一类错误,侦听器无法控制行为。
当遇到无法转换的消息(例如,无效的content_encoding
header),在消息到达用户代码之前会抛出一些异常。
跟defaultRequeueRejected
设置为true
(默认)(或抛出一个ImmediateRequeueAmqpException
),此类消息将一遍又一遍地重新传递。
在 1.3.2 版本之前,用户需要编写自定义ErrorHandler
,如异常处理中所述,以避免这种情况。
从 1.3.2 版开始,默认的ErrorHandler
现在是一个ConditionalRejectingErrorHandler
拒绝(并且不重新排队)失败并出现不可恢复错误的消息。
具体来说,它拒绝失败并出现以下错误的消息:
-
o.s.amqp…MessageConversionException
:在使用MessageConverter
. -
o.s.messaging…MessageConversionException
:如果映射到@RabbitListener
方法。 -
o.s.messaging…MethodArgumentNotValidException
:如果验证(例如,@Valid
) 在侦听器中使用,验证失败。 -
o.s.messaging…MethodArgumentTypeMismatchException
:如果入站消息转换为目标方法不正确的类型,则可以引发。 例如,该参数声明为Message<Foo>
但Message<Bar>
收到。 -
java.lang.NoSuchMethodException
:在 1.6.3 版本中添加。 -
java.lang.ClassCastException
:在 1.6.3 版本中添加。
您可以使用FatalExceptionStrategy
以便用户可以提供自己的条件消息拒绝规则,例如,将委托实现用于BinaryExceptionClassifier
来自 Spring Retry(消息侦听器和异步情况)。
此外,ListenerExecutionFailedException
现在有一个failedMessage
可在决策中使用的属性。
如果FatalExceptionStrategy.isFatal()
方法返回true
,错误处理程序会抛出AmqpRejectAndDontRequeueException
.
默认值FatalExceptionStrategy
当异常被确定为致命异常时,记录警告消息。
从版本 1.6.3 开始,将用户异常添加到致命列表的一种便捷方法是将子类ConditionalRejectingErrorHandler.DefaultExceptionStrategy
并覆盖isUserCauseFatal(Throwable cause)
返回的方法true
对于致命的例外。
处理 DLQ 消息的常见模式是将time-to-live
这些消息以及其他 DLQ 配置,以便这些消息过期并路由回主队列进行重试。
这种技术的问题在于,导致致命异常的消息会永远循环。
从 2.1 版开始,ConditionalRejectingErrorHandler
检测到x-death
导致引发致命异常的消息的标头。
该消息将被记录并丢弃。
您可以通过将discardFatalsWithXDeath
属性ConditionalRejectingErrorHandler
自false
.
从版本 2.1.9 开始,默认情况下,即使容器确认模式为 MANUAL,具有这些致命异常的消息也会被拒绝并且不会重新排队。
这些异常通常发生在调用侦听器之前,因此侦听器没有机会确认或确认消息,因此它以未确认状态保留在队列中。
要恢复到以前的行为,请将rejectManual 属性ConditionalRejectingErrorHandler 自false . |
4.1.16. 事务
Spring Rabbit 框架支持同步和异步用例中的自动事务管理,具有许多不同的语义,可以通过声明方式选择,这是 Spring 事务的现有用户所熟悉的。 这使得许多(如果不是最常见的)消息传递模式易于实现。
有两种方法可以向框架发出所需的事务语义信号。
在这两个RabbitTemplate
和SimpleMessageListenerContainer
,有一个标志channelTransacted
如果true
,告诉框架使用事务通道并通过提交或回滚(取决于结果)结束所有作(发送或接收),但异常表示回滚。
另一个信号是提供一个外部事务,其中一个 Spring 的PlatformTransactionManager
实现作为正在进行的作的上下文。
如果框架在发送或接收消息时已经有正在进行的事务,并且channelTransacted
flag 是true
,则消息传递事务的提交或回滚将延迟到当前事务结束。
如果channelTransacted
flag 是false
,则没有事务语义适用于消息传递作(自动确认)。
这channelTransacted
标志是配置时间设置。
在创建 AMQP 组件时,通常在应用程序启动时,它会声明和处理一次。
原则上,外部事务更加动态,因为系统在运行时响应当前线程状态。
然而,在实践中,当事务以声明方式分层到应用程序时,它通常也是一种配置设置。
对于RabbitTemplate
,外部事务由调用者根据喜好声明式或命令式提供(通常的 Spring 事务模型)。
以下示例显示了一种声明性方法(通常是首选,因为它是非侵入性的),其中模板配置了channelTransacted=true
:
@Transactional
public void doSomething() {
String incoming = rabbitTemplate.receiveAndConvert();
// do some more database processing...
String outgoing = processInDatabaseAndExtractReply(incoming);
rabbitTemplate.convertAndSend(outgoing);
}
在前面的示例中,String
有效负载在标记为@Transactional
.
如果数据库处理失败并出现异常,那么传入消息将返回到代理,并且不会发送传出消息。
这适用于使用RabbitTemplate
在事务方法链中(例如,除非Channel
直接纵以提前提交事务)。
对于具有SimpleMessageListenerContainer
,如果需要外部事务,则容器在设置侦听器时必须请求它。
为了发出需要外部事务的信号,用户提供了PlatformTransactionManager
配置时到容器。
以下示例显示了如何执行此作:
@Configuration
public class ExampleExternalTransactionAmqpConfiguration {
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setTransactionManager(transactionManager());
container.setChannelTransacted(true);
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}
}
在前面的示例中,事务管理器被添加为从另一个 Bean 定义注入的依赖项(未显示),并且channelTransacted
flag 也设置为true
.
其效果是,如果侦听器因异常而失败,则事务将回滚,并且消息也会返回给代理。
值得注意的是,如果事务未能提交(例如,由于
数据库约束错误或连接问题),AMQP 事务也会回滚,并将消息返回给代理。
这有时被称为“尽力而为的 1 阶段提交”,是可靠消息传递的非常强大的模式。
如果channelTransacted
flag 设置为false
(默认值)在前面的示例中,仍将为侦听器提供外部事务,但所有消息传递作都将自动确认,因此即使在业务作回滚时,效果也是提交消息传递作。
条件回滚
在 1.6.6 版之前,将回滚规则添加到容器的transactionAttribute
使用外部事务管理器(如 JDBC)时没有效果。
异常总是回滚事务。
此外,当在容器的通知链中使用事务通知时,条件回滚不是很有用,因为所有侦听器异常都包装在ListenerExecutionFailedException
.
第一个问题已得到纠正,规则现在已正确应用。此外,ListenerFailedRuleBasedTransactionAttribute
现在提供了。它是RuleBasedTransactionAttribute
,唯一的区别是它知道ListenerExecutionFailedException
并将此类异常的原因用于规则。此事务属性可以直接在容器中使用,也可以通过事务通知使用。
以下示例使用此规则:
@Bean
public AbstractMessageListenerContainer container() {
...
container.setTransactionManager(transactionManager);
RuleBasedTransactionAttribute transactionAttribute =
new ListenerFailedRuleBasedTransactionAttribute();
transactionAttribute.setRollbackRules(Collections.singletonList(
new NoRollbackRuleAttribute(DontRollBackException.class)));
container.setTransactionAttribute(transactionAttribute);
...
}
关于回滚已接收消息的说明
AMQP 事务仅适用于发送到代理的消息和 ack。因此,当 Spring 事务回滚并收到消息时,Spring AMQP 不仅必须回滚事务,还必须手动拒绝消息(有点像 nack,但这不是规范所说的)。对消息拒绝采取的作与事务无关,并且取决于defaultRequeueRejected
属性(默认:true
). 有关拒绝失败消息的详细信息,请参阅消息侦听器和异步情况。
有关 RabbitMQ 事务及其限制的更多信息,请参阅 RabbitMQ 代理语义。
在 RabbitMQ 2.7.0 之前,此类消息(以及通道关闭或中止时未确认的任何消息)转到 Rabbit 代理的队列后面。从 2.7.0 开始,被拒绝的消息将转到队列的前面,其方式与 JMS 回滚消息类似。 |
以前,事务回滚时的消息重新排队在本地事务之间不一致,并且当TransactionManager 提供了。在前一种情况下,正常的重新排队逻辑 (AmqpRejectAndDontRequeueException 或defaultRequeueRejected=false ) 应用(请参阅消息侦听器和异步情况)。使用事务管理器时,消息在回滚时无条件地重新排队。从版本 2.0 开始,行为是一致的,并且在这两种情况下都应用了正常的重新排队逻辑。要恢复到以前的行为,您可以将容器的alwaysRequeueWithTxManagerRollback 属性设置为true .
请参阅消息侦听器容器配置。 |
用RabbitTransactionManager
RabbitTransactionManager 是在外部事务中执行 Rabbit作并与外部事务同步的替代方法。
此事务管理器是PlatformTransactionManager
接口,应该与单个 Rabbit 一起使用ConnectionFactory
.
此策略无法提供 XA 事务 — 例如,为了在消息传递和数据库访问之间共享事务。 |
需要应用程序代码才能通过以下方式检索事务性 Rabbit 资源ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean)
而不是标准Connection.createChannel()
调用,并创建后续通道。
当使用 Spring AMQP 的 RabbitTemplate 时,它会自动检测线程绑定的 Channel 并自动参与其事务。
使用 Java 配置,您可以使用以下 bean 设置新的 RabbitTransactionManager:
@Bean
public RabbitTransactionManager rabbitTransactionManager() {
return new RabbitTransactionManager(connectionFactory);
}
如果您更喜欢 XML 配置,则可以在 XML Application Context 文件中声明以下 bean:
<bean id="rabbitTxManager"
class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
事务同步
将 RabbitMQ 事务与其他一些(例如 DBMS)事务同步可提供“尽力而为的单阶段提交”语义。
RabbitMQ 事务在事务同步的 after completion 阶段可能会提交失败。
这由spring-tx
infrastructure 作为错误,但不会向调用代码引发异常。
从 2.3.10 版本开始,您可以调用ConnectionUtils.checkAfterCompletion()
在事务在处理事务的同一线程上提交之后。
如果没有发生异常,它将简单地返回;否则它会抛出一个AfterCompletionFailedException
它将具有表示完成的同步状态的属性。
通过调用ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true)
;这是一个全局标志,适用于所有线程。
4.1.17. 消息监听器容器配置
有很多选项可用于配置SimpleMessageListenerContainer
(SMLC) 和DirectMessageListenerContainer
(DMLC) 与交易和服务质量有关,其中一些相互交互。
适用于 SMLC、DMLC 或StreamListenerContainer
(圣LC)(请参阅使用 RabbitMQ Stream 插件)由相应列中的复选标记表示。
请参阅选择容器,以帮助您确定哪个容器适合您的应用程序。
下表显示了使用命名空间配置<rabbit:listener-container/>
.
这type
属性可以是simple
(默认)或direct
指定SMLC
或DMLC
分别。
命名空间不会公开某些属性。
这些由N/A
属性。
财产 (属性) | 描述 | SMLC公司 | DMLC的 | StLC的 | ||||||
---|---|---|---|---|---|---|---|---|---|---|
|
什么时候 |
![]() |
||||||||
|
|
![]() |
![]() |
|||||||
|
应用于侦听器执行的AOP Advice数组。这可用于应用其他横切问题,例如在代理死亡时自动重试。请注意,AMQP错误后的简单重新连接由 |
![]() |
![]() |
|||||||
数组 |
![]() |
![]() |
||||||||
设置为 |
![]() |
![]() |
||||||||
|
当设置为
|
![]() |
![]() |
|||||||
|
标志,指示容器应在 |
![]() |
![]() |
![]() |
||||||
|
与 |
![]() |
||||||||
|
取消批处理消息时使用的策略。
违约 |
![]() |
![]() |
|||||||
|
布尔标志,表示所有消息都应在事务中确认(手动或自动)。 |
![]() |
![]() |
|||||||
|
|
![]() |
||||||||
每个侦听器最初启动的并发使用者数。请参阅侦听器并发。对于 |
![]() |
![]() |
||||||||
|
对 |
![]() |
![]() |
|||||||
|
在考虑启动新消费者时,消费者收到的最小连续消息数,而没有发生接收超时。也受“batchSize”的影响。请参阅侦听器并发。默认值:10。 |
![]() |
||||||||
|
使用者在考虑停止使用者之前必须经历的最小接收超时数。 也受到“batchSize”的影响。 请参阅侦听器并发。 默认值:10。 |
![]() |
||||||||
|
如果 |
![]() |
||||||||
|
一个 |
![]() |
||||||||
|
等待使用者线程启动的时间(以毫秒为单位)。
如果过了这个时间,则会写入错误日志。
可能发生这种情况的一个示例是,如果配置的 请参阅线程和异步使用者。 默认值:60000(一分钟)。 |
![]() |
||||||||
|
设置 ConsumerTagStrategy 的实现,以便为每个使用者创建一个(唯一)标记。 |
![]() |
![]() |
|||||||
|
要为每个配置的队列创建的使用者数量。 请参阅侦听器并发。 |
![]() |
||||||||
|
将 RabbitMQ 分片插件与 |
![]() |
![]() |
|||||||
|
当 true 时,侦听器容器将对批处理的消息进行批处理,并使用批处理中的每条消息调用侦听器。
从 2.2.7 版开始,生产者创建的批次将作为 |
![]() |
![]() |
|||||||
|
被动队列声明失败时的重试次数。 当使用者启动时,或者当从多个队列使用时,当初始化期间并非所有队列都可用时,就会发生被动队列声明。 当重试用尽后无法被动声明任何配置的队列(出于任何原因)时,容器行为由前面所述的“missingQueuesFatal”属性控制。 默认值:重试 3 次(总共尝试 4 次)。 |
![]() |
||||||||
|
确定是否应将因侦听器引发异常而被拒绝的消息重新排队。
违约: |
![]() |
![]() |
|||||||
|
对 |
![]() |
![]() |
|||||||
|
确定此容器中的单个使用者是否具有对队列的独占访问权限。当容器的并发性为 1 时 |
![]() |
![]() |
|||||||
当独占使用者无法访问队列时使用的异常记录器。
默认情况下,这记录在 |
![]() |
![]() |
||||||||
|
被动队列声明重试尝试之间的间隔。 当使用者启动时,或者当从多个队列使用时,当初始化期间并非所有队列都可用时,就会发生被动队列声明。 默认值:5000(五秒)。 |
![]() |
![]() |
|||||||
|
如果消费者在 |
![]() |
![]() |
|||||||
|
设置为 true 在处理当前记录后停止(当容器停止时);导致所有预取的消息重新排队。
默认情况下,容器将取消使用者并在停止之前处理所有预取的消息。
从 2.4.14、3.0.6 版本开始
默认为 |
![]() |
![]() |
|||||||
|
当 true 时,则 |
![]() |
![]() |
|||||||
(组) |
这仅在使用命名空间时可用。
指定时,类型为 |
![]() |
![]() |
|||||||
|
请参阅检测空闲异步使用者。 |
![]() |
![]() |
|||||||
|
一 |
![]() |
![]() |
|||||||
|
如果需要,按需启动的最大并发使用者数。 必须大于或等于“concurrentConsumers”。 请参阅侦听器并发。 |
![]() |
||||||||
|
在确认之间要接收的消息数。
使用它来减少发送到代理的确认数(以增加重新传递消息的可能性为代价)。
通常,应仅在大容量侦听器容器上设置此属性。
如果设置了此设置并拒绝了消息(引发异常),则确认挂起的确认并拒绝失败的消息。
不允许用于交易通道。
如果 |
![]() |
||||||||
|
当容器启动时,如果此属性为 如果在恢复过程中检测到问题(例如,在连接丢失后),则容器将停止。 必须有一个
|
![]() |
![]() |
|||||||
|
当设置为 这在以前的版本中是不可配置的。 当设置为 您还可以使用属性 bean 为所有容器全局设置属性,如下所示:
此全局属性不适用于具有显式 可以通过设置以下属性来覆盖默认重试属性(以五秒为间隔重试三次)。
|
![]() |
![]() |
|||||||
|
使用 DMLC,任务被安排在此时间间隔运行,以监视使用者的状态并恢复任何失败的任务。 |
![]() |
||||||||
|
设置为 |
![]() |
![]() |
|||||||
|
什么时候 |
![]() |
![]() |
|||||||
|
当设置为 从 2.0 版本开始。 DirectMessageListenerContainer 当设置为 SimpleMessageListenerContainer 当设置为 您还可以使用属性 bean 为所有容器全局设置属性,如下所示:
此全局属性不会应用于任何具有显式 默认重试属性(以 5 秒为间隔重试 3 次)可以使用此属性之后的属性进行覆盖。 |
![]() |
![]() |
|||||||
|
每个使用者可能未完成的未确认消息数。
此值越高,消息的传递速度越快,但非顺序处理的风险就越高。
如果
另请参阅 |
![]() |
![]() |
|||||||
|
当侦听器容器侦听至少一个自动删除队列,并且在启动期间发现该队列丢失时,容器会使用 |
![]() |
![]() |
|||||||
|
等待每条消息的最长时间。
如果 |
![]() |
||||||||
|
指定 |
![]() |
![]() |
|||||||
|
确定尝试启动使用者之间的时间(以毫秒为单位)(如果使用者因非致命原因而无法启动)。
默认值:5000。
相互排斥 |
![]() |
![]() |
|||||||
|
如果在使用者初始化期间配置的队列子集可用,则使用者将开始从这些队列使用。 使用者尝试使用此间隔被动声明缺少的队列。 当此间隔过去时,将再次使用“declarationRetries”和“failedDeclarationRetryInterval”。 如果仍然缺少队列,使用者会再次等待此间隔,然后再重试。 此过程无限期地持续,直到所有队列都可用。 默认值:60000(一分钟)。 |
![]() |
||||||||
|
当容器关闭时(例如,
如果它包含 |
![]() |
![]() |
|||||||
|
按需启动每个新使用者之前必须经过的时间(以毫秒为单位)。 请参阅侦听器并发。 默认值:10000(10 秒)。 |
![]() |
||||||||
|
使用有状态重试建议时,如果缺少 |
![]() |
![]() |
|||||||
|
在检测到空闲使用者时停止最后一个使用者后,使用者停止之前必须经过的时间(以毫秒为单位)。 请参阅侦听器并发。 默认值:60000(一分钟)。 |
![]() |
||||||||
|
一个 |
![]() |
||||||||
|
对弹簧的引用 |
![]() |
![]() |
|||||||
|
对于 DMLC,调度程序用于在“monitorInterval”运行监视任务。 |
![]() |
||||||||
|
用于侦听器作的外部事务管理器。
也与 |
![]() |
![]() |
4.1.18. 侦听器并发
SimpleMessageListenerContainer
默认情况下,侦听器容器启动一个从队列接收消息的单个使用者。
在检查上一节中的表时,您可以看到许多控制并发的属性和属性。
最简单的是concurrentConsumers
,这将创建并发处理消息的(固定)数量的使用者。
在 1.3.0 版之前,这是唯一可用的设置,必须停止容器并重新启动才能更改设置。
从 1.3.0 版本开始,您现在可以动态调整concurrentConsumers
财产。
如果在容器运行时更改了它,则会根据需要添加或删除使用者以适应新设置。
此外,一个名为maxConcurrentConsumers
,容器会根据工作负载动态调整并发。
这与四个附加属性结合使用:consecutiveActiveTrigger
,startConsumerMinInterval
,consecutiveIdleTrigger
和stopConsumerMinInterval
.
在默认设置下,增加消费者的算法工作原理如下:
如果maxConcurrentConsumers
未达到,并且现有使用者连续十个周期处于活动状态,并且自上次使用者启动以来至少已过去 10 秒,则启动新的使用者。
如果使用者在batchSize
* receiveTimeout
毫秒。
使用默认设置,减少使用者的算法工作原理如下:
如果超过concurrentConsumers
正在运行,并且使用者检测到连续十次超时(空闲),并且最后一个使用者至少在 60 秒前停止,则使用者停止。
超时取决于receiveTimeout
和batchSize
性能。
如果使用者在batchSize
* receiveTimeout
毫秒。
因此,使用默认超时(一秒)和batchSize
在 4 个空闲时间中,考虑在 40 秒的空闲时间后停止使用者(四个超时对应于一个空闲检测)。
实际上,只有当整个容器闲置一段时间时,消费者才能停止。 这是因为经纪人在所有活跃消费者之间共享其工作。 |
每个使用者使用单个通道,而不考虑配置的队列数量。
从 2.0 版开始,concurrentConsumers
和maxConcurrentConsumers
属性可以使用concurrency
属性 — 例如,2-4
.
4.1.19. 独占消费者
从 1.3 版开始,您可以使用单个独占使用者配置侦听器容器。
这可以防止其他容器从队列中消费,直到当前使用者被取消。
此类容器的并发性必须为1
.
当使用独占消费者时,其他容器会尝试根据recoveryInterval
属性并记录WARN
如果尝试失败,则显示消息。
4.1.20. 侦听器容器队列
1.3 版引入了许多改进,用于处理侦听器容器中的多个队列。
容器最初可以配置为侦听零队列。可以在运行时添加和删除队列。 这SimpleMessageListenerContainer
在处理任何预取消息时回收(取消和重新创建)所有使用者。 这DirectMessageListenerContainer
为每个队列创建/取消单独的消费者,而不会影响其他队列上的消费者。有关addQueues
,addQueueNames
,removeQueues
和removeQueueNames
方法。
如果并非所有队列都可用,则容器会尝试每 60 秒被动声明(并从中消耗)缺少的队列。
此外,如果使用者收到来自代理的取消(例如,如果队列被删除),则使用者会尝试恢复,并且恢复的使用者将继续处理来自任何其他已配置队列的消息。以前,对一个队列的取消会取消整个使用者,最终,容器将因缺少队列而停止。
如果要永久删除队列,则应在删除队列之前或之后更新容器,以避免将来尝试从中消费。
4.1.21. 弹性:从错误和代理故障中恢复
Spring AMQP 提供的一些关键(也是最流行的)高级功能与在发生协议错误或代理故障时的恢复和自动重新连接有关。 我们已经在本指南中查看了所有相关组件,但将它们全部汇集在一起并单独调用功能和恢复方案应该会有所帮助。
主要重新连接功能由CachingConnectionFactory
本身。
使用RabbitAdmin
自动声明功能。
此外,如果您关心保证交付,您可能还需要使用channelTransacted
标记RabbitTemplate
和SimpleMessageListenerContainer
和AcknowledgeMode.AUTO
(如果您自己做 ack,则手动)在SimpleMessageListenerContainer
.
自动声明交换、队列和绑定
这RabbitAdmin
组件可以在启动时声明交换、队列和绑定。
它通过ConnectionListener
.
因此,如果代理在启动时不存在,则无关紧要。
第一次Connection
(例如,
通过发送消息)触发侦听器并应用管理功能。
在侦听器中执行自动声明的另一个好处是,如果连接因任何原因(例如,
代理死亡、网络故障等),当重新建立连接时,它们将再次应用。
以这种方式声明的队列必须具有固定名称——显式声明或由框架生成AnonymousQueue 实例。 匿名队列是非持久的、独占的和自动删除的。 |
仅当CachingConnectionFactory 缓存模式为CHANNEL (默认值)。
存在此限制,因为独占队列和自动删除队列绑定到连接。 |
从 2.2.2 版本开始,RabbitAdmin
将检测类型DeclarableCustomizer
并在实际处理声明之前应用该函数。
例如,这对于在框架内获得一流支持之前设置新参数(属性)很有用。
@Bean
public DeclarableCustomizer customizer() {
return dec -> {
if (dec instanceof Queue && ((Queue) dec).getName().equals("my.queue")) {
dec.addArgument("some.new.queue.argument", true);
}
return dec;
};
}
它在不提供对Declarable
bean 定义。
另请参阅 RabbitMQ 自动连接/拓扑恢复。
同步作失败和重试选项
如果您在使用RabbitTemplate
(例如),Spring AMQP 会抛出AmqpException
(通常,但并非总是,AmqpIOException
).
我们不会试图隐藏存在问题的事实,因此您必须能够捕获并响应异常。
如果您怀疑连接丢失(这不是您的错),最简单的方法是重试该作。
您可以手动执行此作,也可以考虑使用 Spring Retry 来处理重试(命令式或声明式)。
Spring Retry 提供了几个 AOP 拦截器和很大的灵活性来指定重试的参数(尝试次数、异常类型、回退算法等)。Spring AMQP 还提供了一些方便的工厂 bean,用于以方便的形式为 AMQP 用例创建 Spring Retry 拦截器,并具有可用于实现自定义恢复逻辑的强类型回调接口。请参阅 Javadoc 和StatefulRetryOperationsInterceptor
和StatelessRetryOperationsInterceptor
了解更多详情。如果没有事务或事务在重试回调中启动,则无状态重试是合适的。请注意,无状态重试比有状态重试更易于配置和分析,但如果存在必须回滚或肯定会回滚的正在进行的事务,则通常不合适。事务中间断开的连接应该与回滚具有相同的效果。因此,对于事务在堆栈上层启动的重新连接,有状态重试通常是最佳选择。有状态重试需要一种唯一标识消息的机制。最简单的方法是让发送者在MessageId
message 属性。提供的消息转换器提供了一个选项来执行此作:您可以将createMessageIds
自true
. 否则,您可以注入MessageKeyGenerator
实现到拦截器中。密钥生成器必须为每条消息返回一个唯一的密钥。在 2.0 版之前的版本中,MissingMessageIdAdvice
被提供。它启用了没有messageId
属性,只重试一次(忽略重试设置)。不再提供此建议,因为spring-retry
版本 1.2,其功能内置于拦截器和消息侦听器容器中。
为了向后兼容,默认情况下(重试一次后),具有空消息 ID 的消息对使用者(使用者已停止)被视为致命消息。要复制MissingMessageIdAdvice ,您可以将statefulRetryFatalWithNullMessageId 属性设置为false 在侦听器容器上。使用该设置,使用者将继续运行,并且消息被拒绝(重试一次后)。它被丢弃或路由到死信队列(如果已配置)。 |
从 1.3 版开始,提供了一个构建器 API,以帮助使用 Java(在@Configuration
类)。
以下示例显示了如何执行此作:
@Bean
public StatefulRetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateful()
.maxAttempts(5)
.backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
.build();
}
只能以这种方式配置重试功能的子集。
更高级的功能需要配置RetryTemplate
作为春豆。
有关可用策略及其配置的完整信息,请参阅 Spring Retry Javadoc。
使用批处理侦听器重试
不建议使用批处理侦听器配置重试,除非批处理是由生产者在单个记录中创建的。 有关使用者和生产者创建的批处理的信息,请参阅批处理消息。 对于使用者创建的批处理,框架不知道批处理中的哪条消息导致了失败,因此无法在重试用尽后进行恢复。 对于生产者创建的批处理,由于只有一条消息实际失败,因此可以恢复整个消息。 应用程序可能希望通过设置引发异常的索引属性来通知自定义恢复器在批处理中发生故障的位置。
批处理侦听器的重试恢复器必须实现MessageBatchRecoverer
.
消息侦听器和异步情况
如果MessageListener
由于业务异常而失败,则该异常由消息侦听器容器处理,然后返回侦听另一条消息。
如果故障是由断开连接(不是业务异常)引起的,则必须取消并为侦听器收集消息的使用者并重新启动。
这SimpleMessageListenerContainer
无缝处理这个问题,并留下一个日志,说侦听器正在重新启动。事实上,它无休止地循环,试图重新启动消费者。只有当消费者确实表现得非常糟糕时,它才会放弃。一个副作用是,如果容器启动时代理关闭,它会继续尝试,直到可以建立连接。
与协议错误和断开连接相反,业务异常处理可能需要更多的思考和一些自定义配置,尤其是在使用事务或容器确认的情况下。在 2.8.x 之前,RabbitMQ 没有死信行为的定义。因此,默认情况下,由于业务异常而被拒绝或回滚的消息可以无休止地重新传递。要对客户端的重新传递次数进行限制,一个选择是StatefulRetryOperationsInterceptor
在侦听器的通知链中。拦截器可以有一个恢复回调,用于实现自定义死信作——任何适合您的特定环境的作。
另一种方法是将容器的defaultRequeueRejected
属性设置为false
. 这会导致所有失败的消息都被丢弃。当使用 RabbitMQ 2.8.x 或更高版本时,这也有助于将消息传递到死信交换。
或者,您可以抛出一个AmqpRejectAndDontRequeueException
. 这样做可以防止消息重新排队,无论defaultRequeueRejected
财产。
从 2.1 版开始,ImmediateRequeueAmqpException
引入来执行完全相反的逻辑:消息将被重新排队,而不管defaultRequeueRejected
财产。
通常,会结合使用这两种技术。您可以使用StatefulRetryOperationsInterceptor
在通知链中,带有MessageRecoverer
这会抛出一个AmqpRejectAndDontRequeueException
.
这MessageRecover
当所有重试都用完时调用。 这RejectAndDontRequeueRecoverer
正是这样做的。默认的MessageRecoverer
消耗错误消息并发出WARN
消息。
从 1.3 版开始,新的RepublishMessageRecoverer
,以允许在重试用尽后发布失败的消息。
当恢复器使用最终异常时,消息将被确认,并且代理不会发送到死信交换(如果已配置)。
什么时候RepublishMessageRecoverer 在消费者端使用,接收到的消息有deliveryMode 在receivedDeliveryMode message 属性。
在本例中,deliveryMode 是null .
这意味着NON_PERSISTENT 代理上的交付模式。
从 2.0 版开始,您可以配置RepublishMessageRecoverer 对于deliveryMode 设置为要重新发布的消息(如果是)null .
默认情况下,它使用MessageProperties 默认值 -MessageDeliveryMode.PERSISTENT . |
以下示例显示了如何设置RepublishMessageRecoverer
作为恢复者:
@Bean
RetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.recoverer(new RepublishMessageRecoverer(amqpTemplate(), "something", "somethingelse"))
.build();
}
这RepublishMessageRecoverer
在邮件标头中发布包含其他信息的邮件,例如异常邮件、堆栈跟踪、原始交换和路由密钥。
可以通过创建子类并覆盖additionalHeaders()
.
这deliveryMode
(或任何其他属性)也可以在additionalHeaders()
,如以下示例所示:
RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(amqpTemplate, "error") {
protected Map<? extends String, ? extends Object> additionalHeaders(Message message, Throwable cause) {
message.getMessageProperties()
.setDeliveryMode(message.getMessageProperties().getReceivedDeliveryMode());
return null;
}
};
从 2.0.5 版开始,如果堆栈跟踪太大,它可能会被截断;这是因为所有标头都必须适合单个帧。默认情况下,如果堆栈跟踪会导致其他标头可用的字节少于 20,000 字节(“余量”),则它将被截断。这可以通过设置恢复器的frameMaxHeadroom
属性,如果您需要更多或更少的空间来存储其他标头。
从 2.1.13、2.2.3 版本开始,异常消息将包含在此计算中,并且将使用以下算法最大化堆栈跟踪量:
-
如果单独的堆栈跟踪将超过限制,则异常消息标头将被截断为 97 字节加
…
并且堆栈跟踪也会被截断。 -
如果堆栈跟踪很小,则消息将被截断(加上
…
) 以适合可用字节(但堆栈跟踪本身中的消息被截断为 97 字节加…
).
每当发生任何类型的截断时,都会记录原始异常以保留完整信息。 在增强标头后执行评估,以便在表达式中使用异常类型等信息。
从版本 2.4.8 开始,错误交换和路由密钥可以作为 SpEL 表达式提供,使用Message
作为评估的根对象。
从 2.3.3 版开始,一个新的子类RepublishMessageRecovererWithConfirms
提供;这支持两种样式的发布者确认,并将等待确认后再返回(如果未确认或返回消息,则引发异常)。
如果确认类型为CORRELATED
,子类还将检测是否返回消息并抛出AmqpMessageReturnedException
;如果发布被否定确认,它将抛出一个AmqpNackReceivedException
.
如果确认类型为SIMPLE
,子类将调用waitForConfirmsOrDie
方法。
有关确认和退货的更多信息,请参阅发布商确认和退货。
从 2.1 版开始,ImmediateRequeueMessageRecoverer
添加以抛出ImmediateRequeueAmqpException
,它通知侦听器容器将当前失败的消息重新排队。
春季重试的异常分类
春季重试具有很大的灵活性,可以确定哪些异常可以调用重试。
默认配置对所有异常重试。
鉴于用户异常包装在ListenerExecutionFailedException
,我们需要确保分类检查异常原因。
默认分类器仅查看顶级异常。
从 Spring Retry 1.0.3 开始,BinaryExceptionClassifier
有一个名为traverseCauses
(默认值:false
).
什么时候true
,它遍历异常原因,直到找到匹配项或没有原因。
若要使用此分类器进行重试,可以使用SimpleRetryPolicy
使用采用最大尝试次数的构造函数创建,则Map
之Exception
实例,布尔值 (traverseCauses
) 并将此策略注入到RetryTemplate
.
4.1.22. 多代理(或集群)支持
2.3 版在单个应用程序与多个代理或代理集群之间进行通信时增加了更多便利。 在消费者方面,主要好处是基础设施可以自动将自动声明的队列与适当的代理相关联。
用一个例子来说明这一点:
@SpringBootApplication(exclude = RabbitAutoConfiguration.class)
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
CachingConnectionFactory cf1() {
return new CachingConnectionFactory("localhost");
}
@Bean
CachingConnectionFactory cf2() {
return new CachingConnectionFactory("otherHost");
}
@Bean
CachingConnectionFactory cf3() {
return new CachingConnectionFactory("thirdHost");
}
@Bean
SimpleRoutingConnectionFactory rcf(CachingConnectionFactory cf1,
CachingConnectionFactory cf2, CachingConnectionFactory cf3) {
SimpleRoutingConnectionFactory rcf = new SimpleRoutingConnectionFactory();
rcf.setDefaultTargetConnectionFactory(cf1);
rcf.setTargetConnectionFactories(Map.of("one", cf1, "two", cf2, "three", cf3));
return rcf;
}
@Bean("factory1-admin")
RabbitAdmin admin1(CachingConnectionFactory cf1) {
return new RabbitAdmin(cf1);
}
@Bean("factory2-admin")
RabbitAdmin admin2(CachingConnectionFactory cf2) {
return new RabbitAdmin(cf2);
}
@Bean("factory3-admin")
RabbitAdmin admin3(CachingConnectionFactory cf3) {
return new RabbitAdmin(cf3);
}
@Bean
public RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry() {
return new RabbitListenerEndpointRegistry();
}
@Bean
public RabbitListenerAnnotationBeanPostProcessor postProcessor(RabbitListenerEndpointRegistry registry) {
MultiRabbitListenerAnnotationBeanPostProcessor postProcessor
= new MultiRabbitListenerAnnotationBeanPostProcessor();
postProcessor.setEndpointRegistry(registry);
postProcessor.setContainerFactoryBeanName("defaultContainerFactory");
return postProcessor;
}
@Bean
public SimpleRabbitListenerContainerFactory factory1(CachingConnectionFactory cf1) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cf1);
return factory;
}
@Bean
public SimpleRabbitListenerContainerFactory factory2(CachingConnectionFactory cf2) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cf2);
return factory;
}
@Bean
public SimpleRabbitListenerContainerFactory factory3(CachingConnectionFactory cf3) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cf3);
return factory;
}
@Bean
RabbitTemplate template(SimpleRoutingConnectionFactory rcf) {
return new RabbitTemplate(rcf);
}
@Bean
ConnectionFactoryContextWrapper wrapper(SimpleRoutingConnectionFactory rcf) {
return new ConnectionFactoryContextWrapper(rcf);
}
}
@Component
class Listeners {
@RabbitListener(queuesToDeclare = @Queue("q1"), containerFactory = "factory1")
public void listen1(String in) {
}
@RabbitListener(queuesToDeclare = @Queue("q2"), containerFactory = "factory2")
public void listen2(String in) {
}
@RabbitListener(queuesToDeclare = @Queue("q3"), containerFactory = "factory3")
public void listen3(String in) {
}
}
如您所见,我们声明了 3 组基础设施(连接工厂、管理员、容器工厂)。
如前所述,@RabbitListener
可以定义使用哪个容器工厂;在这种情况下,他们还使用queuesToDeclare
这会导致队列(如果不存在)在代理上声明。
通过将RabbitAdmin
bean 与约定<container-factory-name>-admin
,基础结构能够确定哪个管理员应声明队列。
这也适用于bindings = @QueueBinding(…)
据此,交换和约束也将被声明。
它不适用于queues
,因为这期望队列已经存在。
在生产者方面,方便ConnectionFactoryContextWrapper
class 时,使用RoutingConnectionFactory
(参见路由连接工厂)更简单。
如上所示,一个SimpleRoutingConnectionFactory
bean 已添加路由键one
,two
和three
.
还有一个RabbitTemplate
使用该工厂。
下面是一个示例,将该模板与包装器一起使用,以路由到其中一个代理集群。
@Bean
public ApplicationRunner runner(RabbitTemplate template, ConnectionFactoryContextWrapper wrapper) {
return args -> {
wrapper.run("one", () -> template.convertAndSend("q1", "toCluster1"));
wrapper.run("two", () -> template.convertAndSend("q2", "toCluster2"));
wrapper.run("three", () -> template.convertAndSend("q3", "toCluster3"));
};
}
4.2. 使用 RabbitMQ Stream 插件
版本 2.4 引入了对 RabbitMQ Stream 插件的初始支持RabbitMQ Stream 插件 Java 客户端。
-
RabbitStreamTemplate
-
StreamListenerContainer
添加spring-rabbit-stream
对项目的依赖:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-stream</artifactId>
<version>3.0.14</version>
</dependency>
compile 'org.springframework.amqp:spring-rabbit-stream:3.0.14'
您可以像往常一样配置队列,使用RabbitAdmin
bean,使用QueueBuilder.stream()
方法来指定队列类型。
例如:
@Bean
Queue stream() {
return QueueBuilder.durable("stream.queue1")
.stream()
.build();
}
但是,这仅在您还使用非流组件(例如SimpleMessageListenerContainer
或DirectMessageListenerContainer
),因为在打开 AMQP 连接时,会触发管理员声明定义的 bean。
如果您的应用程序仅使用流组件,或者您希望使用高级流配置功能,则应配置StreamAdmin
相反:
@Bean
StreamAdmin streamAdmin(Environment env) {
return new StreamAdmin(env, sc -> {
sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();
sc.stream("stream.queue2").create();
});
}
有关StreamCreator
.
4.2.1. 发送消息
这RabbitStreamTemplate
提供了RabbitTemplate
(AMQP) 功能。
public interface RabbitStreamOperations extends AutoCloseable {
CompletableFuture<Boolean> send(Message message);
CompletableFuture<Boolean> convertAndSend(Object message);
CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);
CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);
MessageBuilder messageBuilder();
MessageConverter messageConverter();
StreamMessageConverter streamMessageConverter();
@Override
void close() throws AmqpException;
}
这RabbitStreamTemplate
实现具有以下构造函数和属性:
public RabbitStreamTemplate(Environment environment, String streamName) {
}
public void setMessageConverter(MessageConverter messageConverter) {
}
public void setStreamConverter(StreamMessageConverter streamConverter) {
}
public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}
这MessageConverter
用于convertAndSend
将对象转换为 Spring AMQP 的方法Message
.
这StreamMessageConverter
用于从 Spring AMQP 转换Message
到本机流Message
.
您还可以发送原生流Message
直接;使用messageBuilder()
提供对Producer
的消息生成器。
这ProducerCustomizer
提供了一种在构建生产者之前对其进行自定义的机制。
请参阅 Java 客户端文档,了解如何自定义Environment
和Producer
.
从版本 3.0 开始,方法返回类型为CompletableFuture 而不是ListenableFuture . |
4.2.2. 接收消息
异步消息接收由StreamListenerContainer
(和StreamRabbitListenerContainerFactory
使用时@RabbitListener
).
侦听器容器需要一个Environment
以及单个流名称。
您可以接收 Spring AMQPMessage
s 使用经典MessageListener
,或者您可以接收原生流Message
s 使用新接口:
public interface StreamMessageListener extends MessageListener {
void onStreamMessage(Message message, Context context);
}
有关支持的属性的信息,请参阅消息侦听器容器配置。
与模板类似,容器有一个ConsumerCustomizer
财产。
请参阅 Java 客户端文档,了解如何自定义Environment
和Consumer
.
使用时@RabbitListener
,配置一个StreamRabbitListenerContainerFactory
;此时,大多数@RabbitListener
属性 (concurrency
等)被忽略。只id
,queues
,autoStartup
和containerFactory
被支持。
另外queues
只能包含一个流名称。
4.2.3. 示例
@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
template.setProducerCustomizer((name, builder) -> builder.name("test"));
return template;
}
@Bean
RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {
return new StreamRabbitListenerContainerFactory(env);
}
@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {
...
}
@Bean
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {
StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
factory.setNativeListener(true);
factory.setConsumerCustomizer((id, builder) -> {
builder.name("myConsumer")
.offset(OffsetSpecification.first())
.manualTrackingStrategy();
});
return factory;
}
@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {
...
context.storeOffset();
}
@Bean
Queue stream() {
return QueueBuilder.durable("test.stream.queue1")
.stream()
.build();
}
@Bean
Queue stream() {
return QueueBuilder.durable("test.stream.queue2")
.stream()
.build();
}
版本 2.4.5 添加了adviceChain
属性设置为StreamListenerContainer
(及其工厂)。
还提供了一个新的工厂 bean 来创建一个无状态重试拦截器,其中包含可选的StreamMessageRecoverer
用于使用原始流消息时。
@Bean
public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {
StreamRetryOperationsInterceptorFactoryBean rfb =
new StreamRetryOperationsInterceptorFactoryBean();
rfb.setRetryOperations(retryTemplate);
rfb.setStreamMessageRecoverer((msg, context, throwable) -> {
...
});
return rfb;
}
此容器不支持有状态重试。 |
4.2.4. 超级流
超级流是分区流的一个抽象概念,通过将多个流队列绑定到具有参数的交换来实现x-super-stream: true
.
供应
为方便起见,可以通过定义类型为SuperStream
.
@Bean
SuperStream superStream() {
return new SuperStream("my.super.stream", 3);
}
这RabbitAdmin
检测到此 bean 并将声明交换 (my.super.stream
) 和 3 个队列(分区) -my.super-stream-n
哪里n
是0
,1
,2
,绑定路由键等于n
.
如果您还希望通过 AMQP 发布到交易所,则可以提供自定义路由密钥:
@Bean
SuperStream superStream() {
return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i)
.mapToObj(j -> "rk-" + j)
.collect(Collectors.toList()));
}
键的数量必须等于分区的数量。
生产到 SuperStream
您必须添加一个superStreamRoutingFunction
到RabbitStreamTemplate
:
@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
template.setSuperStreamRouting(message -> {
// some logic to return a String for the client's hashing algorithm
});
return template;
}
您还可以使用RabbitTemplate
.
使用单个活跃消费者的超级流
调用superStream
方法,以在超级流上启用单个活动使用者。
@Bean
StreamListenerContainer container(Environment env, String name) {
StreamListenerContainer container = new StreamListenerContainer(env);
container.superStream("ss.sac", "myConsumer", 3); // concurrency = 3
container.setupMessageListener(msg -> {
...
});
container.setConsumerCustomizer((id, builder) -> builder.offset(OffsetSpecification.last()));
return container;
}
这时,当并发大于1时,实际并发由Environment ;要实现完全并发,请将环境的maxConsumersByConnection 到 1.
请参阅配置环境。 |
4.2.5. 千分尺观察
从版本 3.0.5 开始,现在支持使用 Micrometer 进行观察,用于RabbitStreamTemplate
和流侦听器容器。
容器现在还支持千分尺计时器(未启用观察时)。
设置observationEnabled
在每个组件上进行观察;这将禁用千分尺计时器,因为计时器现在将通过每次观测进行管理。
使用带注释的监听器时,将observationEnabled
在集装箱工厂。
有关更多信息,请参阅千分尺追踪。
要向计时器/跟踪添加标记,请配置自定义RabbitStreamTemplateObservationConvention
或RabbitStreamListenerObservationConvention
分别添加到模板或侦听器容器。
默认实现将name
标签用于模板观察,以及listener.id
标签。
您可以将子类化DefaultRabbitStreamTemplateObservationConvention
或DefaultStreamRabbitListenerObservationConvention
或提供全新的实现。
有关更多详细信息,请参阅千分尺观察文档。
4.3. 记录子系统 AMQP 附加器
该框架为一些流行的日志记录子系统提供了日志记录附加程序:
-
logback(从 Spring AMQP 1.4 版开始)
-
log4j2(从 Spring AMQP 版本 1.6 开始)
附加程序是使用日志记录子系统的正常机制配置的,可用属性在以下部分中指定。
4.3.1. 通用属性
以下属性可用于所有附加器:
属性 | 默认值 | 描述 |
---|---|---|
exchangeName |
logs |
要向其发布日志事件的交换的名称。 |
exchangeType |
topic |
要向其发布日志事件的交换类型 — 仅当附加者声明交换时才需要。
看 |
routingKeyPattern |
%c.%p |
用于生成路由密钥的日志记录子系统模式格式。 |
applicationId |
应用程序 ID — 如果模式包括 |
|
senderPoolSize |
2 |
用于发布日志事件的线程数。 |
maxSenderRetries |
30 |
如果代理不可用或存在其他错误,则重试发送消息的次数。
重试延迟如下: |
addresses |
以逗号分隔的代理地址列表,格式如下: |
|
host |
localhost |
RabbitMQ 主机 。 |
port |
5672 |
要连接的 RabbitMQ 端口。 |
virtualHost |
/ |
要连接的 RabbitMQ 虚拟主机。 |
username |
guest |
RabbitMQ 用户。 |
password |
guest |
此用户的 RabbitMQ 密码。 |
useSsl |
false |
是否对 RabbitMQ 连接使用 SSL。
看 |
verifyHostname |
true |
为 TLS 连接启用服务器主机名验证。
看 |
sslAlgorithm |
null |
要使用的 SSL 算法。 |
sslPropertiesLocation |
null |
SSL 属性文件的位置。 |
keyStore |
null |
密钥库的位置。 |
keyStorePassphrase |
null |
密钥库的密码。 |
keyStoreType |
JKS |
密钥库类型。 |
trustStore |
null |
信任存储的位置。 |
trustStorePassphrase |
null |
信任库的密码。 |
trustStoreType |
JKS |
信任库类型。 |
saslConfig |
null (RabbitMQ client default applies) |
这 |
contentType |
text/plain |
|
contentEncoding |
|
|
declareExchange |
false |
是否在此附加器启动时声明已配置的交换。
也可以看看 |
durable |
true |
什么时候 |
autoDelete |
false |
什么时候 |
charset |
null |
转换时要使用的字符集 |
deliveryMode |
PERSISTENT |
|
generateId |
false |
用于确定 |
clientConnectionProperties |
null |
以逗号分隔的列表 |
addMdcAsHeaders |
true |
在引入此属性之前,MDC 属性始终被添加到 RabbitMQ 消息头中。
这可能会导致大型 MDC 出现问题,因为 RabbitMQ 的所有标头的缓冲区大小有限,而且这个缓冲区非常小。
引入此属性是为了避免在大型 MDC 的情况下出现问题。
默认情况下,此值设置为 |
4.3.2. Log4j 2 附加器
以下示例显示了如何配置 Log4j 2 附加器:
<Appenders>
...
<RabbitMQ name="rabbitmq"
addresses="foo:5672,bar:5672" user="guest" password="guest" virtualHost="/"
exchange="log4j2" exchangeType="topic" declareExchange="true" durable="true" autoDelete="false"
applicationId="myAppId" routingKeyPattern="%X{applicationId}.%c.%p"
contentType="text/plain" contentEncoding="UTF-8" generateId="true" deliveryMode="NON_PERSISTENT"
charset="UTF-8"
senderPoolSize="3" maxSenderRetries="5"
addMdcAsHeaders="false">
</RabbitMQ>
</Appenders>
从版本 1.6.10 和 1.7.3 开始,默认情况下,log4j2 附加器将消息发布到调用线程上的 RabbitMQ。
这是因为默认情况下,Log4j 2 不会创建线程安全事件。
如果代理出现故障,则 |
4.3.3. Logback 附加器
以下示例显示了如何配置 logback 附加器:
<appender name="AMQP" class="org.springframework.amqp.rabbit.logback.AmqpAppender">
<layout>
<pattern><![CDATA[ %d %p %t [%c] - <%m>%n ]]></pattern>
</layout>
<addresses>foo:5672,bar:5672</addresses>
<abbreviation>36</abbreviation>
<includeCallerData>false</includeCallerData>
<applicationId>myApplication</applicationId>
<routingKeyPattern>%property{applicationId}.%c.%p</routingKeyPattern>
<generateId>true</generateId>
<charset>UTF-8</charset>
<durable>false</durable>
<deliveryMode>NON_PERSISTENT</deliveryMode>
<declareExchange>true</declareExchange>
<addMdcAsHeaders>false</addMdcAsHeaders>
</appender>
从 1.7.1 版本开始,LogbackAmqpAppender
提供includeCallerData
选项,即false
默认情况下。
提取调用方数据可能相当昂贵,因为日志事件必须创建一个可抛出项并检查它以确定调用位置。
因此,默认情况下,当事件添加到事件队列时,不会提取与事件关联的调用方数据。
您可以通过将includeCallerData
属性设置为true
.
从 2.0.0 版本开始,LogbackAmqpAppender
支持 Logback 编码器,其中包含encoder
选择。
这encoder
和layout
选项是相互排斥的。
4.3.4. 自定义消息
默认情况下,AMQP 附加器填充以下消息属性:
-
deliveryMode
-
内容类型
-
contentEncoding
,如果已配置 -
messageId
如果generateId
已配置 -
timestamp
日志事件的 -
appId
,如果配置了 applicationId
此外,它们还使用以下值填充标头:
-
categoryName
日志事件的 -
日志事件的级别
-
thread
:发生日志事件的线程的名称 -
日志事件调用的堆栈跟踪位置
-
所有 MDC 属性的副本(除非
addMdcAsHeaders
设置为false
)
每个附加器都可以进行子类化,以便您在发布之前修改消息。 以下示例演示如何自定义日志消息:
public class MyEnhancedAppender extends AmqpAppender {
@Override
public Message postProcessMessageBeforeSend(Message message, Event event) {
message.getMessageProperties().setHeader("foo", "bar");
return message;
}
}
从 2.2.4 开始,log4j2AmqpAppender
可以使用@PluginBuilderFactory
并扩展AmqpAppender.Builder
@Plugin(name = "MyEnhancedAppender", category = "Core", elementType = "appender", printObject = true)
public class MyEnhancedAppender extends AmqpAppender {
public MyEnhancedAppender(String name, Filter filter, Layout<? extends Serializable> layout,
boolean ignoreExceptions, AmqpManager manager, BlockingQueue<Event> eventQueue, String foo, String bar) {
super(name, filter, layout, ignoreExceptions, manager, eventQueue);
@Override
public Message postProcessMessageBeforeSend(Message message, Event event) {
message.getMessageProperties().setHeader("foo", "bar");
return message;
}
@PluginBuilderFactory
public static Builder newBuilder() {
return new Builder();
}
protected static class Builder extends AmqpAppender.Builder {
@Override
protected AmqpAppender buildInstance(String name, Filter filter, Layout<? extends Serializable> layout,
boolean ignoreExceptions, AmqpManager manager, BlockingQueue<Event> eventQueue) {
return new MyEnhancedAppender(name, filter, layout, ignoreExceptions, manager, eventQueue);
}
}
}
4.3.5. 自定义客户端属性
可以通过添加字符串属性或更复杂的属性来添加自定义客户端属性。
简单字符串属性
每个附加器都支持将客户端属性添加到 RabbitMQ 连接。
以下示例演示如何为日志返回添加自定义客户端属性:
<appender name="AMQP" ...>
...
<clientConnectionProperties>thing1:thing2,cat:hat</clientConnectionProperties>
...
</appender>
<Appenders>
...
<RabbitMQ name="rabbitmq"
...
clientConnectionProperties="thing1:thing2,cat:hat"
...
</RabbitMQ>
</Appenders>
这些属性是逗号分隔的列表key:value
对。 键和值不能包含逗号或冒号。
查看连接时,这些属性将显示在 RabbitMQ 管理 UI 上。
高级回登录技术
您可以对 Logback 附加器进行子类化。这样做可以在建立连接之前修改客户端连接属性。以下示例显示了如何执行此作:
public class MyEnhancedAppender extends AmqpAppender {
private String thing1;
@Override
protected void updateConnectionClientProperties(Map<String, Object> clientProperties) {
clientProperties.put("thing1", this.thing1);
}
public void setThing1(String thing1) {
this.thing1 = thing1;
}
}
然后你可以添加<thing1>thing2</thing1>
logback.xml。
对于 String 属性(如前面示例中所示的属性),可以使用前面的技术。
子类允许添加更丰富的属性(例如添加Map
或数字属性)。
4.3.6. 提供自定义队列实现
这AmqpAppenders
使用BlockingQueue
以异步将日志事件发布到 RabbitMQ。
默认情况下,一个LinkedBlockingQueue
被使用。
但是,您可以提供任何类型的定制BlockingQueue
实现。
以下示例显示了如何为 Logback 执行此作:
public class MyEnhancedAppender extends AmqpAppender {
@Override
protected BlockingQueue<Event> createEventQueue() {
return new ArrayBlockingQueue();
}
}
Log4j 2 附加器支持使用BlockingQueueFactory
,如以下示例所示:
<Appenders>
...
<RabbitMQ name="rabbitmq"
bufferSize="10" ... >
<ArrayBlockingQueue/>
</RabbitMQ>
</Appenders>
4.4. 示例应用程序
Spring AMQP 示例项目包括两个示例应用程序。 第一个是一个简单的“Hello World”示例,演示了同步和异步消息接收。 它为了解基本组件提供了一个很好的起点。 第二个示例基于股票交易用例,以演示实际应用程序中常见的交互类型。 在本章中,我们提供了每个示例的快速演练,以便您可以专注于最重要的组件。 这些示例都是基于 Maven 的,因此您应该能够将它们直接导入到任何 Maven 感知 IDE(例如 SpringSource Tool Suite)中。
4.4.1. “Hello World”示例
“Hello World”示例演示了同步和异步消息接收。
您可以导入spring-rabbit-helloworld
sample 到 IDE 中,然后按照下面的讨论进行作。
同步示例
在src/main/java
目录中,导航到org.springframework.amqp.helloworld
包。
打开HelloWorldConfiguration
类,并注意它包含@Configuration
类级别的注释,并注意到一些@Bean
方法级别的注释。
这是 Spring 基于 Java 的配置示例。
你可以在这里阅读更多相关信息。
以下列表显示了如何创建连接工厂:
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
该配置还包含RabbitAdmin
,默认情况下,它会查找任何类型为 exchange、queue 或 binding 的 bean,然后在代理上声明它们。
事实上,helloWorldQueue
在HelloWorldConfiguration
是一个示例,因为它是Queue
.
以下列表显示了helloWorldQueue
豆子定义:
@Bean
public Queue helloWorldQueue() {
return new Queue(this.helloWorldQueueName);
}
回顾rabbitTemplate
bean 配置,可以看到它的名称为helloWorldQueue
设置为其queue
属性(用于接收消息)和其routingKey
属性(用于发送消息)。
现在我们已经探索了配置,我们可以看看实际使用这些组件的代码。
首先,打开Producer
类。
它包含一个main()
方法,其中 SpringApplicationContext
被创建。
以下列表显示了main
方法:
public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
amqpTemplate.convertAndSend("Hello World");
System.out.println("Sent: Hello World");
}
在前面的示例中,AmqpTemplate
bean 被检索并用于发送Message
.
由于客户端代码应尽可能依赖接口,因此类型为AmqpTemplate
而不是RabbitTemplate
.
即使 bean 在HelloWorldConfiguration
是RabbitTemplate
,依赖接口意味着这段代码更可移植(你可以独立于代码来更改配置)。
由于convertAndSend()
方法被调用时,模板委托给其MessageConverter
实例。
在这种情况下,它使用默认的SimpleMessageConverter
,但可以向rabbitTemplate
bean,如HelloWorldConfiguration
.
现在打开Consumer
类。
它实际上共享相同的配置基类,这意味着它共享rabbitTemplate
豆。
这就是为什么我们在该模板中配置了routingKey
(用于发送)和queue
(用于接收)。
正如我们在AmqpTemplate
,您可以改为将 'routingKey' 参数传递给 send 方法,将 'queue' 参数传递给 receive 方法。
这Consumer
代码基本上是 Producer 的镜像,调用receiveAndConvert()
而不是convertAndSend()
.
以下列表显示了Consumer
:
public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
System.out.println("Received: " + amqpTemplate.receiveAndConvert());
}
如果您运行Producer
然后运行Consumer
,您应该看到Received: Hello World
在控制台输出中。
异步示例
同步示例演练了同步 Hello World 示例。
本节介绍一个稍微高级但功能更强大的选项。
通过一些修改,Hello World 示例可以提供异步接收的示例,也称为消息驱动的 POJO。
事实上,有一个子包提供了这一点:org.springframework.amqp.samples.helloworld.async
.
同样,我们从发送方开始。
打开ProducerConfiguration
类,并注意它创建了一个connectionFactory
和rabbitTemplate
豆。
这一次,由于配置专用于消息发送端,我们甚至不需要任何队列定义,并且RabbitTemplate
仅设置了 'routingKey' 属性。
回想一下,消息是发送到交换,而不是直接发送到队列。
AMQP 默认交换是没有名称的直接交换。
所有队列都绑定到该默认交换,其名称作为路由密钥。
这就是为什么我们只需要在这里提供路由密钥。
以下列表显示了rabbitTemplate
定义:
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setRoutingKey(this.helloWorldQueueName);
return template;
}
由于此示例演示了异步消息接收,因此生产端被设计为连续发送消息(如果它是像同步版本一样的消息按执行模型,则不会很明显,它实际上是一个消息驱动的消费者)。
负责连续发送消息的组件被定义为ProducerConfiguration
.
它配置为每三秒运行一次。
以下列表显示了组件:
static class ScheduledProducer {
@Autowired
private volatile RabbitTemplate rabbitTemplate;
private final AtomicInteger counter = new AtomicInteger();
@Scheduled(fixedRate = 3000)
public void sendMessage() {
rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
}
}
您不需要了解所有细节,因为真正的重点应该放在接收方(我们将在接下来介绍)。
但是,如果您还不熟悉 Spring 任务调度支持,您可以在此处了解更多信息。
简而言之,该postProcessor
bean 中的ProducerConfiguration
向调度程序注册任务。
现在我们可以转向接收方。
为了强调消息驱动的 POJO 行为,我们从对消息做出反应的组件开始。
该类称为HelloWorldHandler
,如以下清单所示:
public class HelloWorldHandler {
public void handleMessage(String text) {
System.out.println("Received: " + text);
}
}
该类是 POJO。
它不扩展任何基类,不实现任何接口,甚至不包含任何导入。
它正在“适应”以适应MessageListener
Spring AMQP 的接口MessageListenerAdapter
.
然后,您可以在SimpleMessageListenerContainer
.
对于此示例,容器是在ConsumerConfiguration
类。
您可以在那里看到包裹在适配器中的 POJO。
以下列表显示了如何listenerContainer
定义为:
@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueName(this.helloWorldQueueName);
container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
return container;
}
这SimpleMessageListenerContainer
是一个 Spring 生命周期组件,默认情况下会自动启动。
如果您查看Consumer
类,你可以看到它的main()
方法只包含一个单行引导程序,用于创建ApplicationContext
.
制作人的main()
方法也是一个单行引导程序,因为其方法被@Scheduled
也会自动启动。
您可以启动Producer
和Consumer
以任何顺序,您应该会看到每三秒发送一次和接收消息。
4.4.2. 股票交易
股票交易示例演示了比 Hello World 示例更高级的消息传递方案。
但是,配置非常相似,只是更复杂一些。
由于我们详细介绍了 Hello World 配置,因此在这里,我们将重点介绍此示例的不同之处。
有一个服务器将市场数据(股票报价)推送到主题交易所。
然后,客户端可以通过将队列与路由模式(例如,app.stock.quotes.nasdaq.*
).
该演示的另一个主要功能是由客户端发起并由服务器处理的请求-回复“股票交易”交互。
这涉及一个私人replyTo
客户端在订单请求消息本身中发送的队列。
服务器的核心配置位于RabbitServerConfiguration
类中的org.springframework.amqp.rabbit.stocks.config.server
包。
它扩展了AbstractStockAppRabbitConfiguration
.
这是定义服务器和客户端通用资源的地方,包括市场数据主题交换(其名称为“app.stock.marketdata”)和服务器为股票交易公开的队列(其名称为“app.stock.request”)。
在该通用配置文件中,您还会看到Jackson2JsonMessageConverter
在RabbitTemplate
.
特定于服务器的配置由两件事组成。
首先,它在RabbitTemplate
这样它就不需要在每次调用时提供该交换名称来发送Message
.
它在基本配置类中定义的抽象回调方法中执行此作。
以下列表显示了该方法:
public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
}
其次,声明股票请求队列。
在这种情况下,它不需要任何显式绑定,因为它绑定到默认的无名交换,并以自己的名称作为路由键。
如前所述,AMQP 规范定义了该行为。
以下列表显示了stockRequestQueue
豆:
@Bean
public Queue stockRequestQueue() {
return new Queue(STOCK_REQUEST_QUEUE_NAME);
}
现在您已经看到了服务器 AMQP 资源的配置,请导航到org.springframework.amqp.rabbit.stocks
package 下的src/test/java
目录。
在那里,你可以看到实际的Server
提供main()
方法。
它创建了一个ApplicationContext
基于server-bootstrap.xml
配置文件。
在那里,您可以看到发布虚拟市场数据的计划任务。
该配置依赖于 Spring 的task
命名空间支持。
引导配置文件还导入一些其他文件。
最有意思的就是server-messaging.xml
,位于src/main/resources
.
在那里,你可以看到messageListenerContainer
负责处理股票交易请求的 bean。
最后,看看serverHandler
在server-handlers.xml
(这也在 'src/main/resources' 中)。
该 bean 是ServerHandler
类,并且是消息驱动的 POJO 的一个很好的例子,它也可以发送回复消息。
请注意,它本身未与框架或任何 AMQP 概念耦合。
它接受一个TradeRequest
并返回一个TradeResponse
.
以下列表显示了handleMessage
方法:
public TradeResponse handleMessage(TradeRequest tradeRequest) { ...
}
现在我们已经看到了服务器最重要的配置和代码,我们可以转向客户端。
最好的起点大概是RabbitClientConfiguration
在org.springframework.amqp.rabbit.stocks.config.client
包。
请注意,它声明了两个队列,但没有提供显式名称。
以下列表显示了两个队列的 Bean 定义:
@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}
@Bean
public Queue traderJoeQueue() {
return amqpAdmin().declareQueue();
}
这些是专用队列,并且会自动生成唯一名称。
客户端使用第一个生成的队列绑定到服务器已公开的市场数据交换。
回想一下,在 AMQP 中,使用者与队列交互,而生产者与交换交互。
队列与交换的“绑定”是告诉代理将消息从给定交换传递(或路由)到队列的内容。
由于市场数据交换是主题交换,因此绑定可以用路由模式来表达。
这RabbitClientConfiguration
使用Binding
对象,并且该对象是使用BindingBuilder
流畅的 API。
以下列表显示了Binding
:
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;
@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
请注意,实际值已在属性文件 (client.properties
下src/main/resources
),并且我们使用 Spring 的@Value
注释来注入该值。
这通常是一个好主意。
否则,该值将被硬编码在类中,并且无需重新编译即可无法修改。
在这种情况下,在更改用于绑定的路由模式时运行多个版本的客户端要容易得多。
我们现在可以尝试一下。
首先运行org.springframework.amqp.rabbit.stocks.Server
然后org.springframework.amqp.rabbit.stocks.Client
.
您应该看到虚拟引文NASDAQ
stocks,因为与 client.properties 中的“stocks.quote.pattern”键关联的当前值是“app.stock.quotes.nasdaq。'.
现在,在保留现有Server
和Client
运行时,将该属性值更改为“app.stock.quotes.nyse.' 并开始第二个Client
实例。
您应该看到第一个客户仍然收到纳斯达克报价,而第二个客户收到纽约证券交易所报价。
相反,您可以更改模式以获取所有股票甚至单个股票代码。
我们探索的最后一个功能是从客户角度出发的请求-回复交互。
回想一下,我们已经看到了ServerHandler
接受TradeRequest
对象和返回TradeResponse
对象。
上相应的代码Client
side 是RabbitStockServiceGateway
在org.springframework.amqp.rabbit.stocks.gateway
包。
它委托给RabbitTemplate
为了发送消息。
以下列表显示了send
方法:
public void send(TradeRequest tradeRequest) {
getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setReplyTo(new Address(defaultReplyToQueue));
try {
message.getMessageProperties().setCorrelationId(
UUID.randomUUID().toString().getBytes("UTF-8"));
}
catch (UnsupportedEncodingException e) {
throw new AmqpException(e);
}
return message;
}
});
}
请注意,在发送消息之前,它会将replyTo
地址。
它提供了由traderJoeQueue
bean 定义(前面显示)。以下列表显示了@Bean
定义StockServiceGateway
类本身:
@Bean
public StockServiceGateway stockServiceGateway() {
RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
gateway.setRabbitTemplate(rabbitTemplate());
gateway.setDefaultReplyToQueue(traderJoeQueue());
return gateway;
}
如果您不再运行服务器和客户端,请立即启动它们。 尝试发送格式为“100 TCKR”的请求。 在模拟请求“处理”的短暂人为延迟之后,您应该会在客户端上看到一条确认消息。
4.5. 测试支持
为异步应用程序编写集成必然比测试更简单的应用程序更复杂。当抽象(例如@RabbitListener
注释出现在画面中。问题是如何验证在发送消息后,侦听器是否按预期收到消息。
框架本身有许多单元测试和集成测试。有些使用模拟,而另一些则使用实时 RabbitMQ 代理的集成测试。您可以查阅这些测试以获取测试场景的一些想法。
Spring AMQP 1.6 版引入了spring-rabbit-test
jar,它为测试其中一些更复杂的场景提供支持。预计该项目将随着时间的推移而扩展,但我们需要社区反馈来为帮助测试所需的功能提出建议。请使用 JIRA 或 GitHub Issues 提供此类反馈。
4.5.1. @SpringRabbitTest
使用此注释将基础架构 Bean 添加到 Spring 测试中ApplicationContext
.
例如,在使用@SpringBootTest
因为 Spring Boot 的自动配置将添加 bean。
注册的 Bean 是:
-
CachingConnectionFactory
(autoConnectionFactory
).如果@RabbitEnabled
存在,则使用其连接工厂。 -
RabbitTemplate
(autoRabbitTemplate
) -
RabbitAdmin
(autoRabbitAdmin
) -
RabbitListenerContainerFactory
(autoContainerFactory
)
此外,与@EnableRabbit
(支持@RabbitListener
) 被添加。
@SpringJunitConfig
@SpringRabbitTest
public class MyRabbitTests {
@Autowired
private RabbitTemplate template;
@Autowired
private RabbitAdmin admin;
@Autowired
private RabbitListenerEndpointRegistry registry;
@Test
void test() {
...
}
@Configuration
public static class Config {
...
}
}
使用 JUnit4,将@SpringJunitConfig
跟@RunWith(SpringRunnner.class)
.
4.5.2. 莫基托Answer<?>
实现
目前有两个Answer<?>
实现以帮助进行测试。
第一个,LatchCountDownAndCallRealMethodAnswer
,提供Answer<Void>
返回null
并倒计时闩锁。
以下示例演示如何使用LatchCountDownAndCallRealMethodAnswer
:
LatchCountDownAndCallRealMethodAnswer answer = this.harness.getLatchAnswerFor("myListener", 2);
doAnswer(answer)
.when(listener).foo(anyString(), anyString());
...
assertThat(answer.await(10)).isTrue();
第二个,LambdaAnswer<T>
提供了一种可以选择调用 real 方法的机制,并提供了一个机会返回自定义结果,基于InvocationOnMock
和结果(如果有)。
考虑以下 POJO:
public class Thing {
public String thing(String thing) {
return thing.toUpperCase();
}
}
以下类测试Thing
POJO:
Thing thing = spy(new Thing());
doAnswer(new LambdaAnswer<String>(true, (i, r) -> r + r))
.when(thing).thing(anyString());
assertEquals("THINGTHING", thing.thing("thing"));
doAnswer(new LambdaAnswer<String>(true, (i, r) -> r + i.getArguments()[0]))
.when(thing).thing(anyString());
assertEquals("THINGthing", thing.thing("thing"));
doAnswer(new LambdaAnswer<String>(false, (i, r) ->
"" + i.getArguments()[0] + i.getArguments()[0])).when(thing).thing(anyString());
assertEquals("thingthing", thing.thing("thing"));
从版本 2.2.3 开始,答案会捕获被测方法引发的任何异常。 用answer.getExceptions()
以获取对它们的引用。
当与@RabbitListenerTest
和RabbitListenerTestHarness
用harness.getLambdaAnswerFor("listenerId", true, …)
为听众获得正确构建的答案。
4.5.3.@RabbitListenerTest
和RabbitListenerTestHarness
注释您的一个@Configuration
类与@RabbitListenerTest
导致框架将 标准RabbitListenerAnnotationBeanPostProcessor
使用名为RabbitListenerTestHarness
(它还使@RabbitListener
通过@EnableRabbit
).
这RabbitListenerTestHarness
通过两种方式增强侦听器。首先,它将侦听器包装在Mockito Spy
,启用正常Mockito
存根和验证作。它还可以添加一个Advice
到侦听器,允许访问参数、结果和抛出的任何异常。您可以使用@RabbitListenerTest
. 后者用于访问有关调用的较低级别数据。它还支持阻止测试线程,直到调用异步侦听器。
final @RabbitListener 方法不能被监视或建议。
此外,只有具有id 属性可以被监视或建议。 |
考虑一些例子。
以下示例使用 spy:
@Configuration
@RabbitListenerTest
public class Config {
@Bean
public Listener listener() {
return new Listener();
}
...
}
public class Listener {
@RabbitListener(id="foo", queues="#{queue1.name}")
public String foo(String foo) {
return foo.toUpperCase();
}
@RabbitListener(id="bar", queues="#{queue2.name}")
public void foo(@Payload String foo, @Header("amqp_receivedRoutingKey") String rk) {
...
}
}
public class MyTests {
@Autowired
private RabbitListenerTestHarness harness; (1)
@Test
public void testTwoWay() throws Exception {
assertEquals("FOO", this.rabbitTemplate.convertSendAndReceive(this.queue1.getName(), "foo"));
Listener listener = this.harness.getSpy("foo"); (2)
assertNotNull(listener);
verify(listener).foo("foo");
}
@Test
public void testOneWay() throws Exception {
Listener listener = this.harness.getSpy("bar");
assertNotNull(listener);
LatchCountDownAndCallRealMethodAnswer answer = this.harness.getLatchAnswerFor("bar", 2); (3)
doAnswer(answer).when(listener).foo(anyString(), anyString()); (4)
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "bar");
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "baz");
assertTrue(answer.await(10));
verify(listener).foo("bar", this.queue2.getName());
verify(listener).foo("baz", this.queue2.getName());
}
}
1 | 将工具注入测试用例中,以便我们可以访问间谍。 |
2 | 获取对间谍的引用,以便我们可以验证它是按预期调用的。
由于这是一个发送和接收作,因此无需挂起测试线程,因为它已经
在RabbitTemplate 等待回复。 |
3 | 在这种情况下,我们只使用发送作,因此我们需要一个闩锁来等待对侦听器的异步调用
在容器线程上。
我们使用 Answer<?> 实现之一来帮助解决这个问题。
重要提示:由于侦听者被监视的方式,使用harness.getLatchAnswerFor() 为间谍获取正确配置的答案。 |
4 | 配置 spy 以调用Answer . |
以下示例使用捕获建议:
@Configuration
@ComponentScan
@RabbitListenerTest(spy = false, capture = true)
public class Config {
}
@Service
public class Listener {
private boolean failed;
@RabbitListener(id="foo", queues="#{queue1.name}")
public String foo(String foo) {
return foo.toUpperCase();
}
@RabbitListener(id="bar", queues="#{queue2.name}")
public void foo(@Payload String foo, @Header("amqp_receivedRoutingKey") String rk) {
if (!failed && foo.equals("ex")) {
failed = true;
throw new RuntimeException(foo);
}
failed = false;
}
}
public class MyTests {
@Autowired
private RabbitListenerTestHarness harness; (1)
@Test
public void testTwoWay() throws Exception {
assertEquals("FOO", this.rabbitTemplate.convertSendAndReceive(this.queue1.getName(), "foo"));
InvocationData invocationData =
this.harness.getNextInvocationDataFor("foo", 0, TimeUnit.SECONDS); (2)
assertThat(invocationData.getArguments()[0], equalTo("foo")); (3)
assertThat((String) invocationData.getResult(), equalTo("FOO"));
}
@Test
public void testOneWay() throws Exception {
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "bar");
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "baz");
this.rabbitTemplate.convertAndSend(this.queue2.getName(), "ex");
InvocationData invocationData =
this.harness.getNextInvocationDataFor("bar", 10, TimeUnit.SECONDS); (4)
Object[] args = invocationData.getArguments();
assertThat((String) args[0], equalTo("bar"));
assertThat((String) args[1], equalTo(queue2.getName()));
invocationData = this.harness.getNextInvocationDataFor("bar", 10, TimeUnit.SECONDS);
args = invocationData.getArguments();
assertThat((String) args[0], equalTo("baz"));
invocationData = this.harness.getNextInvocationDataFor("bar", 10, TimeUnit.SECONDS);
args = invocationData.getArguments();
assertThat((String) args[0], equalTo("ex"));
assertEquals("ex", invocationData.getThrowable().getMessage()); (5)
}
}
1 | 将工具注入测试用例中,以便我们可以访问间谍。 |
2 | 用harness.getNextInvocationDataFor() 检索调用数据 - 在本例中,因为它是请求/回复
无需等待任何时间,因为测试线程已挂起RabbitTemplate 等待
为了结果。 |
3 | 然后我们可以验证参数和结果是否符合预期。 |
4 | 这一次我们需要一些时间来等待数据,因为它是容器线程上的异步作,我们需要 以挂起测试线程。 |
5 | 当侦听器抛出异常时,它在throwable 调用数据的属性。 |
使用自定义时Answer<?> s 与线束,为了正常运行,此类答案应 subclassForwardsInvocation 并从线束 (getDelegate("myListener") ) 并调用super.answer(invocation) .
请参阅提供的莫基托Answer<?> 实现示例的源代码。 |
4.5.4. 使用TestRabbitTemplate
这TestRabbitTemplate
用于执行一些基本的集成测试,而无需代理。
当您将其添加为@Bean
在您的测试用例中,它会发现上下文中的所有侦听器容器,无论是否声明为@Bean
或<bean/>
或使用@RabbitListener
注解。
目前仅支持按队列名称路由。
该模板从容器中提取消息侦听器,并直接在测试线程上调用它。
请求-回复消息传递 (sendAndReceive
methods) 支持返回回复的侦听器。
以下测试用例使用模板:
@RunWith(SpringRunner.class)
public class TestRabbitTemplateTests {
@Autowired
private TestRabbitTemplate template;
@Autowired
private Config config;
@Test
public void testSimpleSends() {
this.template.convertAndSend("foo", "hello1");
assertThat(this.config.fooIn, equalTo("foo:hello1"));
this.template.convertAndSend("bar", "hello2");
assertThat(this.config.barIn, equalTo("bar:hello2"));
assertThat(this.config.smlc1In, equalTo("smlc1:"));
this.template.convertAndSend("foo", "hello3");
assertThat(this.config.fooIn, equalTo("foo:hello1"));
this.template.convertAndSend("bar", "hello4");
assertThat(this.config.barIn, equalTo("bar:hello2"));
assertThat(this.config.smlc1In, equalTo("smlc1:hello3hello4"));
this.template.setBroadcast(true);
this.template.convertAndSend("foo", "hello5");
assertThat(this.config.fooIn, equalTo("foo:hello1foo:hello5"));
this.template.convertAndSend("bar", "hello6");
assertThat(this.config.barIn, equalTo("bar:hello2bar:hello6"));
assertThat(this.config.smlc1In, equalTo("smlc1:hello3hello4hello5hello6"));
}
@Test
public void testSendAndReceive() {
assertThat(this.template.convertSendAndReceive("baz", "hello"), equalTo("baz:hello"));
}
@Configuration
@EnableRabbit
public static class Config {
public String fooIn = "";
public String barIn = "";
public String smlc1In = "smlc1:";
@Bean
public TestRabbitTemplate template() throws IOException {
return new TestRabbitTemplate(connectionFactory());
}
@Bean
public ConnectionFactory connectionFactory() throws IOException {
ConnectionFactory factory = mock(ConnectionFactory.class);
Connection connection = mock(Connection.class);
Channel channel = mock(Channel.class);
willReturn(connection).given(factory).createConnection();
willReturn(channel).given(connection).createChannel(anyBoolean());
given(channel.isOpen()).willReturn(true);
return factory;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() throws IOException {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
return factory;
}
@RabbitListener(queues = "foo")
public void foo(String in) {
this.fooIn += "foo:" + in;
}
@RabbitListener(queues = "bar")
public void bar(String in) {
this.barIn += "bar:" + in;
}
@RabbitListener(queues = "baz")
public String baz(String in) {
return "baz:" + in;
}
@Bean
public SimpleMessageListenerContainer smlc1() throws IOException {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
container.setQueueNames("foo", "bar");
container.setMessageListener(new MessageListenerAdapter(new Object() {
public void handleMessage(String in) {
smlc1In += in;
}
}));
return container;
}
}
}
4.5.5. JUnit4@Rules
Spring AMQP 版本 1.7 及更高版本提供了一个名为spring-rabbit-junit
.
这个罐子包含几个实用程序@Rule
运行 JUnit4 测试时使用的实例。
请参阅 JUnit5 测试的 JUnit5 条件。
用BrokerRunning
BrokerRunning
提供了一种机制,让测试在代理未运行时成功(在localhost
,默认情况下)。
它还具有用于初始化和清空队列以及删除队列和交换的实用方法。
以下示例显示了它的用法:
@ClassRule
public static BrokerRunning brokerRunning = BrokerRunning.isRunningWithEmptyQueues("foo", "bar");
@AfterClass
public static void tearDown() {
brokerRunning.removeTestQueues("some.other.queue.too") // removes foo, bar as well
}
有几个isRunning…
static 方法,例如isBrokerAndManagementRunning()
,用于验证代理是否启用了管理插件。
配置规则
有时,如果没有代理,则希望测试失败,例如夜间 CI 构建。
要在运行时禁用该规则,请设置一个名为RABBITMQ_SERVER_REQUIRED
自true
.
您可以使用 setter 或环境变量覆盖代理属性,例如主机名:
以下示例演示如何使用 setter 覆盖属性:
@ClassRule
public static BrokerRunning brokerRunning = BrokerRunning.isRunningWithEmptyQueues("foo", "bar");
static {
brokerRunning.setHostName("10.0.0.1")
}
@AfterClass
public static void tearDown() {
brokerRunning.removeTestQueues("some.other.queue.too") // removes foo, bar as well
}
您还可以通过设置以下环境变量来覆盖属性:
public static final String BROKER_ADMIN_URI = "RABBITMQ_TEST_ADMIN_URI";
public static final String BROKER_HOSTNAME = "RABBITMQ_TEST_HOSTNAME";
public static final String BROKER_PORT = "RABBITMQ_TEST_PORT";
public static final String BROKER_USER = "RABBITMQ_TEST_USER";
public static final String BROKER_PW = "RABBITMQ_TEST_PASSWORD";
public static final String BROKER_ADMIN_USER = "RABBITMQ_TEST_ADMIN_USER";
public static final String BROKER_ADMIN_PW = "RABBITMQ_TEST_ADMIN_PASSWORD";
这些环境变量会覆盖默认设置 (localhost:5672
对于 AMQP 和localhost:15672/api/
用于管理 REST API)。
更改主机名会影响amqp
和management
REST API 连接(除非显式设置了管理 uri)。
BrokerRunning
还提供了一个static
调用的方法setEnvironmentVariableOverrides
这使您可以传入包含这些变量的映射。
它们覆盖系统环境变量。
如果您希望对多个测试套件中的测试使用不同的配置,这可能很有用。
重要提示:在调用任何isRunning()
创建规则实例的静态方法。
变量值将应用于在此调用之后创建的所有实例。
调用clearEnvironmentVariableOverrides()
重置规则以使用默认值(包括任何实际环境变量)。
在测试用例中,您可以使用brokerRunning
创建连接工厂时;getConnectionFactory()
返回规则的 RabbitMQConnectionFactory
.
以下示例显示了如何执行此作:
@Bean
public CachingConnectionFactory rabbitConnectionFactory() {
return new CachingConnectionFactory(brokerRunning.getConnectionFactory());
}
4.5.6. JUnit5 条件
版本 2.0.2 引入了对 JUnit5 的支持。
使用@RabbitAvailable
注解
此类级注解类似于BrokerRunning
@Rule
讨论于JUnit4@Rules
.
它由RabbitAvailableCondition
.
注释具有三个属性:
-
queues
:在每次测试之前声明(和清除)并在所有测试完成后删除的队列数组。 -
management
:将此设置为true
如果您的测试还需要在代理上安装管理插件。 -
purgeAfterEach
:(从 2.2 版开始)当true
(默认)、queues
将在测试之间清除。
它用于检查代理是否可用,如果没有,则跳过测试。
如配置规则中所述,名为RABBITMQ_SERVER_REQUIRED
如果true
,如果没有代理,则会导致测试快速失败。
您可以使用环境变量来配置条件,如配置规则中所述。
此外,RabbitAvailableCondition
支持参数化测试构造函数和方法的参数解析。
支持两种参数类型:
-
BrokerRunningSupport
:实例(在 2.2 之前,这是一个 JUnit 4BrokerRunning
实例) -
ConnectionFactory
:这BrokerRunningSupport
实例的 RabbitMQ 连接工厂
以下示例显示了两者:
@RabbitAvailable(queues = "rabbitAvailableTests.queue")
public class RabbitAvailableCTORInjectionTests {
private final ConnectionFactory connectionFactory;
public RabbitAvailableCTORInjectionTests(BrokerRunningSupport brokerRunning) {
this.connectionFactory = brokerRunning.getConnectionFactory();
}
@Test
public void test(ConnectionFactory cf) throws Exception {
assertSame(cf, this.connectionFactory);
Connection conn = this.connectionFactory.newConnection();
Channel channel = conn.createChannel();
DeclareOk declareOk = channel.queueDeclarePassive("rabbitAvailableTests.queue");
assertEquals(0, declareOk.getConsumerCount());
channel.close();
conn.close();
}
}
前面的测试在框架本身中,验证参数注入以及条件是否正确创建了队列。
实际用户测试可能如下:
@RabbitAvailable(queues = "rabbitAvailableTests.queue")
public class RabbitAvailableCTORInjectionTests {
private final CachingConnectionFactory connectionFactory;
public RabbitAvailableCTORInjectionTests(BrokerRunningSupport brokerRunning) {
this.connectionFactory =
new CachingConnectionFactory(brokerRunning.getConnectionFactory());
}
@Test
public void test() throws Exception {
RabbitTemplate template = new RabbitTemplate(this.connectionFactory);
...
}
}
当您在测试类中使用 Spring 注释应用程序上下文时,您可以通过名为RabbitAvailableCondition.getBrokerRunning()
.
从 2.2 版本开始,getBrokerRunning() 返回一个BrokerRunningSupport 对象;以前,JUnit 4BrokerRunnning 实例被返回。
新类具有与BrokerRunning . |
以下测试来自框架并演示了用法:
@RabbitAvailable(queues = {
RabbitTemplateMPPIntegrationTests.QUEUE,
RabbitTemplateMPPIntegrationTests.REPLIES })
@SpringJUnitConfig
@DirtiesContext(classMode = ClassMode.AFTER_EACH_TEST_METHOD)
public class RabbitTemplateMPPIntegrationTests {
public static final String QUEUE = "mpp.tests";
public static final String REPLIES = "mpp.tests.replies";
@Autowired
private RabbitTemplate template;
@Autowired
private Config config;
@Test
public void test() {
...
}
@Configuration
@EnableRabbit
public static class Config {
@Bean
public CachingConnectionFactory cf() {
return new CachingConnectionFactory(RabbitAvailableCondition
.getBrokerRunning()
.getConnectionFactory());
}
@Bean
public RabbitTemplate template() {
...
}
@Bean
public SimpleRabbitListenerContainerFactory
rabbitListenerContainerFactory() {
...
}
@RabbitListener(queues = QUEUE)
public byte[] foo(byte[] in) {
return in;
}
}
}
使用@LongRunning
注解
类似于LongRunningIntegrationTest
JUnit4@Rule
,除非将环境变量(或系统属性)设置为true
.
以下示例演示如何使用它:
@RabbitAvailable(queues = SimpleMessageListenerContainerLongTests.QUEUE)
@LongRunning
public class SimpleMessageListenerContainerLongTests {
public static final String QUEUE = "SimpleMessageListenerContainerLongTests.queue";
...
}
默认情况下,变量为RUN_LONG_INTEGRATION_TESTS
,但您可以在注释的value
属性。