此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring AMQP 3.2.0! |
发送消息
发送消息时,您可以使用以下任一方法:
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;
我们可以从前面清单中的最后一个方法开始讨论,因为它实际上是最明确的。
它允许在运行时提供 AMQP 交换名称(以及路由密钥)。
最后一个参数是负责实际创建 message 实例的回调。
使用此方法发送消息的示例可能如下所示:
以下示例演示如何使用该方法发送消息:send
amqpTemplate.send("marketData.topic", "quotes.nasdaq.THING1",
new Message("12.34".getBytes(), someProperties));
如果您计划在大部分或全部时间使用该模板实例发送到同一 exchange,则可以在模板本身上设置该属性。
在这种情况下,您可以使用前面清单中的第二种方法。
以下示例在功能上等同于前面的示例:exchange
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));
如果在模板上同时设置了 和 属性,则可以使用仅接受 .
以下示例显示了如何执行此操作:exchange
routingKey
Message
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));
考虑 exchange 和 routing key 属性的更好方法是显式方法参数始终覆盖模板的默认值。
事实上,即使您没有在模板上显式设置这些属性,也始终存在默认值。
在这两种情况下,默认值都是空的,但这实际上是一个明智的默认值。
就路由密钥而言,它并不总是必要的(例如,对于
一个交易所)。
此外,队列可以绑定到具有空 .
这些都是依赖模板的路由键属性的默认空值的合法方案。
就 exchange 名称而言,通常使用空,因为 AMQP 规范将 “default exchange” 定义为没有名称。
由于所有队列都自动绑定到该默认交换(这是直接交换),因此使用它们的名称作为绑定值,因此前面列表中的第二种方法可用于通过默认交换向任何队列进行简单的点对点消息传递。
您可以通过在运行时提供 method 参数来提供队列名称作为 。
以下示例显示了如何执行此操作:String
Fanout
String
String
String
routingKey
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));
或者,您可以创建一个模板,该模板可用于主要或专门发布到单个 Queue。 以下示例显示了如何执行此操作:
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue
template.send(new Message("Hello World".getBytes(), someProperties));
消息生成器 API
从版本 1.3 开始,消息生成器 API 由 和 提供。
这些方法提供了一种方便的“Fluent”方法来创建消息或消息属性。
以下示例显示了 Fluent API 的运行情况:MessageBuilder
MessagePropertiesBuilder
Message message = MessageBuilder.withBody("foo".getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
MessageProperties props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
Message message = MessageBuilder.withBody("foo".getBytes())
.andProperties(props)
.build();
可以设置在 MessageProperties
上定义的每个属性。
其他方法包括 、 、 和 。
每个属性设置方法都有一个变体。
如果存在默认初始值,则该方法名为 。setHeader(String key, String value)
removeHeader(String key)
removeHeaders()
copyProperties(MessageProperties properties)
set*IfAbsent()
set*IfAbsentOrDefault()
提供了五种静态方法来创建初始消息生成器:
public static MessageBuilder withBody(byte[] body) (1)
public static MessageBuilder withClonedBody(byte[] body) (2)
public static MessageBuilder withBody(byte[] body, int from, int to) (3)
public static MessageBuilder fromMessage(Message message) (4)
public static MessageBuilder fromClonedMessage(Message message) (5)
1 | 生成器创建的消息具有一个正文,该正文是对参数的直接引用。 |
2 | 生成器创建的消息具有一个 body,该 body 是一个新数组,其中包含参数中的 bytes 副本。 |
3 | 生成器创建的消息具有一个 body,该 body 是一个新数组,其中包含参数中的字节范围。
有关更多详细信息,请参阅 Arrays.copyOfRange() 。 |
4 | 生成器创建的消息具有一个 body,该 body 是对参数 body 的直接引用。
参数的属性将复制到新对象。MessageProperties |
5 | 生成器创建的消息具有一个 body,该 body 是一个包含参数 body 副本的新数组。
参数的属性将复制到新对象。MessageProperties |
创建实例时,提供了三种静态方法:MessagePropertiesBuilder
public static MessagePropertiesBuilder newInstance() (1)
public static MessagePropertiesBuilder fromProperties(MessageProperties properties) (2)
public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) (3)
1 | 使用默认值初始化新的 message properties 对象。 |
2 | 构建器使用提供的 properties 对象进行初始化,并将返回该对象。build() |
3 | 参数的属性将复制到新对象。MessageProperties |
通过 的实现,每个方法都有一个重载版本,该版本采用一个附加对象。
当 publisher 确认启用时,此对象将在 AmqpTemplate
中描述的回调中返回。
这允许发件人将确认 ( 或 ) 与发送的消息相关联。RabbitTemplate
AmqpTemplate
send()
CorrelationData
ack
nack
从版本 1.6.7 开始,引入了该接口,允许在转换消息后修改关联数据。
以下示例演示如何使用它:CorrelationAwareMessagePostProcessor
Message postProcessMessage(Message message, Correlation correlation);
在版本 2.0 中,此接口已弃用。
该方法已移至 默认实现,该实现委托给 .MessagePostProcessor
postProcessMessage(Message message)
同样从版本 1.6.7 开始,提供了一个名为 的新回调接口。
这是在所有实例之后调用的(在 method 中提供以及 中提供的实例)。
实现可以更新或替换方法中提供的相关数据(如果有)。
和 original (如果有) 作为参数提供。
以下示例演示如何使用该方法:CorrelationDataPostProcessor
MessagePostProcessor
send()
setBeforePublishPostProcessors()
send()
Message
CorrelationData
postProcess
CorrelationData postProcess(Message message, CorrelationData correlationData);
发布者返回
当模板的 property 为 时,返回的消息由 AmqpTemplate
中描述的回调提供。mandatory
true
从版本 1.4 开始,支持 SPEL 属性,该属性作为根评估对象针对每个请求消息进行评估,并解析为一个值。
可以在表达式中使用 Bean 引用(如 )。RabbitTemplate
mandatoryExpression
boolean
@myBean.isMandatory(#root)
发布者返回也可以由 in send 和 receive 操作在内部使用。
有关更多信息,请参阅 Reply Timeout (回复超时)。RabbitTemplate
配料
版本 1.4.2 引入了 .
这是一个子类 ,具有一个重写的方法,该方法根据 .
只有当批处理完成时,才会将消息发送到 RabbitMQ。
下面的清单显示了接口定义:BatchingRabbitTemplate
RabbitTemplate
send
BatchingStrategy
BatchingStrategy
public interface BatchingStrategy {
MessageBatch addToBatch(String exchange, String routingKey, Message message);
Date nextRelease();
Collection<MessageBatch> releaseBatches();
}
批处理数据保存在内存中。 如果系统发生故障,未发送的消息可能会丢失。 |
提供 A。
它支持将消息发送到单个 exchange 或 routing key。
它具有以下属性:SimpleBatchingStrategy
-
batchSize
:在发送批次之前,批次中的消息数。 -
bufferLimit
:批处理消息的最大大小。 如果超出,这将抢占 ,并导致发送部分批处理。batchSize
-
timeout
:当没有新活动向批次添加消息时,将发送部分批次的时间。
该命令通过在每条嵌入消息前面使用四字节的二进制长度来格式化批处理。
通过将 message 属性设置为 ,将此消息传达给接收系统。SimpleBatchingStrategy
springBatchFormat
lengthHeader4
默认情况下,批处理的消息由侦听器容器自动取消批处理(通过使用消息标头)。
拒绝来自批处理的任何消息将导致整个批处理被拒绝。springBatchFormat |
但是,有关更多信息,请参阅使用 Batching @RabbitListener。