AMQP 支持
AMQP (RabbitMQ) 支持
Spring 集成通过使用高级消息队列协议 (AMQP) 提供用于接收和发送消息的通道适配器。
您需要将此依赖项包含在您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-amqp</artifactId>
<version>6.1.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-amqp:6.1.9"
以下适配器可用:
Spring 集成还提供了一个点对点的消息通道和一个由 AMQP 交换和队列支持的发布-订阅消息通道。
为了提供 AMQP 支持, Spring 集成依赖于 (Spring AMQP),它将核心 Spring 概念应用于基于 AMQP 的消息传递解决方案的开发。 Spring AMQP 提供与 (Spring JMS) 类似的语义。
虽然提供的 AMQP 通道适配器仅用于单向消息传递(发送或接收),但 Spring 集成还为请求-回复操作提供了入站和出站 AMQP 网关。
提示: 您应该熟悉 Spring AMQP 项目的参考文档。 它提供了有关 Spring 与 AMQP 的集成(特别是 RabbitMQ)的更深入的信息。
入站通道适配器
下面的清单显示了 AMQP 入站通道适配器的可能配置选项:
@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, "aName"))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
@Bean
public MessageChannel amqpInputChannel() {
return new DirectChannel();
}
@Bean
public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
@Qualifier("amqpInputChannel") MessageChannel channel) {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
adapter.setOutputChannel(channel);
return adapter;
}
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("aName");
container.setConcurrentConsumers(2);
// ...
return container;
}
@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
<int-amqp:inbound-channel-adapter
id="inboundAmqp" (1)
channel="inboundChannel" (2)
queue-names="si.test.queue" (3)
acknowledge-mode="AUTO" (4)
advice-chain="" (5)
channel-transacted="" (6)
concurrent-consumers="" (7)
connection-factory="" (8)
error-channel="" (9)
expose-listener-channel="" (10)
header-mapper="" (11)
mapped-request-headers="" (12)
listener-container="" (13)
message-converter="" (14)
message-properties-converter="" (15)
phase="" (16)
prefetch-count="" (17)
receive-timeout="" (18)
recovery-interval="" (19)
missing-queues-fatal="" (20)
shutdown-timeout="" (21)
task-executor="" (22)
transaction-attribute="" (23)
transaction-manager="" (24)
tx-size="" (25)
consumers-per-queue (26)
batch-mode="MESSAGES"/> (27)
<1> The unique ID for this adapter.
Optional.
<2> Message channel to which converted messages should be sent.
Required.
<3> Names of the AMQP queues (comma-separated list) from which messages should be consumed.
Required.
<4> Acknowledge mode for the `MessageListenerContainer`.
When set to `MANUAL`, the delivery tag and channel are provided in message headers `amqp_deliveryTag` and `amqp_channel`, respectively.
The user application is responsible for acknowledgement.
`NONE` means no acknowledgements (`autoAck`).
`AUTO` means the adapter's container acknowledges when the downstream flow completes.
Optional (defaults to AUTO).
See <<amqp-inbound-ack>>.
<5> Extra AOP Advices to handle cross-cutting behavior associated with this inbound channel adapter.
Optional.
<6> Flag to indicate that channels created by this component are transactional.
If true, it tells the framework to use a transactional channel and to end all operations (send or receive) with a commit or rollback, depending on the outcome, with an exception that signals a rollback.
Optional (Defaults to false).
<7> Specify the number of concurrent consumers to create.
The default is `1`.
We recommend raising the number of concurrent consumers to scale the consumption of messages coming in from a queue.
However, note that any ordering guarantees are lost once multiple consumers are registered.
In general, use one consumer for low-volume queues.
Not allowed when 'consumers-per-queue' is set.
Optional.
<8> Bean reference to the RabbitMQ `ConnectionFactory`.
Optional (defaults to `connectionFactory`).
<9> Message channel to which error messages should be sent.
Optional.
<10> Whether the listener channel (com.rabbitmq.client.Channel) is exposed to a registered `ChannelAwareMessageListener`.
Optional (defaults to true).
<11> A reference to an `AmqpHeaderMapper` to use when receiving AMQP Messages.
Optional.
By default, only standard AMQP properties (such as `contentType`) are copied to Spring Integration `MessageHeaders`.
Any user-defined headers within the AMQP `MessageProperties` are NOT copied to the message by the default `DefaultAmqpHeaderMapper`.
Not allowed if 'request-header-names' is provided.
<12> Comma-separated list of the names of AMQP Headers to be mapped from the AMQP request into the `MessageHeaders`.
This can only be provided if the 'header-mapper' reference is not provided.
The values in this list can also be simple patterns to be matched against the header names (such as "\*" or "thing1*, thing2" or "*something").
<13> Reference to the `AbstractMessageListenerContainer` to use for receiving AMQP Messages.
If this attribute is provided, no other attribute related to the listener container configuration should be provided.
In other words, by setting this reference, you must take full responsibility for the listener container configuration.
The only exception is the `MessageListener` itself.
Since that is actually the core responsibility of this channel adapter implementation, the referenced listener container must not already have its own `MessageListener`.
Optional.
<14> The `MessageConverter` to use when receiving AMQP messages.
Optional.
<15> The `MessagePropertiesConverter` to use when receiving AMQP messages.
Optional.
<16> Specifies the phase in which the underlying `AbstractMessageListenerContainer` should be started and stopped.
The startup order proceeds from lowest to highest, and the shutdown order is the reverse of that.
By default, this value is `Integer.MAX_VALUE`, meaning that this container starts as late as possible and stops as soon as possible.
Optional.
<17> Tells the AMQP broker how many messages to send to each consumer in a single request.
Often, you can set this value high to improve throughput.
It should be greater than or equal to the transaction size (see the `tx-size` attribute, later in this list).
Optional (defaults to `1`).
<18> Receive timeout in milliseconds.
Optional (defaults to `1000`).
<19> Specifies the interval between recovery attempts of the underlying `AbstractMessageListenerContainer` (in milliseconds).
Optional (defaults to `5000`).
<20> If 'true' and none of the queues are available on the broker, the container throws a fatal exception during startup and stops if the queues are deleted when the container is running (after making three attempts to passively declare the queues).
If `false`, the container does not throw an exception and goes into recovery mode, attempting to restart according to the `recovery-interval`.
Optional (defaults to `true`).
<21> The time to wait for workers (in milliseconds) after the underlying `AbstractMessageListenerContainer` is stopped and before the AMQP connection is forced closed.
If any workers are active when the shutdown signal comes, they are allowed to finish processing as long as they can finish within this timeout.
Otherwise, the connection is closed and messages remain unacknowledged (if the channel is transactional).
Optional (defaults to `5000`).
<22> By default, the underlying `AbstractMessageListenerContainer` uses a `SimpleAsyncTaskExecutor` implementation, that fires up a new thread for each task, running it asynchronously.
By default, the number of concurrent threads is unlimited.
Note that this implementation does not reuse threads.
Consider using a thread-pooling `TaskExecutor` implementation as an alternative.
Optional (defaults to `SimpleAsyncTaskExecutor`).
<23> By default, the underlying `AbstractMessageListenerContainer` creates a new instance of the `DefaultTransactionAttribute` (it takes the EJB approach to rolling back on runtime but not checked exceptions).
Optional (defaults to `DefaultTransactionAttribute`).
<24> Sets a bean reference to an external `PlatformTransactionManager` on the underlying `AbstractMessageListenerContainer`.
The transaction manager works in conjunction with the `channel-transacted` attribute.
If there is already a transaction in progress when the framework is sending or receiving a message and the `channelTransacted` flag is `true`, the commit or rollback of the messaging transaction is deferred until the end of the current transaction.
If the `channelTransacted` flag is `false`, no transaction semantics apply to the messaging operation (it is auto-acked).
For further information, see
https://docs.spring.io/spring-amqp/reference/html/%255Freference.html#%5Ftransactions[Transactions with Spring AMQP].
Optional.
<25> Tells the `SimpleMessageListenerContainer` how many messages to process in a single transaction (if the channel is transactional).
For best results, it should be less than or equal to the value set in `prefetch-count`.
Not allowed when 'consumers-per-queue' is set.
Optional (defaults to `1`).
<26> Indicates that the underlying listener container should be a `DirectMessageListenerContainer` instead of the default `SimpleMessageListenerContainer`.
See the https://docs.spring.io/spring-amqp/reference/html/[Spring AMQP Reference Manual] for more information.
<27> When the container's `consumerBatchEnabled` is `true`, determines how the adapter presents the batch of messages in the message payload.
When set to `MESSAGES` (default), the payload is a `List<Message<?>>` where each message has headers mapped from the incoming AMQP `Message` and the payload is the converted `body`.
When set to `EXTRACT_PAYLOADS`, the payload is a `List<?>` where the elements are converted from the AMQP `Message` body.
`EXTRACT_PAYLOADS_WITH_HEADERS` is similar to `EXTRACT_PAYLOADS` but, in addition, the headers from each message are mapped from the `MessageProperties` into a `List<Map<String, Object>` at the corresponding index; the header name is `AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS`.
容器
请注意,使用 XML 配置外部容器时,不能使用 Spring AMQP 名称空间来定义容器。
这是因为命名空间至少需要一个元素。
在此环境中,侦听器是适配器的内部。
因此,您必须使用普通的 Spring 定义来定义容器,如下例所示:
|
尽管 Spring 集成 JMS 和 AMQP 支持相似,但存在重要差异。
JMS 入站通道适配器在幕后使用,并期望配置一个 Poller。
AMQP 入站通道适配器使用 和 是消息驱动的。
在这方面,它更类似于 JMS 消息驱动的通道适配器。JmsDestinationPollingSource AbstractMessageListenerContainer |
从版本 5.5 开始,可以使用在内部调用重试操作时使用的策略进行配置。
有关更多信息,请参阅 JavaDocs。AmqpInboundChannelAdapter
org.springframework.amqp.rabbit.retry.MessageRecoverer
RecoveryCallback
setMessageRecoverer()
该注释还可以与 :@Publisher
@RabbitListener
@Configuration
@EnableIntegration
@EnableRabbit
@EnablePublisher
public static class ContextConfiguration {
@Bean
QueueChannel fromRabbitViaPublisher() {
return new QueueChannel();
}
@RabbitListener(queuesToDeclare = @Queue("publisherQueue"))
@Publisher("fromRabbitViaPublisher")
@Payload("#args.payload.toUpperCase()")
public void consumeForPublisher(String payload) {
}
}
默认情况下,AOP 拦截器处理方法调用的返回值。
但是,方法的返回值被视为 AMQP 回复消息。
因此,这种方法不能与a一起使用,因此对于这种组合,推荐使用针对方法参数的相应 SPEL 表达式的 Comments。
请参阅 Annotation-driven Configuration 部分中的 详细信息。@Publisher
@RabbitListener
@Publisher
@Payload
@Publisher
在侦听器容器中使用独占使用方或单活动使用方时,建议将 container 属性设置为 。
这将防止出现争用情况,即在停止容器后,另一个使用者可以在此实例完全停止之前开始使用消息。forceStop true |
批量消息
有关批处理消息的更多信息,请参阅 Spring AMQP 文档。
要使用 Spring Integration 生成批处理消息,只需使用 .BatchingRabbitTemplate
在接收批处理消息时,默认情况下,侦听器容器会提取每个片段消息,适配器将为每个片段生成一个。
从版本 5.2 开始,如果容器的 property 设置为 ,则取消批处理由适配器执行,并生成一个 single,其中有效负载是片段有效负载的列表(如果合适,则在转换后)。Message<?>
deBatchingEnabled
false
Message<List<?>>
默认值为 ,但可以在适配器上覆盖。BatchingStrategy
SimpleBatchingStrategy
当重试操作需要恢复时,必须与批处理一起使用。org.springframework.amqp.rabbit.retry.MessageBatchRecoverer |
轮询的入站通道适配器
概述
版本 5.0.1 引入了轮询通道适配器,允许您按需获取单个消息 - 例如,使用 a 或 poller。
有关更多信息,请参阅 Deferred Acknowledgment Pollable Message Source (延迟确认轮询消息源) 以了解更多信息。MessageSourcePollingTemplate
它当前不支持 XML 配置。
以下示例说明如何配置 :AmqpMessageSource
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from(Amqp.inboundPolledAdapter(connectionFactory(), DSL_QUEUE),
e -> e.poller(Pollers.fixedDelay(1_000)).autoStartup(false))
.handle(p -> {
...
})
.get();
}
@Bean
public AmqpMessageSource source(ConnectionFactory connectionFactory) {
return new AmqpMessageSource(connectionFactory, "someQueue");
}
有关配置属性,请参阅 Javadoc。
This adapter currently does not have XML configuration support.
入站网关
入站网关支持入站通道适配器上的所有属性(除了 'channel' 被 'request-channel' 替换),以及一些其他属性。 以下清单显示了可用的属性:
@Bean // return the upper cased payload
public IntegrationFlow amqpInboundGateway(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(Amqp.inboundGateway(connectionFactory, "foo"))
.transform(String.class, String::toUpperCase)
.get();
}
@Bean
public MessageChannel amqpInputChannel() {
return new DirectChannel();
}
@Bean
public AmqpInboundGateway inbound(SimpleMessageListenerContainer listenerContainer,
@Qualifier("amqpInputChannel") MessageChannel channel) {
AmqpInboundGateway gateway = new AmqpInboundGateway(listenerContainer);
gateway.setRequestChannel(channel);
gateway.setDefaultReplyTo("bar");
return gateway;
}
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("foo");
container.setConcurrentConsumers(2);
// ...
return container;
}
@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
return new AbstractReplyProducingMessageHandler() {
@Override
protected Object handleRequestMessage(Message<?> requestMessage) {
return "reply to " + requestMessage.getPayload();
}
};
}
<int-amqp:inbound-gateway
id="inboundGateway" (1)
request-channel="myRequestChannel" (2)
header-mapper="" (3)
mapped-request-headers="" (4)
mapped-reply-headers="" (5)
reply-channel="myReplyChannel" (6)
reply-timeout="1000" (7)
amqp-template="" (8)
default-reply-to="" /> (9)
1 | 此适配器的唯一 ID。 自选。 |
2 | 将转换后的消息发送到的消息通道。 必填。 |
3 | 对接收 AMQP 消息时使用的引用。
自选。
默认情况下,只有标准的 AMQP 属性(例如 )被复制到 Spring Integration 或从 Spring Integration 复制。
默认情况下,AMQP 中的任何用户定义的标头都不会复制到 AMQP 消息或从 AMQP 消息复制。
如果提供了 'request-header-names' 或 'reply-header-names',则不允许。AmqpHeaderMapper contentType MessageHeaders MessageProperties DefaultAmqpHeaderMapper |
4 | 要从 AMQP 请求映射到 .
仅当未提供 'header-mapper' 引用时,才能提供此属性。
此列表中的值也可以是与 Headers 名称匹配的简单模式(例如 或 或 )。MessageHeaders "*" "thing1*, thing2" "*thing1" |
5 | 要映射到 AMQP 回复消息的 AMQP 消息属性的名称列表的逗号分隔。
所有标准 Headers(例如 )都映射到 AMQP 消息属性,而用户定义的 Headers 映射到 'headers' 属性。
只有在未提供 'header-mapper' 引用时才能提供此属性。
此列表中的值也可以是要与报头名称匹配的简单模式(例如, or or or )。MessageHeaders contentType "*" "foo*, bar" "*foo" |
6 | 需要回复 Messages 的消息通道。 自选。 |
7 | 设置用于从 reply channel 接收消息的底层。
如果未指定,则此属性默认为 (1 秒)。
仅当容器线程在发送回复之前移交给另一个线程时适用。receiveTimeout o.s.i.core.MessagingTemplate 1000 |
8 | 自定义的 bean 引用(以便更好地控制要发送的回复消息)。
您可以提供 .AmqpTemplate RabbitTemplate |
9 | 当 没有属性时使用。
如果未指定此选项,则提供 no,请求消息中不存在任何属性,并且
an 被抛出,因为无法路由回复。
如果未指定此选项并且提供了 external,则不会引发异常。
您必须指定此选项或配置默认 and 在该模板上,
如果您预见到请求消息中不存在任何属性的情况。replyTo o.s.amqp.core.Address requestMessage replyTo amqp-template replyTo IllegalStateException amqp-template exchange routingKey replyTo |
请参阅 Inbound Channel Adapter 中有关配置属性的说明。listener-container
从版本 5.5 开始,可以使用在内部调用重试操作时使用的策略进行配置。
有关更多信息,请参阅 JavaDocs。AmqpInboundChannelAdapter
org.springframework.amqp.rabbit.retry.MessageRecoverer
RecoveryCallback
setMessageRecoverer()
批量消息
请参阅 Batched Messages (批量消息)。
入站终端节点确认模式
默认情况下,入站端点使用确认模式,这意味着容器在下游集成流完成时自动确认消息(或使用 或 将消息传递给另一个线程)。
将模式设置为配置使用者,以便根本不使用确认(代理在发送消息后立即自动确认消息)。
将 mode 设置为 可让用户代码在处理过程中的其他时间点确认消息。
为了支持这一点,在此模式下,终端节点分别在 和 标头中提供 and。AUTO
QueueChannel
ExecutorChannel
NONE
MANUAL
Channel
deliveryTag
amqp_channel
amqp_deliveryTag
您可以在 but、通常仅使用 和 (或 ) 上执行任何有效的 Rabbit 命令。
为了不干扰容器的操作,您不应保留对通道的引用,而只能在当前消息的上下文中使用它。Channel
basicAck
basicNack
basicReject
由于 this 是对 “live” 对象的引用,因此它无法序列化,如果持久保存消息,它将丢失。Channel |
以下示例显示了如何使用 acknowledgement:MANUAL
@ServiceActivator(inputChannel = "foo", outputChannel = "bar")
public Object handle(@Payload String payload, @Header(AmqpHeaders.CHANNEL) Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws Exception {
// Do some processing
if (allOK) {
channel.basicAck(deliveryTag, false);
// perhaps do some more processing
}
else {
channel.basicNack(deliveryTag, false, true);
}
return someResultForDownStreamProcessing;
}
出站终端节点
以下出站终端节点具有许多类似的配置选项。
从版本 5.2 开始,已添加 。
通常,当 publisher 确认启用时,broker 将快速返回一个 ack(或 nack),该 ack 将被发送到相应的通道。
如果在收到确认之前关闭了通道,则 Spring AMQP 框架将合成一个 nack。
“缺失”ack 不应发生,但是,如果您设置了此属性,则端点将定期检查它们,并在时间过后未收到确认时合成 nack。confirm-timeout
出站通道适配器
以下示例显示了 AMQP 出站通道适配器的可用属性:
@Bean
public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate,
MessageChannel amqpOutboundChannel) {
return IntegrationFlow.from(amqpOutboundChannel)
.handle(Amqp.outboundAdapter(amqpTemplate)
.routingKey("queue1")) // default exchange - route to queue 'queue1'
.get();
}
@Bean
@ServiceActivator(inputChannel = "amqpOutboundChannel")
public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {
AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
outbound.setRoutingKey("queue1"); // default exchange - route to queue 'queue1'
return outbound;
}
@Bean
public MessageChannel amqpOutboundChannel() {
return new DirectChannel();
}
<int-amqp:outbound-channel-adapter id="outboundAmqp" (1)
channel="outboundChannel" (2)
amqp-template="myAmqpTemplate" (3)
exchange-name="" (4)
exchange-name-expression="" (5)
order="1" (6)
routing-key="" (7)
routing-key-expression="" (8)
default-delivery-mode"" (9)
confirm-correlation-expression="" (10)
confirm-ack-channel="" (11)
confirm-nack-channel="" (12)
confirm-timeout="" (13)
wait-for-confirm="" (14)
return-channel="" (15)
error-message-strategy="" (16)
header-mapper="" (17)
mapped-request-headers="" (18)
lazy-connect="true" (19)
multi-send="false"/> (20)
1 | 此适配器的唯一 ID。 自选。 |
2 | 消息通道,应将消息发送到该通道,以便将消息转换并发布到 AMQP 交换。 必填。 |
3 | 对已配置的 AMQP 模板的 Bean 引用。
可选 (默认为 )。amqpTemplate |
4 | 消息发送到的 AMQP 交换的名称。 如果未提供,则消息将发送到默认的 no-name 交换。 与 'exchange-name-expression' 互斥。 自选。 |
5 | 一个 SPEL 表达式,用于确定消息发送到的 AMQP 交换的名称,并将消息作为根对象。 如果未提供,则消息将发送到默认的 no-name 交换。 与 'exchange-name' 互斥。 自选。 |
6 | 注册多个使用者时此使用者的顺序,从而启用负载均衡和故障转移。
可选 (默认为 )。Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE] |
7 | 发送消息时使用的固定 routing-key。
默认情况下,这是一个空的 .
与 'routing-key-expression' 互斥。
自选。String |
8 | 一个 SPEL 表达式,用于确定发送消息时要使用的路由键,并将消息作为根对象(例如,'payload.key')。
默认情况下,这是一个空的 .
与 'routing-key' 互斥。
自选。String |
9 | 邮件的默认传递模式:或 .
如果设置了传递模式,则被覆盖。
如果存在 Spring Integration 消息标头,则设置值。
如果未提供此属性并且标头映射器未设置它,则默认值取决于 .
如果根本没有自定义,则默认值为 。
自选。PERSISTENT NON_PERSISTENT header-mapper amqp_deliveryMode DefaultHeaderMapper MessagePropertiesConverter RabbitTemplate PERSISTENT |
10 | 定义相关数据的表达式。
如果提供,这会将底层 AMQP 模板配置为接收发布者确认。
需要 dedicated 和 a,并将属性设置为 。
收到发布者确认并提供关联数据后,会将其写入 或 ,具体取决于确认类型。
确认的有效负载是相关数据,如此表达式所定义。
邮件的 'amqp_publishConfirm' 报头设置为 () 或 ()。
示例:和 .
版本 4.1 引入了消息标头。
它包含用于发布者确认的 'nack'。
从版本 4.2 开始,如果表达式解析为实例(例如 ),则在 / 通道上发出的消息基于该消息,并添加了其他 Headers。
以前,无论类型如何,都会使用关联数据作为其负载创建新消息。
另请参阅发布者确认和返回的替代机制。
自选。RabbitTemplate CachingConnectionFactory publisherConfirms true confirm-ack-channel confirm-nack-channel true ack false nack headers['myCorrelationData'] payload amqp_publishConfirmNackCause cause Message<?> #this ack nack |
11 | positive () 发布者确认发送到的渠道。
有效负载是由 .
如果表达式为 或 ,则消息是从原始消息构建的,标题设置为 。
另请参阅发布者确认和返回的替代机制。
可选(默认值为 )。ack confirm-correlation-expression #root #this amqp_publishConfirm true nullChannel |
12 | 否定 () 发布者确认发送到的渠道。
payload 是由 定义的关联数据(如果未配置)。
如果表达式为 或 ,则消息是从原始消息构建的,标题设置为 。
当存在 时,消息是具有有效负载的 an。
另请参阅发布者确认和返回的替代机制。
可选(默认值为 )。nack confirm-correlation-expression ErrorMessageStrategy #root #this amqp_publishConfirm false ErrorMessageStrategy ErrorMessage NackedAmqpMessageException nullChannel |
13 | 设置后,如果在此时间(以毫秒为单位)内未收到发布者确认,则适配器将合成否定确认 (nack)。 待处理确认每检查一次此值的 50%,因此发送 nack 的实际时间将介于此值的 1 倍到 1.5 倍之间。 另请参阅发布者确认和返回的替代机制。 默认无(不会生成 nack)。 |
14 | 当设置为 true 时,调用线程将阻塞,等待发布者确认。
这需要为 confirm 配置一个 以及 .
线程将阻塞长达 (或默认为 5 秒)。
如果发生超时,将引发 a。
如果启用了返回并返回了一条消息,或者在等待确认时发生任何其他异常,则将引发 a,并显示相应的消息。RabbitTemplate confirm-correlation-expression confirm-timeout MessageTimeoutException MessageHandlingException |
15 | 返回的消息发送到的通道。
如果提供,则底层 AMQP 模板配置为将无法传递的消息返回给适配器。
如果未配置,则根据从 AMQP 接收的数据构建消息,并具有以下附加标头: , , , 。
当存在 时,消息是具有有效负载的 an。
另请参阅发布者确认和返回的替代机制。
自选。ErrorMessageStrategy amqp_returnReplyCode amqp_returnReplyText amqp_returnExchange amqp_returnRoutingKey ErrorMessageStrategy ErrorMessage ReturnedAmqpMessageException |
16 | 对用于在发送返回或否定确认消息时构建实例的实现的引用。ErrorMessageStrategy ErrorMessage |
17 | 对发送 AMQP 消息时使用的引用。
默认情况下,只有标准的 AMQP 属性(例如 )被复制到 Spring Integration 。
默认的'DefaultAmqpHeaderMapper'不会将任何用户定义的 Headers 复制到消息中。
如果提供了 'request-header-names',则不允许。
自选。AmqpHeaderMapper contentType MessageHeaders |
18 | 要从 映射到 AMQP 消息的 AMQP 标头名称的逗号分隔列表。
如果提供了 'header-mapper' 引用,则不允许。
此列表中的值也可以是与 Headers 名称匹配的简单模式(例如 或 或 )。MessageHeaders "*" "thing1*, thing2" "*thing1" |
19 | 设置为 时,端点将在应用程序上下文初始化期间尝试连接到代理。
这允许对错误配置进行 “fail fast” 检测,但如果 broker 宕机,也会导致初始化失败。
When (默认值),在发送第一条消息时建立连接(如果它尚不存在,因为其他组件建立了它)。false true |
20 | 当设置为 时,类型的有效负载将作为离散消息在单次调用范围内的同一通道上发送。
需要 .
When is true,在发送消息后调用。
使用事务模板,将在新事务或已启动的事务(如果存在)中执行发送。true Iterable<Message<?>> RabbitTemplate RabbitTemplate wait-for-confirms RabbitTemplate.waitForConfirmsOrDie() |
return-channel (返回通道)
使用 a 需要 a 且 属性设置为 ,以及 a 且 属性设置为 。
当使用多个带返回的出站终端节点时,每个终端节点都需要一个单独的终端节点。 |
出站网关
以下清单显示了 AMQP 出站网关的可能属性:
@Bean
public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate) {
return f -> f.handle(Amqp.outboundGateway(amqpTemplate)
.routingKey("foo")) // default exchange - route to queue 'foo'
.get();
}
@MessagingGateway(defaultRequestChannel = "amqpOutbound.input")
public interface MyGateway {
String sendToRabbit(String data);
}
@Bean
@ServiceActivator(inputChannel = "amqpOutboundChannel")
public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {
AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
outbound.setExpectReply(true);
outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
return outbound;
}
@Bean
public MessageChannel amqpOutboundChannel() {
return new DirectChannel();
}
@MessagingGateway(defaultRequestChannel = "amqpOutboundChannel")
public interface MyGateway {
String sendToRabbit(String data);
}
<int-amqp:outbound-gateway id="outboundGateway" (1)
request-channel="myRequestChannel" (2)
amqp-template="" (3)
exchange-name="" (4)
exchange-name-expression="" (5)
order="1" (6)
reply-channel="" (7)
reply-timeout="" (8)
requires-reply="" (9)
routing-key="" (10)
routing-key-expression="" (11)
default-delivery-mode"" (12)
confirm-correlation-expression="" (13)
confirm-ack-channel="" (14)
confirm-nack-channel="" (15)
confirm-timeout="" (16)
return-channel="" (17)
error-message-strategy="" (18)
lazy-connect="true" /> (19)
1 | 此适配器的唯一 ID。 自选。 |
2 | 消息通道,将消息发送到该通道,以便将消息转换并发布到 AMQP 交换。 必填。 |
3 | 对已配置的 AMQP 模板的 Bean 引用。
可选 (默认为 )。amqpTemplate |
4 | 应将消息发送到的 AMQP 交换的名称。 如果未提供,则消息将发送到默认的 no-name cxchange。 与 'exchange-name-expression' 互斥。 自选。 |
5 | 一个 SPEL 表达式,用于确定应将消息发送到的 AMQP 交换的名称,并将消息作为根对象。 如果未提供,则消息将发送到默认的 no-name 交换。 与 'exchange-name' 互斥。 自选。 |
6 | 注册多个使用者时此使用者的顺序,从而启用负载均衡和故障转移。
可选 (默认为 )。Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE] |
7 | 从 AMQP 队列接收回复并进行转换后,应将回复发送到的消息通道。 自选。 |
8 | 网关在向 发送回复消息时等待的时间。
这仅适用于 can 阻止的情况,例如容量限制当前已满的 a。
默认为 infinity。reply-channel reply-channel QueueChannel |
9 | When 时,如果属性中未收到回复消息,则网关将引发异常。
默认为 。true AmqpTemplate’s `replyTimeout true |
10 | 发送消息时使用的。
默认情况下,这是一个空的 .
与 'routing-key-expression' 互斥。
自选。routing-key String |
11 | 一个 SPEL 表达式,用于确定发送消息时使用的表达式,并将消息作为根对象(例如,'payload.key')。
默认情况下,这是一个空的 .
与 'routing-key' 互斥。
自选。routing-key String |
12 | 邮件的默认传递模式:或 .
如果设置了传递模式,则被覆盖。
如果存在 Spring Integration 消息标头,则设置值。
如果未提供此属性并且标头映射器未设置它,则默认值取决于 .
如果根本没有自定义,则默认值为 。
自选。PERSISTENT NON_PERSISTENT header-mapper amqp_deliveryMode DefaultHeaderMapper MessagePropertiesConverter RabbitTemplate PERSISTENT |
13 | 从 4.2 版本开始。
定义关联数据的表达式。
如果提供,这会将底层 AMQP 模板配置为接收发布者确认。
需要 dedicated 和 a,并将属性设置为 。
收到发布者确认并提供关联数据后,会将其写入 或 ,具体取决于确认类型。
确认的有效负载是此表达式定义的关联数据。
邮件的报头 'amqp_publishConfirm' 设置为 () 或 ()。
为了确认, Spring Integration 提供了一个额外的 header 。
示例:和 .
如果表达式解析为实例(例如 ),则消息
/ 通道上发出的 / 基于该消息,并添加了其他标头。
以前,无论类型如何,都会使用关联数据作为其负载创建新消息。
另请参阅发布者确认和返回的替代机制。
自选。RabbitTemplate CachingConnectionFactory publisherConfirms true confirm-ack-channel confirm-nack-channel true ack false nack nack amqp_publishConfirmNackCause headers['myCorrelationData'] payload Message<?> #this ack nack |
14 | 将肯定 () 发布者确认发送到的渠道。
有效载荷是由 定义的相关数据。
如果表达式为 或 ,则消息是从原始消息构建的,标题设置为 。
另请参阅发布者确认和返回的替代机制。
可选(默认值为 )。ack confirm-correlation-expression #root #this amqp_publishConfirm true nullChannel |
15 | 否定 () 发布者确认发送到的渠道。
有效负载是由 (如果未配置) 定义的关联数据。
如果表达式为 或 ,则消息是从原始消息构建的,标题设置为 。
当存在 时,消息是具有有效负载的 an。
另请参阅发布者确认和返回的替代机制。
可选(默认值为 )。nack confirm-correlation-expression ErrorMessageStrategy #root #this amqp_publishConfirm false ErrorMessageStrategy ErrorMessage NackedAmqpMessageException nullChannel |
16 | 设置后,如果在此时间(以毫秒为单位)内未收到发布者确认,网关将合成否定确认 (nack)。 待处理确认每检查一次此值的 50%,因此发送 nack 的实际时间将介于此值的 1 倍到 1.5 倍之间。 默认无(不会生成 nack)。 |
17 | 返回的消息发送到的通道。
如果提供,则底层 AMQP 模板配置为将无法传递的消息返回给适配器。
如果未配置,则根据从 AMQP 接收的数据构建消息,并具有以下附加标头: 、 、 和 。
当存在 时,消息是具有有效负载的 an。
另请参阅发布者确认和返回的替代机制。
自选。ErrorMessageStrategy amqp_returnReplyCode amqp_returnReplyText amqp_returnExchange amqp_returnRoutingKey ErrorMessageStrategy ErrorMessage ReturnedAmqpMessageException |
18 | 对用于在发送返回或否定确认消息时构建实例的实现的引用。ErrorMessageStrategy ErrorMessage |
19 | 设置为 时,端点将在应用程序上下文初始化期间尝试连接到代理。
这允许在代理关闭时通过记录错误消息来“快速失败”检测错误配置。
When (默认值),在发送第一条消息时建立连接(如果它尚不存在,因为其他组件建立了它)。false true |
return-channel (返回通道)
使用 a 需要 a 且 属性设置为 ,以及 a 且 属性设置为 。
当使用多个带返回的出站终端节点时,每个终端节点都需要一个单独的终端节点。 |
底层的默认值为 5 秒。
如果需要更长的超时时间,则必须在 .AmqpTemplate replyTimeout template |
请注意,出站适配器和出站网关配置之间的唯一区别是属性的设置。expectReply
异步出站网关
上一节中讨论的网关是同步的,因为发送线程将暂停,直到
收到回复(或发生超时)。
Spring 集成版本 4.3 添加了一个异步网关,它使用来自 Spring AMQP 的。
发送消息时,线程在发送操作完成后立即返回,收到消息时,将在模板的侦听器容器线程上发送回复。
当在 poller 线程上调用网关时,这可能很有用。
线程已释放,可用于框架中的其他任务。AsyncRabbitTemplate
以下清单显示了 AMQP 异步出站网关的可能配置选项:
@Configuration
public class AmqpAsyncApplication {
@Bean
public IntegrationFlow asyncAmqpOutbound(AsyncRabbitTemplate asyncRabbitTemplate) {
return f -> f
.handle(Amqp.asyncOutboundGateway(asyncRabbitTemplate)
.routingKey("queue1")); // default exchange - route to queue 'queue1'
}
@MessagingGateway(defaultRequestChannel = "asyncAmqpOutbound.input")
public interface MyGateway {
String sendToRabbit(String data);
}
}
@Configuration
public class AmqpAsyncConfig {
@Bean
@ServiceActivator(inputChannel = "amqpOutboundChannel")
public AsyncAmqpOutboundGateway amqpOutbound(AsyncRabbitTemplate asyncTemplate) {
AsyncAmqpOutboundGateway outbound = new AsyncAmqpOutboundGateway(asyncTemplate);
outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
return outbound;
}
@Bean
public AsyncRabbitTemplate asyncTemplate(RabbitTemplate rabbitTemplate,
SimpleMessageListenerContainer replyContainer) {
return new AsyncRabbitTemplate(rabbitTemplate, replyContainer);
}
@Bean
public SimpleMessageListenerContainer replyContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(ccf);
container.setQueueNames("asyncRQ1");
return container;
}
@Bean
public MessageChannel amqpOutboundChannel() {
return new DirectChannel();
}
}
<int-amqp:outbound-async-gateway id="asyncOutboundGateway" (1)
request-channel="myRequestChannel" (2)
async-template="" (3)
exchange-name="" (4)
exchange-name-expression="" (5)
order="1" (6)
reply-channel="" (7)
reply-timeout="" (8)
requires-reply="" (9)
routing-key="" (10)
routing-key-expression="" (11)
default-delivery-mode"" (12)
confirm-correlation-expression="" (13)
confirm-ack-channel="" (14)
confirm-nack-channel="" (15)
confirm-timeout="" (16)
return-channel="" (17)
lazy-connect="true" /> (18)
1 | 此适配器的唯一 ID。 自选。 |
2 | 消息通道,消息应发送到该通道,以便将它们转换并发布到 AMQP 交换。 必填。 |
3 | Bean 引用到已配置的 .
可选(默认为 )。AsyncRabbitTemplate asyncRabbitTemplate |
4 | 应将消息发送到的 AMQP 交换的名称。 如果未提供,则消息将发送到默认的 no-name 交换。 与 'exchange-name-expression' 互斥。 自选。 |
5 | 一个 SPEL 表达式,用于确定消息发送到的 AMQP 交换的名称,并将消息作为根对象。 如果未提供,则消息将发送到默认的 no-name 交换。 与 'exchange-name' 互斥。 自选。 |
6 | 注册多个使用者时此使用者的顺序,从而启用负载均衡和故障转移。
可选(默认为 )。Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE] |
7 | 从 AMQP 队列接收回复并进行转换后,应将回复发送到的消息通道。 自选。 |
8 | 网关在向 发送回复消息时等待的时间。
这仅适用于 can 阻止的情况,例如容量限制当前已满的 a。
默认值为 infinity。reply-channel reply-channel QueueChannel |
9 | 当在属性中未收到回复消息且设置为 时,网关会向入站消息的报头发送错误消息。
当在属性中未收到回复消息且此设置为 时,网关会向默认网关发送错误消息(如果可用)。
它默认为 .AsyncRabbitTemplate’s `receiveTimeout true errorChannel AsyncRabbitTemplate’s `receiveTimeout false errorChannel true |
10 | 发送 Messages 时使用的 routing-key。
默认情况下,这是一个空的 .
与 'routing-key-expression' 互斥。
自选。String |
11 | 一个 SPEL 表达式,用于确定发送消息时要使用的路由密钥。
将消息作为根对象(例如,'payload.key')。
默认情况下,这是一个空的 .
与 'routing-key' 互斥。
自选。String |
12 | 邮件的默认传递模式:或 .
如果设置了传递模式,则被覆盖。
如果存在 Spring Integration 消息头 (),则设置值。
如果未提供此属性并且标头映射器未设置它,则默认值取决于 .
如果未自定义,则默认值为 。
自选。PERSISTENT NON_PERSISTENT header-mapper amqp_deliveryMode DefaultHeaderMapper MessagePropertiesConverter RabbitTemplate PERSISTENT |
13 | 定义相关数据的表达式。
如果提供,这会将底层 AMQP 模板配置为接收发布者确认。
需要一个 dedicated 和 a,其属性设置为 。
收到发布者确认并提供关联数据后,确认将写入 或 ,具体取决于确认类型。
确认的有效负载是此表达式定义的关联数据,并且消息的 'amqp_publishConfirm' 标头设置为 () 或 ()。
例如,会提供一个额外的标头 ()。
例子:。
如果表达式解析为实例(如 “#this”),则在 / 通道上发出的消息基于该消息,并添加了其他标头。
另请参阅发布者确认和返回的替代机制。
自选。RabbitTemplate CachingConnectionFactory publisherConfirms true confirm-ack-channel confirm-nack-channel true ack false nack nack amqp_publishConfirmNackCause headers['myCorrelationData'] payload Message<?> ack nack |
14 | 将肯定 () 发布者确认发送到的渠道。
有效负载是由 .
要求基础的 property 设置为 .
另请参阅发布者确认和返回的替代机制。
可选(默认值为 )。ack confirm-correlation-expression AsyncRabbitTemplate enableConfirms true nullChannel |
15 | 从 4.2 版本开始。
否定 () 发布者确认发送到的渠道。
有效负载是由 .
要求基础的 property 设置为 .
另请参阅发布者确认和返回的替代机制。
可选(默认值为 )。nack confirm-correlation-expression AsyncRabbitTemplate enableConfirms true nullChannel |
16 | 设置后,如果在此时间(以毫秒为单位)内未收到发布者确认,网关将合成否定确认 (nack)。 待处理确认每检查一次此值的 50%,因此发送 nack 的实际时间将介于此值的 1 倍到 1.5 倍之间。 另请参阅发布者确认和返回的替代机制。 默认无(不会生成 nack)。 |
17 | 返回的消息发送到的通道。
如果提供,则底层 AMQP 模板配置为将无法送达的消息返回到网关。
该消息是根据从 AMQP 接收的数据构建的,并带有以下附加标头:、、 和 。
要求基础的 property 设置为 .
另请参阅发布者确认和返回的替代机制。
自选。amqp_returnReplyCode amqp_returnReplyText amqp_returnExchange amqp_returnRoutingKey AsyncRabbitTemplate mandatory true |
18 | 设置为 时,终端节点将在应用程序上下文初始化期间尝试连接到代理。
这样做可以在代理宕机时记录错误消息,从而“快速失败”检测错误配置。
当(默认)时,连接已建立(如果由于建立了其他组件而尚不存在
it) 的 intent 调用。false true |
有关更多信息,另请参阅 Asynchronous Service Activator 。
RabbitTemplate (兔模板)
当您使用 Confirmation 和 Returns 时,我们建议将 wired into be dedicated.
否则,可能会遇到意想不到的副作用。 |
发布者确认和返回的替代机制
当连接工厂配置为发布者确认和返回时,上面的部分讨论了消息通道的配置,以异步接收确认和返回。 从版本 5.4 开始,有一个通常更易于使用的附加机制。
在这种情况下,请勿配置 a 或 confirm 和 return 通道。
相反,请在标头中添加一个实例;然后,您可以通过检查已发送消息的实例中的 State of the future 来等待结果。
在 future 完成之前,将始终填充该字段(如果返回消息)。confirm-correlation-expression
CorrelationData
AmqpHeaders.PUBLISH_CONFIRM_CORRELATION
CorrelationData
returnedMessage
CorrelationData corr = new CorrelationData("someId"); // <--- Unique "id" is required for returns
someFlow.getInputChannel().send(MessageBuilder.withPayload("test")
.setHeader("rk", "someKeyThatWontRoute")
.setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, corr)
.build());
...
try {
Confirm Confirm = corr.getFuture().get(10, TimeUnit.SECONDS);
Message returned = corr.getReturnedMessage();
if (returned !- null) {
// message could not be routed
}
}
catch { ... }
为了提高性能,您可能希望发送多条消息并稍后等待确认,而不是一次发送一条消息。
返回的消息是转换后的原始消息;您可以使用所需的任何其他数据对 A 进行子类化。CorrelationData
入站消息转换
到达通道适配器或网关的入站消息将使用消息转换器转换为有效负载。
默认情况下,使用 a 来处理 java 序列化和文本。
默认情况下,标头使用 进行映射。
如果发生转换错误,并且没有定义错误通道,则异常将抛出到容器中,并由侦听器容器的错误处理程序处理。
默认错误处理程序将 conversion 错误视为致命错误,并且消息将被拒绝(并路由到死信交换,如果队列是这样配置的)。
如果定义了错误通道,则有效负载是具有属性的 (无法转换的 Spring AMQP 消息) 和 .
如果容器是(默认)并且错误流使用错误而不引发异常,则将确认原始消息。
如果错误流引发异常,则异常类型将与容器的错误处理程序一起确定是否将消息重新排队。
如果容器配置了 ,则有效负载是具有附加属性 和 的 。
这使错误流能够为消息调用 OR(或 )以控制其处置。spring-messaging
Message<?>
SimpleMessageConverter
DefaultHeaderMapper.inboundMapper()
ErrorMessage
ListenerExecutionFailedException
failedMessage
cause
AcknowledgeMode
AUTO
AcknowledgeMode.MANUAL
ManualAckListenerExecutionFailedException
channel
deliveryTag
basicAck
basicNack
basicReject
出站消息转换
Spring AMQP 1.4 引入了 ,其中实际的转换器是根据
在 Incoming Content Type Message 属性上。
入站终端节点可以使用此 API。ContentTypeDelegatingMessageConverter
从 Spring 集成版本 4.3 开始,你也可以使用出站端点,标头指定使用哪个转换器。ContentTypeDelegatingMessageConverter
contentType
以下示例配置了一个 ,默认转换器是(处理 Java 序列化和纯文本)以及一个 JSON 转换器:ContentTypeDelegatingMessageConverter
SimpleMessageConverter
<amqp:outbound-channel-adapter id="withContentTypeConverter" channel="ctRequestChannel"
exchange-name="someExchange"
routing-key="someKey"
amqp-template="amqpTemplateContentTypeConverter" />
<int:channel id="ctRequestChannel"/>
<rabbit:template id="amqpTemplateContentTypeConverter"
connection-factory="connectionFactory" message-converter="ctConverter" />
<bean id="ctConverter"
class="o.s.amqp.support.converter.ContentTypeDelegatingMessageConverter">
<property name="delegates">
<map>
<entry key="application/json">
<bean class="o.s.amqp.support.converter.Jackson2JsonMessageConverter" />
</entry>
</map>
</property>
</bean>
将消息发送到 并将标头设置为 将导致选择 JSON 转换器。ctRequestChannel
contentType
application/json
这适用于出站通道适配器和网关。
从版本 5.0 开始,添加到出站消息的 Headers 永远不会被映射的 Headers 覆盖(默认情况下)。
以前,仅当消息转换器是 a 时,才会出现这种情况(在这种情况下,首先映射 Headers,以便可以选择正确的转换器)。
对于其他转换器(如 ),映射的标头会覆盖转换器添加的任何标头。
当出站邮件具有一些剩余报头(可能来自入站通道适配器)并且正确的出站被错误覆盖时,这会导致问题。
解决方法是在将消息发送到出站终端节点之前使用标头筛选条件删除标头。 但是,在某些情况下,需要上述行为,例如,当包含 JSON 的有效负载不知道内容并将 message 属性设置为,但您的应用程序希望通过设置发送到出站终端节点的消息的标头来覆盖该属性。
“ this 正是这样做的(默认情况下)。 现在在出站通道适配器和网关(以及 AMQP 支持的通道)上有一个调用的属性。
将此项设置为可恢复覆盖转换器添加的属性的行为。 从版本 5.1.9 开始,当我们生成回复并希望覆盖转换器填充的标头时,提供了类似的功能。
有关更多信息,请参阅其 JavaDocs。 |
出站用户 ID
Spring AMQP 版本 1.6 引入了一种机制,允许为出站消息指定默认用户 ID。
始终可以设置标头,现在标头优先于默认值。
这可能对消息收件人有用。
对于入站消息,如果消息发布者设置了该属性,则该属性将在标头中可用。
请注意,RabbitMQ 会验证用户 ID 是否是连接的实际用户 ID,或者连接是否允许模拟。AmqpHeaders.USER_ID
AmqpHeaders.RECEIVED_USER_ID
要为出站邮件配置默认用户 ID,请在 上配置该 ID,并将出站适配器或网关配置为使用该模板。
同样,要在回复上设置用户 ID 属性,请将适当配置的模板注入入站网关。
有关更多信息,请参见 Spring AMQP 文档。RabbitTemplate
延迟消息交换
Spring AMQP 支持 RabbitMQ 延迟消息交换插件。
对于入站消息,报头将映射到报头。
设置报头会导致在出站邮件中设置相应的报头。
您还可以在出站终端节点上指定 and 属性(使用 XML 配置时)。
这些属性优先于标头。x-delay
AmqpHeaders.RECEIVED_DELAY
AMQPHeaders.DELAY
x-delay
delay
delayExpression
delay-expression
AmqpHeaders.DELAY
AMQP 支持的消息通道
有两种消息通道实现可用。
一个是点对点的,另一个是发布-订阅。
这两个通道都为底层 和 (如本章前面所示的通道适配器和网关) 提供了广泛的配置属性。
但是,我们在此处显示的示例具有最少的配置。
浏览 XML 架构以查看可用属性。AmqpTemplate
SimpleMessageListenerContainer
点对点通道可能类似于以下示例:
<int-amqp:channel id="p2pChannel"/>
在幕后,前面的示例导致声明 named ,并且此通道发送到该 (从技术上讲,通过使用与 this 的名称匹配的路由密钥发送到 no-name 直接交换)。
此通道还会在该 上注册一个 consumer。
如果您希望通道是 “pollable” 而不是消息驱动的,请为标志提供值 ,如下例所示:Queue
si.p2pChannel
Queue
Queue
Queue
message-driven
false
<int-amqp:channel id="p2pPollableChannel" message-driven="false"/>
发布-订阅通道可能如下所示:
<int-amqp:publish-subscribe-channel id="pubSubChannel"/>
在幕后,前面的示例会导致声明一个名为 fanout exchange,并且此通道发送到该 fanout 交换。
此通道还声明一个名为 exclusive、auto-delete、non-durable 的服务器,并将其绑定到扇出交换,同时在该交换上注册使用者以接收消息。
publish-subscribe-channel 没有 “pollable” 选项。
它必须是消息驱动的。si.fanout.pubSubChannel
Queue
Queue
从版本 4.1 开始,AMQP 支持的消息通道(与 一起)支持将 和
对于 .
请注意,以前是默认的。
现在,默认情况下,它适用于 .channel-transacted
template-channel-transacted
transactional
AbstractMessageListenerContainer
RabbitTemplate
channel-transacted
true
false
AbstractMessageListenerContainer
在版本 4.3 之前,AMQP 支持的通道仅支持带有有效负载和 Headers 的消息。
整个消息被转换(序列化)并发送到 RabbitMQ。
现在,您可以将属性(或使用 Java 配置时)设置为 .
当此标志为 时,将转换消息有效负荷并映射报头,其方式类似于使用通道适配器时。
这种安排允许 AMQP 支持的通道与不可序列化的有效负载一起使用(可能与另一个消息转换器一起使用,例如 )。
有关默认映射 Headers 的更多信息,请参阅 AMQP Message Headers。
您可以通过提供使用 和 属性的自定义映射器来修改映射。
现在,您还可以指定 ,该 用于在没有标头时设置投放模式。
默认情况下, Spring AMQP 使用交付模式。Serializable
extract-payload
setExtractPayload()
true
true
Jackson2JsonMessageConverter
outbound-header-mapper
inbound-header-mapper
default-delivery-mode
amqp_deliveryMode
MessageProperties
PERSISTENT
与其他支持持久性的通道一样,AMQP 支持的通道旨在提供消息持久性以避免消息丢失。 它们不用于将工作分发给其他对等应用程序。 为此,请改用通道适配器。 |
从版本 5.0 开始,pollable 通道现在阻塞指定 (默认为 1 秒) 的 poller 线程。
以前,与其他实现不同,如果没有可用的消息,则线程会立即返回到调度程序,而不管接收超时如何。
阻塞比使用 a 检索消息(无超时)要贵一些,因为必须创建一个使用者来接收每条消息。
要恢复以前的行为,请将 poller's 设置为 0。receiveTimeout PollableChannel basicGet() receiveTimeout |
使用 Java 配置进行配置
以下示例显示如何使用 Java 配置配置通道:
@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("foo");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("bar");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("baz");
factoryBean.setPubSub(false);
return factoryBean;
}
使用 Java DSL 进行配置
以下示例显示如何使用 Java DSL 配置通道:
@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.pollableChannel(connectionFactory)
.queueName("foo"))
...
.get();
}
@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.channel(connectionFactory)
.queueName("bar"))
...
.get();
}
@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.publishSubscribeChannel(connectionFactory)
.queueName("baz"))
...
.get();
}
AMQP 消息标头
概述
Spring 集成 AMQP 适配器会自动映射所有 AMQP 属性和头文件。
(这是对 4.3 的更改 - 以前,仅映射标准标头)。
默认情况下,这些属性通过使用DefaultAmqpHeaderMapper
复制到 Spring 集成或从 Spring 集成复制。MessageHeaders
你可以传入你自己的特定于 AMQP 的 Headers 映射器的实现,因为适配器具有支持这样做的属性。
AMQP MessageProperties
中的任何用户定义的标头都将复制到 AMQP 消息或从 AMQP 消息中复制,除非 .
默认情况下,对于出站映射器,不会映射任何标头。
请参阅本节后面出现的注意事项,了解原因。requestHeaderNames
replyHeaderNames
DefaultAmqpHeaderMapper
x-*
要覆盖默认值并恢复到 4.3 之前的行为,请在属性中使用 and。STANDARD_REQUEST_HEADERS
STANDARD_REPLY_HEADERS
映射用户定义的标头时,值还可以包含要匹配的简单通配符模式(如 或 )。
匹配所有标头。thing* *thing * |
从版本 4.1 开始,(超类)允许为 和 属性(除了现有的 和 )配置令牌,以映射所有用户定义的 Headers。AbstractHeaderMapper
DefaultAmqpHeaderMapper
NON_STANDARD_HEADERS
requestHeaderNames
replyHeaderNames
STANDARD_REQUEST_HEADERS
STANDARD_REPLY_HEADERS
该类标识 :org.springframework.amqp.support.AmqpHeaders
DefaultAmqpHeaderMapper
-
amqp_appId
-
amqp_clusterId
-
amqp_contentEncoding
-
amqp_contentLength
-
content-type
(请参阅contentType
标头) -
amqp_correlationId
-
amqp_delay
-
amqp_deliveryMode
-
amqp_deliveryTag
-
amqp_expiration
-
amqp_messageCount
-
amqp_messageId
-
amqp_receivedDelay
-
amqp_receivedDeliveryMode
-
amqp_receivedExchange
-
amqp_receivedRoutingKey
-
amqp_redelivered
-
amqp_replyTo
-
amqp_timestamp
-
amqp_type
-
amqp_userId
-
amqp_publishConfirm
-
amqp_publishConfirmNackCause
-
amqp_returnReplyCode
-
amqp_returnReplyText
-
amqp_returnExchange
-
amqp_returnRoutingKey
-
amqp_channel
-
amqp_consumerTag
-
amqp_consumerQueue
如本节前面所述,使用 Headers 映射模式是复制所有 Headers 的常见方法。
但是,这可能会产生一些意想不到的副作用,因为某些 RabbitMQ 专有属性/标头也会被复制。
例如,当您使用联合身份验证时,收到的消息可能具有一个名为 的属性,其中包含发送消息的节点。
如果您在入站网关上对请求和回复标头映射使用通配符,则会复制此标头,这可能会导致联合身份验证出现一些问题。
此回复消息可能会联合回发送代理,发送代理可能会认为消息正在循环,因此会以静默方式删除该消息。
如果您希望使用通配符标头映射的便利性,则可能需要筛选掉下游流中的一些标头。
例如,为了避免将 Headers 复制回回复,您可以在将回复发送到 AMQP 入站网关之前使用。
或者,您可以显式列出实际要映射的那些属性,而不是使用通配符。
由于这些原因,对于入站邮件,映射器(默认情况下)不会映射任何报头。
它也不会将 映射到报头,以避免该报头从入站邮件传播到出站邮件。
相反,此标头映射到 ,而 则不会在输出上映射。* x-received-from * x-received-from <int:header-filter … header-names="x-received-from"> x-* deliveryMode amqp_deliveryMode amqp_receivedDeliveryMode |
从版本 4.3 开始,可以通过在模式前面加上 .
否定模式获得优先级,因此 does not map (nor nor ) 等列表 does not map (nor nor ) )。
标准标头 plus 和 被映射。
否定技术可能很有用,例如,当 JSON 反序列化逻辑在接收方下游以不同的方式完成时,不为传入消息映射 JSON 类型的标头。
为此,应该为入站通道适配器/网关的 Headers Mapper 配置一个模式。!
STANDARD_REQUEST_HEADERS,thing1,ba*,!thing2,!thing3,qux,!thing1
thing1
thing2
thing3
bad
qux
!json_*
如果您有一个用户定义的标头,该标头以您希望映射的开头,则需要使用 , 对其进行转义,如下所示:。
名为 的标头现已映射。! \ STANDARD_REQUEST_HEADERS,\!myBangHeader !myBangHeader |
从版本 5.1 开始,如果出站消息中不存在相应的 or 标头,则将分别回退到 mapping 和 to 和 。
入站属性将像以前一样映射到标头。
当消息使用者使用有状态重试时,填充该属性非常有用。DefaultAmqpHeaderMapper MessageHeaders.ID MessageHeaders.TIMESTAMP MessageProperties.messageId MessageProperties.timestamp amqp_messageId amqp_timestamp amqp_* messageId |
标头contentType
与其他标头不同,它不以 ;这允许跨不同技术透明地传递 contentType 标头。
例如,发送到 RabbitMQ 队列的入站 HTTP 消息。AmqpHeaders.CONTENT_TYPE
amqp_
标头映射到 Spring AMQP 的属性,随后映射到 RabbitMQ 的属性。contentType
MessageProperties.contentType
content_type
在版本 5.1 之前,此 Headers 也被映射为 Map 中的一个条目;这是不正确的,此外,该值可能是错误的,因为底层 Spring AMQP 消息转换器可能已经更改了内容类型。
此类更改将反映在 first-class 属性中,但不反映在 RabbitMQ 标头映射中。
入站映射忽略了 headers 映射值。 不再映射到 Headers 映射中的条目。MessageProperties.headers
content_type
contentType
严格的消息排序
本节介绍入站和出站消息的消息排序。
入境
如果需要对入站消息进行严格排序,则必须将入站侦听器容器的属性配置为 。
这是因为,如果消息失败并重新传递,它将在现有的预取消息之后到达。
从 Spring AMQP 版本 2.0 开始,默认为以提高性能。
严格的订购要求以性能下降为代价。prefetchCount
1
prefetchCount
250
出境
请考虑以下集成流程:
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlow.from(Gateway.class)
.split(s -> s.delimiters(","))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
假设我们发送消息 ,并且 到网关。
虽然消息 、 很可能是按顺序发送的,但不能保证。
这是因为模板为每个 send 操作从缓存中“借用”一个通道,并且不能保证每条消息都使用相同的通道。
一种解决方案是在拆分器之前启动事务,但事务在 RabbitMQ 中成本高昂,并且性能会降低数百倍。A
B
C
A
B
C
为了以更有效的方式解决这个问题,从版本 5.1 开始, Spring 集成提供了 which is a .
请参阅处理消息建议。
在拆分器之前应用时,它确保所有下游操作都在同一通道上执行,并且可以选择等待收到所有已发送消息的发布者确认(如果连接工厂配置为确认)。
以下示例演示如何使用:BoundRabbitChannelAdvice
HandleMessageAdvice
BoundRabbitChannelAdvice
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlow.from(Gateway.class)
.split(s -> s.delimiters(",")
.advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
请注意,在 advice 和 outbound adapter 中使用相同的(实现 )。
该建议在模板的方法中运行下游流,以便所有操作都在同一通道上运行。
如果提供了可选的超时,则当流完成时,通知将调用该方法,如果在指定时间内未收到确认,该方法将引发异常。RabbitTemplate
RabbitOperations
invoke
waitForConfirmsOrDie
下游流中不得有线程 hands-off(、 和其他流)。QueueChannel ExecutorChannel |
AMQP 示例
要试验 AMQP 适配器,请查看 Spring 集成示例 git 存储库中提供的示例,网址为 https://github.com/SpringSource/spring-integration-samples
目前,一个示例通过使用出站通道适配器和入站通道适配器演示了 Spring 集成 AMQP 适配器的基本功能。 由于示例中的 AMQP 代理实现使用 RabbitMQ。
为了运行该示例,您需要一个正在运行的 RabbitMQ 实例。 仅具有基本默认值的本地安装就足够了。 有关 RabbitMQ 的详细安装过程,请参阅 https://www.rabbitmq.com/install.html |
启动示例应用程序后,在命令提示符上输入一些文本,包含该输入文本的消息将分派到 AMQP 队列。 作为回报,该消息由 Spring Integration 检索并打印到控制台。
下图说明了此示例中使用的 Spring 集成组件的基本集:
RabbitMQ 流队列支持
版本 6.0 引入了对 RabbitMQ 流队列的支持。
这些端点的 DSL 工厂类是 .RabbitStream
RabbitMQ 流入站通道适配器
@Bean
IntegrationFlow simpleStream(Environment env) {
return IntegrationFlow.from(RabbitStream.inboundAdapter(env).streamName("my.stream"))
// ...
.get();
}
@Bean
IntegrationFlow superStream(Environment env) {
return IntegrationFlow.from(RabbitStream.inboundAdapter(env).superStream("my.super.stream", "my.consumer"))
// ...
.get();
}