AMQP 规范描述了如何使用该协议在代理上配置队列、交换和绑定。
这些操作(可从 0.8 规范及更高版本移植)存在于包的接口中。
该类的 RabbitMQ 实现位于包中。AmqpAdmin
org.springframework.amqp.core
RabbitAdmin
org.springframework.amqp.rabbit.core
该接口基于使用 Spring AMQP 域抽象,如下面的清单所示:AmqpAdmin
public interface AmqpAdmin {
// Exchange Operations
void declareExchange(Exchange exchange);
void deleteExchange(String exchangeName);
// Queue Operations
Queue declareQueue();
String declareQueue(Queue queue);
void deleteQueue(String queueName);
void deleteQueue(String queueName, boolean unused, boolean empty);
void purgeQueue(String queueName, boolean noWait);
// Binding Operations
void declareBinding(Binding binding);
void removeBinding(Binding binding);
Properties getQueueProperties(String queueName);
}
另请参阅 作用域操作。
该方法返回有关队列的一些有限信息(消息计数和使用者计数)。
返回的属性的键可用作 (、 和 ) 中的常量。
RabbitMQ REST API 在对象中提供了更多信息。getQueueProperties()
RabbitTemplate
QUEUE_NAME
QUEUE_MESSAGE_COUNT
QUEUE_CONSUMER_COUNT
QueueInfo
no-arg 方法使用自动生成的名称定义代理上的队列。
此自动生成的队列的其他属性包括 、 和 。declareQueue()
exclusive=true
autoDelete=true
durable=false
该方法采用一个对象并返回已声明队列的名称。
如果 provided 的属性为空,则代理使用生成的名称声明队列。
该名称将返回给调用方。
该名称也会添加到 .
您只能通过直接调用
在应用程序上下文中以声明方式定义队列时,如果管理员使用 auto-declaration 时,您可以将 name 属性设置为(空字符串)。
然后,代理创建名称。
从版本 2.1 开始,侦听器容器可以使用这种类型的队列。
有关更多信息,请参阅容器和以 Broker-Named 命名的队列。declareQueue(Queue queue)
Queue
name
Queue
String
actualName
Queue
RabbitAdmin
""
这与框架生成唯一的 () 名称并将 和 、 设置为 形成鲜明对比。
具有空(或缺失)属性的 A 始终会创建一个 .AnonymousQueue
UUID
durable
false
exclusive
autoDelete
true
<rabbit:queue/>
name
AnonymousQueue
请参阅 AnonymousQueue
以了解为什么 优先于代理生成的队列名称以及
如何控制名称的格式。
从版本 2.1 开始,匿名队列的声明参数默认设置为 。
这可确保在应用程序连接到的节点上声明队列。
声明式队列必须具有固定的名称,因为它们可能会在上下文中的其他位置引用,例如在
listener 中所示的 Listener 示例:AnonymousQueue
Queue.X_QUEUE_LEADER_LOCATOR
client-local
<rabbit:listener-container>
<rabbit:listener ref="listener" queue-names="#{someQueue.name}" />
</rabbit:listener-container>
此接口的 RabbitMQ 实现是,当使用 Spring XML 进行配置时,它类似于以下示例:RabbitAdmin
<rabbit:connection-factory id="connectionFactory"/>
<rabbit:admin id="amqpAdmin" connection-factory="connectionFactory"/>
当缓存模式为(默认)时,实现会自动延迟声明在同一 .
这些组件在向 broker 打开 后立即声明。
有一些命名空间功能使这非常方便 — 例如,
在 Stocks 示例应用程序中,我们有以下内容:CachingConnectionFactory
CHANNEL
RabbitAdmin
ApplicationContext
Connection
<rabbit:queue id="tradeQueue"/>
<rabbit:queue id="marketDataQueue"/>
<fanout-exchange name="broadcast.responses"
xmlns="http://www.springframework.org/schema/rabbit">
<bindings>
<binding queue="tradeQueue"/>
</bindings>
</fanout-exchange>
<topic-exchange name="app.stock.marketdata"
xmlns="http://www.springframework.org/schema/rabbit">
<bindings>
<binding queue="marketDataQueue" pattern="${stocks.quote.pattern}"/>
</bindings>
</topic-exchange>
在前面的示例中,我们使用匿名队列(实际上,在内部,只是名称由框架生成,而不是由代理生成的队列)并通过 ID 引用它们。 我们还可以用显式名称声明队列,这些名称也用作上下文中它们的 bean 定义的标识符。 以下示例使用显式名称配置队列:
<rabbit:queue name="stocks.trade.queue"/>
您可以同时提供 and 属性。
这允许您使用独立于队列名称的 ID 来引用队列(例如,在绑定中)。
它还允许标准的 Spring 功能(例如队列名称的属性占位符和 SPEL 表达式)。
当您使用名称作为 Bean 标识符时,这些功能不可用。id name |
可以使用其他参数配置队列 — 例如 .
当您使用命名空间支持时,它们以 argument-name/argument-value 对的形式提供,这些对使用 element 定义。
以下示例显示了如何执行此操作:x-message-ttl
Map
<rabbit:queue-arguments>
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="myDLX"/>
<entry key="x-dead-letter-routing-key" value="dlqRK"/>
</rabbit:queue-arguments>
</rabbit:queue>
默认情况下,参数假定为字符串。 对于其他类型的参数,您必须提供类型。 以下示例显示如何指定类型:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments value-type="java.lang.Long">
<entry key="x-message-ttl" value="100"/>
</rabbit:queue-arguments>
</rabbit:queue>
在提供混合类型的参数时,必须为每个 entry 元素提供类型。 以下示例显示了如何执行此操作:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-message-ttl">
<value type="java.lang.Long">100</value>
</entry>
<entry key="x-dead-letter-exchange" value="myDLX"/>
<entry key="x-dead-letter-routing-key" value="dlqRK"/>
</rabbit:queue-arguments>
</rabbit:queue>
对于 Spring Framework 3.2 及更高版本,可以更简洁地声明这一点,如下所示:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="100" value-type="java.lang.Long"/>
<entry key="x-ha-policy" value="all"/>
</rabbit:queue-arguments>
</rabbit:queue>
当您使用 Java 配置时,支持通过类上的方法将参数作为 first class 属性。
从版本 2.1 开始,匿名队列的声明将此属性设置为 by default。
这可确保在应用程序连接到的节点上声明队列。Queue.X_QUEUE_LEADER_LOCATOR
setLeaderLocator()
Queue
client-local
RabbitMQ 代理不允许声明具有不匹配参数的队列。
例如,如果 a 已经存在且没有参数,并且您尝试使用 (例如) 声明它,则会引发异常。queue time to live key="x-message-ttl" value="100" |
默认情况下,当发生任何异常时,会立即停止处理所有声明。
这可能会导致下游问题,例如侦听器容器无法初始化,因为未声明另一个队列(在错误的队列之后定义)。RabbitAdmin
可以通过将 instance 属性设置为来修改此行为。
此选项指示 记录异常并继续声明其他元素。
配置 using Java 时,此属性称为 。
这是适用于所有元素的全局设置。
Queues、exchanges 和 bindings 具有类似的属性,该属性仅适用于这些元素。ignore-declaration-exceptions
true
RabbitAdmin
RabbitAdmin
RabbitAdmin
ignoreDeclarationExceptions
在 1.6 版本之前,此属性仅在通道上发生时生效,例如当前属性和所需属性不匹配时。
现在,此属性对任何异常(包括 and others)生效。IOException
TimeoutException
此外,任何声明异常都会导致发布 ,该 是上下文中的 any 都可以使用的 。
该事件包含对 admin、正在声明的元素以及 .DeclarationExceptionEvent
ApplicationEvent
ApplicationListener
Throwable
您可以同时提供 and 属性。
这允许您使用独立于队列名称的 ID 来引用队列(例如,在绑定中)。
它还允许标准的 Spring 功能(例如队列名称的属性占位符和 SPEL 表达式)。
当您使用名称作为 Bean 标识符时,这些功能不可用。id name |
RabbitMQ 代理不允许声明具有不匹配参数的队列。
例如,如果 a 已经存在且没有参数,并且您尝试使用 (例如) 声明它,则会引发异常。queue time to live key="x-message-ttl" value="100" |
标头交换
从版本 1.3 开始,您可以将 配置为在多个 Headers 上匹配。
您还可以指定是否必须匹配任何或所有标头。
以下示例显示了如何执行此操作:HeadersExchange
<rabbit:headers-exchange name="headers-test">
<rabbit:bindings>
<rabbit:binding queue="bucket">
<rabbit:binding-arguments>
<entry key="foo" value="bar"/>
<entry key="baz" value="qux"/>
<entry key="x-match" value="all"/>
</rabbit:binding-arguments>
</rabbit:binding>
</rabbit:bindings>
</rabbit:headers-exchange>
从版本 1.6 开始,您可以使用标志(默认为 )进行配置,并且可以通过 (如果应用程序上下文中存在标志)在 Broker 上正确配置。
如果该标志用于交换,则 RabbitMQ 不允许客户端使用该交换。
这对于死信交换或 exchange-to-exchange 绑定非常有用,在这种情况下,您不希望使用 exchange
直接由出版商提供。Exchanges
internal
false
Exchange
RabbitAdmin
internal
true
要了解如何使用 Java 配置 AMQP 基础结构,请查看 Stock 示例应用程序
其中有类 ,它反过来又有 和 子类。
下面的清单显示了 的代码:@Configuration
AbstractStockRabbitConfiguration
RabbitClientConfiguration
RabbitServerConfiguration
AbstractStockRabbitConfiguration
@Configuration
public abstract class AbstractStockAppRabbitConfiguration {
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(jsonMessageConverter());
configureRabbitTemplate(template);
return template;
}
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public TopicExchange marketDataExchange() {
return new TopicExchange("app.stock.marketdata");
}
// additional code omitted for brevity
}
在 Stock 应用程序中,使用以下类配置服务器:@Configuration
@Configuration
public class RabbitServerConfiguration extends AbstractStockAppRabbitConfiguration {
@Bean
public Queue stockRequestQueue() {
return new Queue("app.stock.request");
}
}
这是整个类继承链的结束。
最终结果是 和 在应用程序启动时向代理声明。
服务器配置中没有绑定到队列的绑定,因为这是在 Client 端应用程序中完成的。
但是,股票请求队列会自动绑定到 AMQP 默认交易所。
此行为由规范定义。@Configuration
TopicExchange
Queue
TopicExchange
client 类更有趣一些。
其声明如下:@Configuration
@Configuration
public class RabbitClientConfiguration extends AbstractStockAppRabbitConfiguration {
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;
@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}
/**
* Binds to the market data exchange.
* Interested in any stock quotes
* that match its routing key.
*/
@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
// additional code omitted for brevity
}
客户端通过 .
它使用在属性文件中外部化的路由模式将该队列绑定到市场数据交换。declareQueue()
AmqpAdmin
用于队列和交换的 Builder API
版本 1.6 引入了一个方便的 Fluent API,用于在使用 Java 配置时进行配置和对象。
以下示例演示如何使用它:Queue
Exchange
@Bean
public Queue queue() {
return QueueBuilder.nonDurable("foo")
.autoDelete()
.exclusive()
.withArgument("foo", "bar")
.build();
}
@Bean
public Exchange exchange() {
return ExchangeBuilder.directExchange("foo")
.autoDelete()
.internal()
.withArgument("foo", "bar")
.build();
}
有关更多信息,请参见org.springframework.amqp.core.QueueBuilder
和org.springframework.amqp.core.ExchangeBuilder
的 Javadoc。
从版本 2.0 开始,现在默认创建持久交换,以便与各个类上的简单构造函数保持一致。
要与构建器进行非持久交换,请使用 before ininvoke .
不再提供不带参数的方法。ExchangeBuilder
AbstractExchange
.durable(false)
.build()
durable()
2.2 版本引入了 Fluent API 来添加“众所周知的”exchange 和 queue 参数......
@Bean
public Queue allArgs1() {
return QueueBuilder.nonDurable("all.args.1")
.ttl(1000)
.expires(200_000)
.maxLength(42)
.maxLengthBytes(10_000)
.overflow(Overflow.rejectPublish)
.deadLetterExchange("dlx")
.deadLetterRoutingKey("dlrk")
.maxPriority(4)
.lazy()
.leaderLocator(LeaderLocator.minLeaders)
.singleActiveConsumer()
.build();
}
@Bean
public DirectExchange ex() {
return ExchangeBuilder.directExchange("ex.with.alternate")
.durable(true)
.alternate("alternate")
.build();
}
声明 Exchanges、Queues 和 Bindings 的集合
您可以将对象 (、 和 ) 的集合包装在对象中。
它会在应用程序上下文中检测此类 bean(以及离散 bean),并在建立连接时(最初和连接失败后)在代理上声明包含的对象。
以下示例显示了如何执行此操作:Declarable
Queue
Exchange
Binding
Declarables
RabbitAdmin
Declarable
@Configuration
public static class Config {
@Bean
public CachingConnectionFactory cf() {
return new CachingConnectionFactory("localhost");
}
@Bean
public RabbitAdmin admin(ConnectionFactory cf) {
return new RabbitAdmin(cf);
}
@Bean
public DirectExchange e1() {
return new DirectExchange("e1", false, true);
}
@Bean
public Queue q1() {
return new Queue("q1", false, false, true);
}
@Bean
public Binding b1() {
return BindingBuilder.bind(q1()).to(e1()).with("k1");
}
@Bean
public Declarables es() {
return new Declarables(
new DirectExchange("e2", false, true),
new DirectExchange("e3", false, true));
}
@Bean
public Declarables qs() {
return new Declarables(
new Queue("q2", false, false, true),
new Queue("q3", false, false, true));
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public Declarables prototypes() {
return new Declarables(new Queue(this.prototypeQueueName, false, false, true));
}
@Bean
public Declarables bs() {
return new Declarables(
new Binding("q2", DestinationType.QUEUE, "e2", "k2", null),
new Binding("q3", DestinationType.QUEUE, "e3", "k3", null));
}
@Bean
public Declarables ds() {
return new Declarables(
new DirectExchange("e4", false, true),
new Queue("q4", false, false, true),
new Binding("q4", DestinationType.QUEUE, "e4", "k4", null));
}
}
在 2.1 之前的版本中,您可以通过定义 bean 类型的 bean 来声明多个实例。
在某些情况下,这可能会导致不良的副作用,因为管理员必须迭代所有 bean。Declarable Collection<Declarable> Collection<?> |
版本 2.2 向 ;这可以用作一种方便,例如,在声明侦听器容器 bean 时。getDeclarablesByType
Declarables
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
Declarables mixedDeclarables, MessageListener listener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(mixedDeclarables.getDeclarablesByType(Queue.class).toArray(new Queue[0]));
container.setMessageListener(listener);
return container;
}
在 2.1 之前的版本中,您可以通过定义 bean 类型的 bean 来声明多个实例。
在某些情况下,这可能会导致不良的副作用,因为管理员必须迭代所有 bean。Declarable Collection<Declarable> Collection<?> |
条件声明
默认情况下,所有 queues、exchanges 和 bindings 都由应用程序上下文中的所有实例(假设它们有)声明。RabbitAdmin
auto-startup="true"
从版本 2.1.9 开始,它有一个新属性(默认情况下);当设置为 时,管理员将仅声明显式配置为由该管理员声明的 bean。RabbitAdmin
explicitDeclarationsOnly
false
true
从 1.2 版本开始,您可以有条件地声明这些元素。 当应用程序连接到多个代理并需要指定应使用哪个代理声明特定元素时,这特别有用。 |
表示这些元素的类 implement ,它有两个方法: 和 。
使用这些方法来确定特定实例是否实际应处理其 .Declarable
shouldDeclare()
getDeclaringAdmins()
RabbitAdmin
Connection
这些属性可用作命名空间中的属性,如以下示例所示:
<rabbit:admin id="admin1" connection-factory="CF1" />
<rabbit:admin id="admin2" connection-factory="CF2" />
<rabbit:admin id="admin3" connection-factory="CF3" explicit-declarations-only="true" />
<rabbit:queue id="declaredByAdmin1AndAdmin2Implicitly" />
<rabbit:queue id="declaredByAdmin1AndAdmin2" declared-by="admin1, admin2" />
<rabbit:queue id="declaredByAdmin1Only" declared-by="admin1" />
<rabbit:queue id="notDeclaredByAllExceptAdmin3" auto-declare="false" />
<rabbit:direct-exchange name="direct" declared-by="admin1, admin2">
<rabbit:bindings>
<rabbit:binding key="foo" queue="bar"/>
</rabbit:bindings>
</rabbit:direct-exchange>
默认情况下,属性为 and,如果未提供(或为空),则所有实例都声明该对象(只要 admin 的 attribute 是 ,默认值,并且 admin 的 attribute 是 false)。auto-declare true declared-by RabbitAdmin auto-startup true explicit-declarations-only |
同样,您也可以使用 基于 Java 来达到相同的效果。
在以下示例中,组件由 声明,而不是 由 :@Configuration
admin1
admin2
@Bean
public RabbitAdmin admin1() {
return new RabbitAdmin(cf1());
}
@Bean
public RabbitAdmin admin2() {
return new RabbitAdmin(cf2());
}
@Bean
public Queue queue() {
Queue queue = new Queue("foo");
queue.setAdminsThatShouldDeclare(admin1());
return queue;
}
@Bean
public Exchange exchange() {
DirectExchange exchange = new DirectExchange("bar");
exchange.setAdminsThatShouldDeclare(admin1());
return exchange;
}
@Bean
public Binding binding() {
Binding binding = new Binding("foo", DestinationType.QUEUE, exchange().getName(), "foo", null);
binding.setAdminsThatShouldDeclare(admin1());
return binding;
}
从 1.2 版本开始,您可以有条件地声明这些元素。 当应用程序连接到多个代理并需要指定应使用哪个代理声明特定元素时,这特别有用。 |
默认情况下,属性为 and,如果未提供(或为空),则所有实例都声明该对象(只要 admin 的 attribute 是 ,默认值,并且 admin 的 attribute 是 false)。auto-declare true declared-by RabbitAdmin auto-startup true explicit-declarations-only |
关于 and 属性的说明id
name
属性 on 和 elements 反映 broker 中实体的名称。
对于队列,如果省略 the,则创建一个匿名队列(请参阅 AnonymousQueue
)。name
<rabbit:queue/>
<rabbit:exchange/>
name
在 2.0 之前的版本中,它也被注册为 bean 名称别名(类似于 on elements)。name
name
<bean/>
这导致了两个问题:
-
它阻止了具有相同名称的 queue 和 exchange 的声明。
-
如果别名包含 SPEL 表达式 (),则不会解析别名。
#{…}
从版本 2.0 开始,如果同时使用 an 和 a 属性声明其中一个元素,则该名称不再声明为 Bean 名称别名。
如果您希望声明一个队列并与相同的 交换,则必须提供一个 .id
name
name
id
如果元素只有一个属性,则不会发生更改。
bean 仍然可以被 引用 — 例如,在绑定声明中。
但是,如果名称包含 SPEL,则仍然无法引用它 - 您必须提供 a 以供参考。name
name
id
AnonymousQueue
通常,当您需要名称唯一、独占的自动删除队列时,我们建议您使用 而不是代理定义的队列名称(使用作为名称会导致代理生成队列
name) 的 S SAnonymousQueue
""
Queue
这是因为:
-
队列实际上是在建立与代理的连接时声明的。 这是在 bean 创建并连接在一起很久之后。 使用队列的 bean 需要知道它的名称。 事实上,在应用程序启动时,代理甚至可能没有运行。
-
如果由于某种原因丢失了与 broker 的连接,管理员将使用相同的名称重新声明 。 如果我们使用代理声明的队列,队列名称将更改。
AnonymousQueue
您可以控制实例使用的队列名称的格式。AnonymousQueue
默认情况下,队列名称的前缀为 ,后跟 base64 表示形式 — 例如:。spring.gen-
UUID
spring.gen-MRBv9sqISkuCiPfOYfpo4g
您可以在 constructor 参数中提供实现。
以下示例显示了如何执行此操作:AnonymousQueue.NamingStrategy
@Bean
public Queue anon1() {
return new AnonymousQueue();
}
@Bean
public Queue anon2() {
return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("something-"));
}
@Bean
public Queue anon3() {
return new AnonymousQueue(AnonymousQueue.UUIDNamingStrategy.DEFAULT);
}
第一个 bean 生成一个队列名称,前缀为 — for
例:。
第二个 Bean 生成一个队列名称,前缀为 ,后跟 的 base64 表示形式。
第三个 bean 仅使用 UUID(无 base64 转换)生成名称 — 例如,.spring.gen-
UUID
spring.gen-MRBv9sqISkuCiPfOYfpo4g
something-
UUID
f20c818a-006b-4416-bf91-643590fedb0e
base64 编码使用 RFC 4648 中的“URL 和文件名安全字母表”。
尾随填充字符 () 将被删除。=
您可以提供自己的命名策略,从而可以在队列名称中包含其他信息(例如应用程序名称或客户端主机)。
您可以在使用 XML 配置时指定命名策略。
该属性存在于元素上
对于实现 .
以下示例说明如何以各种方式指定命名策略:naming-strategy
<rabbit:queue>
AnonymousQueue.NamingStrategy
<rabbit:queue id="uuidAnon" />
<rabbit:queue id="springAnon" naming-strategy="uuidNamer" />
<rabbit:queue id="customAnon" naming-strategy="customNamer" />
<bean id="uuidNamer" class="org.springframework.amqp.core.AnonymousQueue.UUIDNamingStrategy" />
<bean id="customNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy">
<constructor-arg value="custom.gen-" />
</bean>
第一个示例创建名称,例如 .
第二个示例使用 UUID 的 String 表示形式创建名称。
第三个示例创建名称,例如 .spring.gen-MRBv9sqISkuCiPfOYfpo4g
custom.gen-MRBv9sqISkuCiPfOYfpo4g
您还可以提供自己的命名策略 bean。
从版本 2.1 开始,匿名队列的声明参数默认设置为 。
这可确保在应用程序连接到的节点上声明队列。
您可以通过在构造实例后调用 来恢复到之前的行为。Queue.X_QUEUE_LEADER_LOCATOR
client-local
queue.setLeaderLocator(null)
恢复自动删除声明
通常,这些 (s) 仅恢复在应用程序上下文中声明为 bean 的队列/交换/绑定;如果任何此类声明是自动删除的,则当连接丢失时,代理将删除这些声明。
重新建立连接后,管理员将重新声明实体。
通常,通过调用 创建的实体 不会被恢复。RabbitAdmin
admin.declareQueue(…)
admin.declareExchange(…)
admin.declareBinding(…)
从版本 2.4 开始,admin 有一个新属性;when 时,管理员将恢复这些实体以及应用程序上下文中的 bean。redeclareManualDeclarations
true
如果调用 , 或 ,则不会执行单个声明的恢复。
删除队列和交换时,将从可恢复实体中删除关联的绑定。deleteQueue(…)
deleteExchange(…)
removeBinding(…)
最后,调用将阻止恢复任何以前声明的实体。resetAllManualDeclarations()