对于最新的稳定版本,请使用 Spring AMQP 3.2.0! |
连接和资源管理
虽然我们在上一节中描述的 AMQP 模型是通用的,适用于所有实现,但当我们进入资源管理时,细节是特定于代理实现的。 因此,在本节中,我们重点介绍仅存在于 “spring-rabbit” 模块中的代码,因为此时 RabbitMQ 是唯一受支持的实现。
用于管理与 RabbitMQ 代理的连接的中心组件是接口。
实现的责任是提供 的实例,该实例是 的包装器 。ConnectionFactory
ConnectionFactory
org.springframework.amqp.rabbit.connection.Connection
com.rabbitmq.client.Connection
选择连接工厂
有三种连接工厂可供选择
-
PooledChannelConnectionFactory
-
ThreadChannelConnectionFactory
-
CachingConnectionFactory
前两个是在 2.3 版本中添加的。
对于大多数用例,应该使用 the 。
如果要确保严格的消息排序,而无需使用 Scoped Operations,则可以使用 。
这与 for similar that that it use a single connection 和一个 pool of channels。
它的实现更简单,但它不支持相关的发布者确认。CachingConnectionFactory
ThreadChannelConnectionFactory
PooledChannelConnectionFactory
CachingConnectionFactory
这三个工厂都支持简单的发布者确认。
将 配置为使用单独的连接时,您现在可以从版本 2.3.2 开始,将发布连接工厂配置为其他类型。
默认情况下,发布工厂的类型相同,并且在主工厂上设置的任何属性也会传播到发布工厂。RabbitTemplate
从版本 3.1 开始,包括该属性,该属性支持连接模块中的退避策略。
目前,支持处理达到限制时发生的异常的行为,实施基于尝试和间隔的回退策略。AbstractConnectionFactory
connectionCreatingBackOff
createChannel()
channelMax
PooledChannelConnectionFactory
此工厂基于 Apache Pool2 管理单个连接和两个通道池。
一个池用于事务通道,另一个池用于非事务通道。
池是 s 的默认配置;提供回调以配置池;有关更多信息,请参阅 Apache 文档。GenericObjectPool
Apache jar 必须位于类路径上才能使用此工厂。commons-pool2
@Bean
PooledChannelConnectionFactory pcf() throws Exception {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
pcf.setPoolConfigurer((pool, tx) -> {
if (tx) {
// configure the transactional pool
}
else {
// configure the non-transactional pool
}
});
return pcf;
}
ThreadChannelConnectionFactory
此工厂管理单个连接和两个 s,一个用于事务通道,另一个用于非事务通道。
此工厂确保同一线程上的所有操作都使用相同的通道(只要它保持打开状态)。
这有助于对消息进行严格的排序,而无需 Scoped Operations。
为避免内存泄漏,如果您的应用程序使用许多短期线程,则必须调用工厂的线程以释放通道资源。
从版本 2.3.7 开始,线程可以将其通道传输给另一个线程。
有关更多信息,请参见多线程环境中的严格消息排序。ThreadLocal
closeThreadChannel()
CachingConnectionFactory
提供的第三个实现是 ,默认情况下,它建立了一个可由应用程序共享的单个连接代理。
共享连接是可能的,因为使用 AMQP 进行消息传递的“工作单元”实际上是一个“通道”(在某些方面,这类似于 JMS 中连接和会话之间的关系)。
connection 实例提供了一个方法。
该实现支持这些通道的缓存,并且它根据通道是否为事务性通道维护单独的缓存。
创建 的实例时,可以通过构造函数提供 'hostname'。
您还应该提供 'username' 和 'password' 属性。
要配置通道缓存的大小(默认值为 25),您可以调用该方法。CachingConnectionFactory
createChannel
CachingConnectionFactory
CachingConnectionFactory
setChannelCacheSize()
从版本 1.3 开始,您可以配置 to cache connections 以及仅 channels。
在这种情况下,每次调用 都会创建一个新连接(或从缓存中检索一个空闲连接)。
关闭连接会将其返回到缓存中(如果尚未达到缓存大小)。
在此类连接上创建的通道也会被缓存。
在某些环境中,例如从 HA 集群使用,使用单独的连接可能很有用。
与负载均衡器结合使用,以连接到不同的集群成员等。
要缓存连接,请将 设置为 .CachingConnectionFactory
createConnection()
cacheMode
CacheMode.CONNECTION
这不会限制连接数。 相反,它指定允许的空闲打开连接数。 |
从版本 1.5.5 开始,提供了一个名为 .
设置此属性后,它将限制允许的连接总数。
设置后,如果达到限制,则用于等待连接变为空闲状态。
如果超过时间,则引发 an。connectionLimit
channelCheckoutTimeLimit
AmqpTimeoutException
当缓存模式为 时,自动声明队列等
(请参阅 Exchanges、Queues, and Bindings 的自动声明) 不受支持。 此外,在撰写本文时,默认情况下,该库会为每个连接创建一个固定的线程池(默认大小:threads)。
使用大量连接时,应考虑在 .
然后,所有连接都可以使用相同的 executor,并且可以共享其线程。
执行程序的线程池应该是无界的,或者针对预期用途进行适当设置(通常,每个连接至少一个线程)。
如果在每个连接上创建了多个通道,则池大小会影响并发性,因此可变(或简单缓存)线程池执行程序将是最合适的。 |
重要的是要了解缓存大小(默认情况下)不是一个限制,而只是可以缓存的通道数。 如果缓存大小为 10,则实际上可以使用任意数量的通道。 如果使用的通道超过 10 个,并且它们都返回到缓存中,则 10 个通道进入缓存。 其余的都是实体关闭的。
从版本 1.6 开始,默认通道缓存大小已从 1 增加到 25。 在高容量、多线程环境中,小缓存意味着以高速率创建和关闭通道。 增加默认缓存大小可以避免此开销。 您应该通过 RabbitMQ Admin UI 监控正在使用的通道,并考虑进一步增加缓存大小,如果您 查看正在创建和关闭的许多频道。 缓存仅按需增长(以满足应用程序的并发要求),因此此更改不会 影响现有的低容量应用程序。
从版本 1.4.2 开始,具有一个名为 的属性。
当此属性大于零时,这将限制可在连接上创建的通道数。
如果达到限制,则调用 threads 将阻塞,直到通道可用或达到此超时,在这种情况下,将抛出 a。CachingConnectionFactory
channelCheckoutTimeout
channelCacheSize
AmqpTimeoutException
框架中使用的通道(例如 )将可靠地返回到缓存中。
如果您在框架之外创建通道(例如
通过直接访问连接并调用 ),你必须可靠地返回它们(通过关闭),也许在一个块中,以避免用完通道。RabbitTemplate createChannel() finally |
以下示例显示如何创建新的 :connection
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.createConnection();
使用 XML 时,配置可能类似于以下示例:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
</bean>
还有一个仅在框架的单元测试代码中可用的实现。
它比 , 更简单,因为它不缓存通道,但由于缺乏性能和弹性,它不打算用于简单测试之外的实际使用。
如果出于某种原因需要实现自己的 base 类,则 base class 可能是一个很好的起点。SingleConnectionFactory CachingConnectionFactory ConnectionFactory AbstractConnectionFactory |
可以使用 rabbit 命名空间快速方便地创建 A,如下所示:ConnectionFactory
<rabbit:connection-factory id="connectionFactory"/>
在大多数情况下,此方法更可取,因为框架可以为您选择最佳默认值。
创建的实例是一个 .
请记住,通道的默认缓存大小为 25。
如果要缓存更多通道,请通过设置 'channelCacheSize' 属性来设置更大的值。
在 XML 中,它如下所示:CachingConnectionFactory
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="50"/>
</bean>
此外,使用命名空间,您可以添加 'channel-cache-size' 属性,如下所示:
<rabbit:connection-factory
id="connectionFactory" channel-cache-size="50"/>
默认缓存模式为 ,但您可以将其配置为缓存连接。
在以下示例中,我们使用 :CHANNEL
connection-cache-size
<rabbit:connection-factory
id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>
您可以使用命名空间提供 host 和 port 属性,如下所示:
<rabbit:connection-factory
id="connectionFactory" host="somehost" port="5672"/>
或者,如果在群集环境中运行,则可以使用 addresses 属性,如下所示:
<rabbit:connection-factory
id="connectionFactory" addresses="host1:5672,host2:5672" address-shuffle-mode="RANDOM"/>
请参阅 连接到集群 以了解有关 的信息。address-shuffle-mode
以下示例具有自定义线程工厂,该工厂在线程名称前面加上 :rabbitmq-
<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567"
thread-factory="tf"
channel-cache-size="10" username="user" password="password" />
<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
<constructor-arg value="rabbitmq-" />
</bean>
AddressResolver 地址解析器
从版本 2.1.15 开始,您现在可以使用 an 来解析连接地址。
这将覆盖 和 属性的任何设置。AddressResolver
addresses
host/port
命名连接
从版本 1.7 开始,提供了 a 用于将 .
生成的名称用于目标 RabbitMQ 连接的应用程序特定标识。
如果 RabbitMQ 服务器支持连接名称,则连接名称将显示在管理 UI 中。
此值不必是唯一的,并且不能用作连接标识符,例如,在 HTTP API 请求中。
此值应该是人类可读的,并且是 under the key 的一部分。
您可以使用简单的 Lambda,如下所示:ConnectionNameStrategy
AbstractionConnectionFactory
ClientProperties
connection_name
connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");
该参数可用于通过某种逻辑区分目标连接名称。
默认情况下,的 、表示对象的十六进制字符串和内部计数器用于生成 .
namespace 组件也提供了 attribute 。ConnectionFactory
beanName
AbstractConnectionFactory
connection_name
<rabbit:connection-factory>
connection-name-strategy
的实现将连接名称设置为应用程序属性。
您可以将其声明为 a 并将其注入到连接工厂中,如下例所示:SimplePropertyValueConnectionNameStrategy
@Bean
@Bean
public SimplePropertyValueConnectionNameStrategy cns() {
return new SimplePropertyValueConnectionNameStrategy("spring.application.name");
}
@Bean
public ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
...
connectionFactory.setConnectionNameStrategy(cns);
return connectionFactory;
}
该属性必须存在于应用程序上下文的 .Environment
使用 Spring Boot 及其自动配置的连接工厂时,您只需声明 .
Boot 会自动检测 bean 并将其连接到工厂。ConnectionNameStrategy @Bean |
阻塞的连接和资源限制
该连接可能被阻止,无法与对应于 Memory Alarm 的代理进行交互。
从版本 2.0 开始,可以为 提供实例,以通知连接已阻止和未阻止事件。
此外,还通过其内部实现分别发出 a 和 。
这些允许您提供应用程序逻辑来对 broker 上的问题做出适当的反应,并(例如)采取一些纠正措施。org.springframework.amqp.rabbit.connection.Connection
com.rabbitmq.client.BlockedListener
AbstractConnectionFactory
ConnectionBlockedEvent
ConnectionUnblockedEvent
BlockedListener
当应用程序配置单个时,就像默认情况下使用 Spring Boot 自动配置一样,当连接被 Broker 阻止时,应用程序将停止工作。
当它被 Broker 阻止时,它的任何客户端都会停止工作。
如果我们在同一个应用程序中有 Producer 和 Consumer,那么当 Producer 阻止连接(因为 Broker 上不再有资源)并且 Consumer 无法释放它们(因为连接被阻止)时,我们最终可能会遇到死锁。
为了缓解此问题,我们建议再有一个具有相同选项的单独实例 — 一个用于生产者,一个用于使用者。
对于在使用者线程上执行的事务性生产者,不可能单独使用,因为它们应该重用与使用者事务关联的。CachingConnectionFactory CachingConnectionFactory CachingConnectionFactory Channel |
从版本 2.0.2 开始,除非正在使用事务,否则 具有自动使用第二个连接工厂的配置选项。
有关更多信息,请参阅使用单独的连接。
for the publisher 连接与主要策略相同,但附加到调用方法的结果中。RabbitTemplate
ConnectionNameStrategy
.publisher
从版本 1.7.7 开始,提供了 an,当无法创建 a 时会抛出该 (例如,因为已达到限制并且缓存中没有可用的通道)。
您可以在某个回退后使用此异常来恢复操作。AmqpResourceNotAvailableException
SimpleConnection.createChannel()
Channel
channelMax
RetryPolicy
配置底层客户端连接工厂
使用 Rabbit client 的实例。
在 上设置等效属性时,会传递许多配置属性(例如)。
要设置其他属性(例如),您可以定义 Rabbit 工厂的实例,并使用 .
使用命名空间时(如前所述),您需要在属性中提供对已配置工厂的引用。
为方便起见,提供了一个工厂 Bean 来帮助在 Spring 应用程序上下文中配置连接工厂,如下一节所述。CachingConnectionFactory
ConnectionFactory
host
port
userName
password
requestedHeartBeat
connectionTimeout
CachingConnectionFactory
clientProperties
CachingConnectionFactory
connection-factory
<rabbit:connection-factory
id="connectionFactory" connection-factory="rabbitConnectionFactory"/>
默认情况下,4.0.x 客户端启用自动恢复。
虽然与此功能兼容,但 Spring AMQP 有自己的恢复机制,通常不需要 Client 端恢复功能。
我们建议禁用自动恢复,以避免在代理可用但连接尚未恢复时获取实例。
您可能会注意到此异常,例如,在 中配置 a 时,即使故障转移到集群中的其他代理也是如此。
由于自动恢复连接在计时器上恢复,因此可以使用 Spring AMQP 的恢复机制更快地恢复连接。
从版本 1.7.1 开始, Spring AMQP 禁用自动恢复,除非你显式创建自己的 RabbitMQ 连接工厂并将其提供给.
默认情况下,由 创建的 RabbitMQ 实例也具有禁用选项。amqp-client AutoRecoverConnectionNotCurrentlyOpenException RetryTemplate RabbitTemplate amqp-client CachingConnectionFactory ConnectionFactory RabbitConnectionFactoryBean |
RabbitConnectionFactoryBean
和配置 SSL
从版本 1.4 开始,通过使用依赖项注入,可以方便地在底层客户端连接工厂上配置 SSL 属性。
其他 setter 委托给底层工厂。
以前,您必须以编程方式配置 SSL 选项。
以下示例显示如何配置 :RabbitConnectionFactoryBean
RabbitConnectionFactoryBean
@Bean
RabbitConnectionFactoryBean rabbitConnectionFactory() {
RabbitConnectionFactoryBean factoryBean = new RabbitConnectionFactoryBean();
factoryBean.setUseSSL(true);
factoryBean.setSslPropertiesLocation(new ClassPathResource("secrets/rabbitSSL.properties"));
return factoryBean;
}
@Bean
CachingConnectionFactory connectionFactory(ConnectionFactory rabbitConnectionFactory) {
CachingConnectionFactory ccf = new CachingConnectionFactory(rabbitConnectionFactory);
ccf.setHost("...");
// ...
return ccf;
}
spring.rabbitmq.ssl.enabled:true
spring.rabbitmq.ssl.keyStore=...
spring.rabbitmq.ssl.keyStoreType=jks
spring.rabbitmq.ssl.keyStorePassword=...
spring.rabbitmq.ssl.trustStore=...
spring.rabbitmq.ssl.trustStoreType=jks
spring.rabbitmq.ssl.trustStorePassword=...
spring.rabbitmq.host=...
...
<rabbit:connection-factory id="rabbitConnectionFactory"
connection-factory="clientConnectionFactory"
host="${host}"
port="${port}"
virtual-host="${vhost}"
username="${username}" password="${password}" />
<bean id="clientConnectionFactory"
class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
<property name="useSSL" value="true" />
<property name="sslPropertiesLocation" value="classpath:secrets/rabbitSSL.properties"/>
</bean>
有关配置 SSL 的信息,请参阅 RabbitMQ 文档。
省略 and 配置以通过 SSL 进行连接,而无需进行证书验证。
下一个示例显示如何提供密钥和信任存储配置。keyStore
trustStore
该属性是一个 Spring,指向包含以下键的属性文件:sslPropertiesLocation
Resource
keyStore=file:/secret/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secret
和 是 Spring 指向商店。
通常,此属性文件由操作系统保护,应用程序具有读取访问权限。keyStore
truststore
Resources
从 Spring AMQP 版本 1.5 开始,您可以直接在工厂 bean 上设置这些属性。
如果同时提供了 discrete 属性 和 ,则后者中的 properties 会覆盖
discrete 值。sslPropertiesLocation
从版本 2.0 开始,默认情况下会验证服务器证书,因为它更安全。
如果出于某种原因希望跳过此验证,请将工厂 Bean 的属性设置为。
从版本 2.1 开始,now 默认调用。
要恢复到之前的行为,请将该属性设置为 。skipServerCertificateValidation true RabbitConnectionFactoryBean enableHostnameVerification() enableHostnameVerification false |
从版本 2.2.5 开始,默认情况下,工厂 Bean 将始终使用 TLS v1.2;以前,它在某些情况下使用 v1.1,而在其他情况下使用 v1.2(取决于其他属性)。
如果出于某种原因需要使用 v1.1,请设置属性: 。sslAlgorithm setSslAlgorithm("TLSv1.1") |
连接到集群
要连接到集群,请在 :addresses
CachingConnectionFactory
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
return ccf;
}
从版本 3.0 开始,每当建立新连接时,底层连接工厂将尝试通过选择随机地址连接到主机。
要恢复到以前尝试从第一个到最后一个连接的行为,请将该属性设置为 。addressShuffleMode
AddressShuffleMode.NONE
从版本 2.3 开始,添加了 shuffle 模式,这意味着在创建连接后,第一个地址将移动到末尾。
如果您希望从所有节点上的所有分片中使用,您可能希望将此模式与 RabbitMQ 分片插件一起使用,并具有适当的并发性。INORDER
CacheMode.CONNECTION
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
ccf.setAddressShuffleMode(AddressShuffleMode.INORDER);
return ccf;
}
路由连接工厂
从版本 1.3 开始,引入了 。
该工厂提供了一种机制,可以为多个映射配置映射,并在运行时由一些人确定目标。
通常,该实现会检查线程绑定的上下文。
为方便起见, Spring AMQP 提供了 ,它从 .
以下示例显示了如何在 XML 和 Java 中配置 a:AbstractRoutingConnectionFactory
ConnectionFactories
ConnectionFactory
lookupKey
SimpleRoutingConnectionFactory
lookupKey
SimpleResourceHolder
SimpleRoutingConnectionFactory
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
<property name="targetConnectionFactories">
<map>
<entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
<entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
</map>
</property>
</bean>
<rabbit:template id="template" connection-factory="connectionFactory" />
public class MyService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void service(String vHost, String payload) {
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
rabbitTemplate.convertAndSend(payload);
SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
}
}
使用后取消绑定资源很重要。
有关更多信息,请参阅 JavaDoc for 。AbstractRoutingConnectionFactory
从版本 1.4 开始,支持 SpEL 和属性,这些属性在每个 AMQP 协议交互操作(、或)上进行评估,解析为提供的 .
您可以使用 Bean 引用,例如在表达式中。
对于操作,要发送的消息是根评估对象。
对于操作,是根评估对象。RabbitTemplate
sendConnectionFactorySelectorExpression
receiveConnectionFactorySelectorExpression
send
sendAndReceive
receive
receiveAndReply
lookupKey
AbstractRoutingConnectionFactory
@vHostResolver.getVHost(#root)
send
receive
queueName
路由算法如下:如果选择器表达式是 或 被计算为 或 提供的不是 instance of ,则一切都像以前一样工作,依赖于提供的实现。
如果评估结果不是 ,但没有目标,并且 配置了 ,则也会发生同样的情况。
对于 ,它确实回退到基于 的实现。
但是,如果 , an 被抛出。null
null
ConnectionFactory
AbstractRoutingConnectionFactory
ConnectionFactory
null
ConnectionFactory
lookupKey
AbstractRoutingConnectionFactory
lenientFallback = true
AbstractRoutingConnectionFactory
routing
determineCurrentLookupKey()
lenientFallback = false
IllegalStateException
命名空间支持还在组件上提供了 and 属性。send-connection-factory-selector-expression
receive-connection-factory-selector-expression
<rabbit:template>
此外,从版本 1.4 开始,您可以在侦听器容器中配置路由连接工厂。
在这种情况下,队列名称列表将用作查找键。
例如,如果使用 配置容器,则查找键为 (请注意,键中没有空格)。setQueueNames("thing1", "thing2")
[thing1,thing]"
从版本 1.6.9 开始,您可以通过在侦听器容器上使用 lookup key 向查找键添加限定符。
例如,这样做可以侦听具有相同名称但在不同虚拟主机中的队列(每个虚拟主机都有一个连接工厂)。setLookupKeyQualifier
例如,使用 lookup key 限定符和侦听 queue 的容器,您可以向其注册目标连接工厂的 lookup key 可以是 。thing1
thing2
thing1[thing2]
目标(如果提供,则为默认)连接工厂必须具有相同的发布者确认和返回设置。 请参阅 发布者确认并返回。 |
从版本 2.4.4 开始,可以禁用此验证。
如果您遇到 confirms 和 returns 之间的值需要不相等的情况,则可以使用 来关闭验证。
请注意,添加到 的第一个连接工厂将确定 和 的一般值。AbstractRoutingConnectionFactory#setConsistentConfirmsReturns
AbstractRoutingConnectionFactory
confirms
returns
如果您遇到某些消息需要检查确认/返回而其他消息不确认/返回的情况,这可能会很有用。 例如:
@Bean
public RabbitTemplate rabbitTemplate() {
final com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
cf.setHost("localhost");
cf.setPort(5672);
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(cf);
cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
PooledChannelConnectionFactory pooledChannelConnectionFactory = new PooledChannelConnectionFactory(cf);
final Map<Object, ConnectionFactory> connectionFactoryMap = new HashMap<>(2);
connectionFactoryMap.put("true", cachingConnectionFactory);
connectionFactoryMap.put("false", pooledChannelConnectionFactory);
final AbstractRoutingConnectionFactory routingConnectionFactory = new SimpleRoutingConnectionFactory();
routingConnectionFactory.setConsistentConfirmsReturns(false);
routingConnectionFactory.setDefaultTargetConnectionFactory(pooledChannelConnectionFactory);
routingConnectionFactory.setTargetConnectionFactories(connectionFactoryMap);
final RabbitTemplate rabbitTemplate = new RabbitTemplate(routingConnectionFactory);
final Expression sendExpression = new SpelExpressionParser().parseExpression(
"messageProperties.headers['x-use-publisher-confirms'] ?: false");
rabbitTemplate.setSendConnectionFactorySelectorExpression(sendExpression);
}
这样,带有 header 的消息将通过缓存连接发送,您可以确保消息送达。
有关确保邮件送达的更多信息,请参阅 Publisher Confirms and Returns 。x-use-publisher-confirms: true
Queue Affinity 和LocalizedQueueConnectionFactory
在集群中使用 HA 队列时,为了获得最佳性能,您可能需要连接到物理代理
lead 队列所在的位置。
可以配置多个代理地址。
这是为了进行故障转移,客户端会尝试按照配置的顺序进行连接。
它使用管理插件提供的 REST API 来确定哪个节点是队列的潜在客户。
然后,它会创建(或从缓存中检索)仅连接到该节点的 a。
如果连接失败,则确定新的前导节点,并使用方连接到该节点。
配置了默认连接工厂,以防无法确定队列的物理位置,在这种情况下,它会正常连接到集群。CachingConnectionFactory
AddressShuffleMode
LocalizedQueueConnectionFactory
CachingConnectionFactory
LocalizedQueueConnectionFactory
是 a 和 ,它使用队列名称作为查找键,如上面的 路由连接工厂 中所述。LocalizedQueueConnectionFactory
RoutingConnectionFactory
SimpleMessageListenerContainer
因此(使用队列名称进行查找),仅当容器配置为侦听单个队列时,才能使用 。LocalizedQueueConnectionFactory |
必须在每个节点上启用 RabbitMQ 管理插件。 |
此连接工厂适用于长期连接,例如 .
它不适用于短连接使用,例如与 a 一起使用,因为在建立连接之前调用 REST API 会产生开销。
此外,对于发布操作,队列是未知的,并且消息无论如何都会发布到所有集群成员,因此查找节点的逻辑几乎没有价值。SimpleMessageListenerContainer RabbitTemplate |
以下示例配置显示了如何配置工厂:
@Autowired
private ConfigurationProperties props;
@Bean
public CachingConnectionFactory defaultConnectionFactory() {
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses(this.props.getAddresses());
cf.setUsername(this.props.getUsername());
cf.setPassword(this.props.getPassword());
cf.setVirtualHost(this.props.getVirtualHost());
return cf;
}
@Bean
public LocalizedQueueConnectionFactory queueAffinityCF(
@Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
return new LocalizedQueueConnectionFactory(defaultCF,
StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
StringUtils.commaDelimitedListToStringArray(this.props.getAdminUris()),
StringUtils.commaDelimitedListToStringArray(this.props.getNodes()),
this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
false, null);
}
请注意,前三个参数是 、 和 的数组。
这些是位置性的,因为当容器尝试连接到队列时,它使用 admin API 来确定哪个节点是队列的引线,并连接到与该节点位于同一数组位置的地址。addresses
adminUris
nodes
从版本 3.0 开始,RabbitMQ 不再用于访问 Rest API。
相反,默认情况下,如果在类路径上,则使用 from Spring Webflux;否则使用 a。http-client WebClient spring-webflux RestTemplate |
要添加到类路径,请执行以下操作:WebFlux
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
compile 'org.springframework.amqp:spring-rabbit'
您还可以通过实现和覆盖其方法(可选)来使用其他 REST 技术。LocalizedQueueConnectionFactory.NodeLocator
createClient, ``restCall
close
lqcf.setNodeLocator(new NodeLocator<MyClient>() {
@Override
public MyClient createClient(String userName, String password) {
...
}
@Override
public HashMap<String, Object> restCall(MyClient client, URI uri) {
...
});
});
框架提供 和 ,默认值如上所述。WebFluxNodeLocator
RestTemplateNodeLocator
发布者确认并返回
通过将属性设置为 'true' 和属性,支持已确认(带关联)和返回的消息。CachingConnectionFactory
publisherConfirmType
ConfirmType.CORRELATED
publisherReturns
设置这些选项后,工厂创建的实例将包装在 中,用于促进回调。
当获得这样的通道时,客户端可以向 注册 。
该实现包含用于将 confirm 或 return 路由到相应侦听器的逻辑。
这些功能将在以下各节中进一步说明。Channel
PublisherCallbackChannel
PublisherCallbackChannel.Listener
Channel
PublisherCallbackChannel
另请参阅 Correlated Publisher Confirms and Returns 和 Scoped Operations 中。simplePublisherConfirms
有关更多背景信息,请参阅 RabbitMQ 团队的博客文章,标题为 Introducing Publisher Confirms。 |
连接侦听器和通道侦听器
连接工厂支持注册和实现。
这允许您接收连接和通道相关事件的通知。
(A 用于在建立连接时执行声明 - 有关更多信息,请参阅 Automatic Declaration of Exchanges, Queues, and Bindings)。
下面的清单显示了接口定义:ConnectionListener
ChannelListener
ConnectionListener
RabbitAdmin
ConnectionListener
@FunctionalInterface
public interface ConnectionListener {
void onCreate(Connection connection);
default void onClose(Connection connection) {
}
default void onShutDown(ShutdownSignalException signal) {
}
}
从版本 2.0 开始,可以为对象提供实例,以通知连接已阻止和未阻止事件。
以下示例显示了 ChannelListener 接口定义:org.springframework.amqp.rabbit.connection.Connection
com.rabbitmq.client.BlockedListener
@FunctionalInterface
public interface ChannelListener {
void onCreate(Channel channel, boolean transactional);
default void onShutDown(ShutdownSignalException signal) {
}
}
请参阅发布是异步的 — 如何检测成功和失败,了解您可能希望注册 .ChannelListener
记录通道关闭事件
版本 1.5 引入了一种机制,使用户能够控制日志记录级别。
它使用默认策略来记录 Channel Closures,如下所示:AbstractConnectionFactory
-
正常通道关闭 (200 OK) 不会被记录。
-
如果通道由于被动队列声明失败而关闭,则将其记录在 DEBUG 级别。
-
如果通道因独占消费者条件而被拒绝而关闭,则会在 DEBUG 级别(自 3.1 起,以前为 INFO)。
basic.consume
-
所有其他记录都为 ERROR 级别。
要修改此行为,您可以将自定义注入其 in 其属性中。ConditionalExceptionLogger
CachingConnectionFactory
closeExceptionLogger
此外,现在是 public 的,允许对其进行子类化。AbstractConnectionFactory.DefaultChannelCloseLogger
另请参阅 Consumer Events。
运行时缓存属性
从 1.6 版本开始,现在通过该方法提供缓存统计信息。
这些统计信息可用于优化缓存,以便在生产环境中对其进行优化。
例如,高水位线可用于确定是否应增加缓存大小。
如果它等于缓存大小,则可能需要考虑进一步增加。
下表描述了这些属性:CachingConnectionFactory
getCacheProperties()
CacheMode.CHANNEL
财产 | 意义 |
---|---|
connectionName |
由 生成的连接的名称。 |
channelCacheSize |
当前配置的允许空闲的最大通道数。 |
localPort |
连接的本地端口(如果可用)。 这可用于与 RabbitMQ Admin UI 上的连接和通道相关联。 |
idleChannelsTx |
当前处于空闲 (缓存) 状态的事务通道数。 |
idleChannelsNotTx |
当前处于空闲 (缓存) 状态的非事务性通道数。 |
idleChannelsTxHighWater |
已同时空闲(缓存)的事务通道的最大数量。 |
idleChannelsNotTxHighWater |
非事务性通道的最大数量已同时空闲(缓存)。 |
下表描述了这些属性:CacheMode.CONNECTION
财产 | 意义 |
---|---|
connectionName:<localPort> |
由 生成的连接的名称。 |
openConnections |
表示与 broker 的连接的连接对象的数目。 |
channelCacheSize |
当前配置的允许空闲的最大通道数。 |
connectionCacheSize |
当前配置的最大允许空闲连接数。 |
idleConnections |
当前空闲的连接数。 |
idleConnectionsHighWater |
当前空闲的最大连接数。 |
idleChannelsTx:<localPort> |
此连接当前处于空闲 (缓存) 状态的事务通道数。
您可以使用属性名称的一部分与 RabbitMQ Admin UI 上的连接和通道相关联。 |
idleChannelsNotTx:<localPort> |
此连接当前处于空闲 (缓存) 状态的非事务性通道数。
属性名称的一部分可用于与 RabbitMQ Admin UI 上的连接和通道相关联。 |
idleChannelsTxHighWater:<localPort> |
已同时空闲(缓存)的事务通道的最大数量。 属性名称的 localPort 部分可用于与 RabbitMQ 管理 UI 上的连接和通道相关联。 |
idleChannelsNotTxHighWater:<localPort> |
非事务性通道的最大数量已同时空闲(缓存)。
您可以使用属性名称的一部分与 RabbitMQ Admin UI 上的连接和通道相关联。 |
属性 ( or ) 也包括在内。cacheMode
CHANNEL
CONNECTION
RabbitMQ 自动连接 / 拓扑恢复
自 Spring AMQP 的第一个版本以来,该框架在代理发生故障时提供了自己的连接和通道恢复。
此外,如 配置 Broker 中所述,在重新建立连接时,将重新声明任何基础结构 Bean(队列和其他)。
因此,它不依赖于磁带库现在提供的自动恢复。
默认情况下,, 已启用自动恢复。
两种恢复机制之间存在一些不兼容之处,因此,默认情况下, Spring 将底层的属性设置为 。
即使该属性是 ,Spring 也会通过立即关闭任何已恢复的连接来有效地禁用它。RabbitAdmin
amqp-client
amqp-client
automaticRecoveryEnabled
RabbitMQ connectionFactory
false
true
默认情况下,只有定义为 bean 的元素(queues、exchanges、bindings)才会在连接失败后被重新声明。 有关如何更改该行为,请参阅恢复自动删除声明。 |