参考指南
本指南介绍了 Spring Cloud Stream Binder 的 RabbitMQ 实现。 它包含有关其设计、使用和配置选项的信息,以及有关 Stream Cloud Stream 概念如何映射到 RabbitMQ 特定构造的信息。
1. 用途
要使用 RabbitMQ Binders,可以使用以下 Maven 坐标将其添加到 Spring Cloud Stream 应用程序中:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
或者,您可以使用 Spring Cloud Stream RabbitMQ Starter,如下所示:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
2. RabbitMQ Binder 概述
以下简化图显示了 RabbitMQ Binder 的运行方式:
默认情况下,RabbitMQ Binder 实现将每个目标映射到一个 .
对于每个使用者组, a 绑定到该 .
每个使用者实例都有其组的相应 RabbitMQ 实例。
对于分区的创建者和使用者,队列以分区索引为后缀,并使用分区索引作为路由键。
对于匿名使用者(没有属性的使用者),将使用自动删除队列(具有随机的唯一名称)。TopicExchange
Queue
TopicExchange
Consumer
Queue
group
通过使用可选选项,您可以配置 Binder 以创建和配置死信队列 (DLQ)(以及 dead-letter exchange 以及路由基础设施)。
默认情况下,死信队列具有目标的名称,并附加 .
如果启用了重试 (),则在重试用尽后,失败的消息将传送到 DLQ。
如果禁用重试 (),则应设置为(默认值),以便将失败的消息路由到 DLQ,而不是重新排队。
此外,还会导致 Binder 将失败的消息发布到 DLQ(而不是拒绝它)。
此功能允许将其他信息(如 Headers 中的堆栈跟踪)添加到 Headers 中的消息中。
有关截断堆栈跟踪的信息,请参阅 frameMaxHeadroom
属性。
此选项不需要启用 retry。
您只需尝试一次即可重新发布失败的消息。
从版本 1.2 开始,您可以配置重新发布的消息的传递模式。
请参阅 republishDeliveryMode
属性。autoBindDlq
DLX
.dlq
maxAttempts > 1
maxAttempts = 1
requeueRejected
false
republishToDlq
x-exception-stacktrace
如果流侦听器抛出 ,则会绕过 DLQ,并简单地丢弃该消息。
从版本 2.1 开始,无论 的设置如何;以前,只有当 是 时,才会出现这种情况。ImmediateAcknowledgeAmqpException
republishToDlq
republishToDlq
false
设置为 (with ) 会导致消息重新排队并持续重新传送,除非失败的原因是暂时的,否则这可能不是您想要的。
通常,您应该通过设置为 greater than 1 或设置为 在 Binder 中启用重试。requeueRejected true republishToDlq=false maxAttempts republishToDlq true |
有关这些属性的更多信息,请参阅 RabbitMQ Binder 属性。
该框架不提供任何标准机制来使用死信消息(或将它们重新路由回主队列)。 死信队列处理中介绍了一些选项。
当在 Spring Cloud Stream 应用程序中使用多个 RabbitMQ Binders 时,禁用“RabbitAutoConfiguration”以避免将相同的配置应用于两个 Binders 非常重要。
您可以使用注释排除该类。RabbitAutoConfiguration @SpringBootApplication |
从版本 2.0 开始,将属性设置为 ,以便非事务性创建者避免使用者死锁,如果缓存的连接由于代理上的内存警报而被阻止,则可能会发生这种情况。RabbitMessageChannelBinder
RabbitTemplate.userPublisherConnection
true
目前,只有消息驱动的使用者支持使用者(侦听多个队列的单个使用者);轮询的使用者只能从单个队列中检索消息。multiplex |
3. 配置选项
本节包含特定于 RabbitMQ Binder 和绑定通道的设置。
有关常规绑定配置选项和属性,请参阅 Spring Cloud Stream 核心文档。
3.1. RabbitMQ Binder 属性
默认情况下,RabbitMQ Binder 使用 Spring Boot 的 .
同时,它支持 RabbitMQ 的所有 Spring Boot 配置选项。
(有关参考,请参阅 Spring Boot 文档)。
RabbitMQ 配置选项使用前缀。ConnectionFactory
spring.rabbitmq
除了 Spring Boot 选项之外,RabbitMQ Binder 还支持以下属性:
- spring.cloud.stream.rabbit.binder.admin地址
-
以逗号分隔的 RabbitMQ 管理插件 URL 列表。 仅在包含多个条目时使用。 此列表中的每个条目都必须在 中具有相应的条目。 仅当使用 RabbitMQ 集群并希望从托管队列的节点使用时才需要。 有关更多信息,请参见队列关联和 LocalizedQueueConnectionFactory。
nodes
spring.rabbitmq.addresses
默认值:空。
- spring.cloud.stream.rabbit.binder.nodes
-
以逗号分隔的 RabbitMQ 节点名称列表。 当有多个条目时,用于查找队列所在的服务器地址。 此列表中的每个条目都必须在 中具有相应的条目。 仅当使用 RabbitMQ 集群并希望从托管队列的节点使用时才需要。 有关更多信息,请参见队列关联和 LocalizedQueueConnectionFactory。
spring.rabbitmq.addresses
默认值:空。
- spring.cloud.stream.rabbit.binder.compressionLevel
-
压缩绑定的压缩级别。 看。
java.util.zip.Deflater
默认值:(BEST_LEVEL)。
1
- spring.cloud.stream.binder.connection-name-prefix
-
用于命名此 Binder 创建的连接的连接名称前缀。 名称是此前缀,后跟 ,其中每次打开新连接时都会递增。
#n
n
默认值:none (Spring AMQP 默认值)。
3.2. RabbitMQ Consumer 属性
以下属性仅适用于 Rabbit 使用者,并且必须以 .spring.cloud.stream.rabbit.bindings.<channelName>.consumer.
但是,如果需要将同一组属性应用于大多数绑定,则
避免重复,Spring Cloud Stream 支持为所有通道设置值,
格式为 .spring.cloud.stream.rabbit.default.<property>=<value>
此外,请记住,绑定特定属性将覆盖其在默认值中的等效属性。
- acknowledgeMode
-
确认模式。
违约:。
AUTO
- anonymousGroupPrefix 的
-
当绑定没有属性时,匿名的自动删除队列将绑定到目标交换。 此类队列的默认命名策略将导致名为 . 设置此属性可将前缀更改为默认值以外的其他名称。
group
anonymous.<base64 representation of a UUID>
违约:。
anonymous.
- autoBindDlq
-
是否自动声明 DLQ 并将其绑定到 Binder DLX。
违约:。
false
- bindingRoutingKey
-
用于将队列绑定到 exchange 的路由密钥(如果为 is )。 可以是多个键 - 请参阅 。 对于分区目标, 附加到每个键。
bindQueue
true
bindingRoutingKeyDelimiter
-<instanceIndex>
违约:。
#
- bindingRoutingKeyDelimiter
-
当 this 不为 null 时,'bindingRoutingKey' 被视为由此值分隔的键列表;通常使用逗号。
违约:。
null
- bindQueue (绑定队列)
-
是否声明队列并将其绑定到目标 Exchange。 如果您已设置自己的基础设施,并且之前已创建并绑定队列,请将其设置为 。
false
违约:。
true
- consumerTag前缀
-
用于创建 Consumer 标签;将附加 where 创建的每个使用者的增量。 例:。
#n
n
${spring.application.name}-${spring.cloud.stream.bindings.input.group}-${spring.cloud.stream.instance-index}
默认值:none - 代理将生成随机的消费者标签。
- containerType 容器类型
-
选择要使用的侦听器容器类型。 有关更多信息,请参见 Spring AMQP 文档中的 选择容器 。
违约:
simple
- deadLetterQueueName (死信队列名称)
-
DLQ 的名称
违约:
prefix+destination.dlq
- 死信交换
-
要分配给队列的 DLX。 仅当 为 时才相关。
autoBindDlq
true
默认值: 'prefix+DLX'
- deadLetterExchange类型
-
要分配给队列的 DLX 的类型。 仅当 为 时才相关。
autoBindDlq
true
默认值: 'direct'
- deadLetterRoutingKey
-
要分配给队列的死信路由键。 仅当 为 时才相关。
autoBindDlq
true
违约:
destination
- 声明 Dlx
-
是否声明目标的死信交换。 仅当 为 时才相关。 如果您有预配置的 DLX,则设置为 。
autoBindDlq
true
false
违约:。
true
- declareExchange
-
是否声明目标的交换。
违约:。
true
- delayedExchange (延迟交换)
-
是否将交换声明为 . 需要 broker 上的 delayed message exchange 插件。 该参数设置为 .
Delayed Message Exchange
x-delayed-type
exchangeType
违约:。
false
- dlqBindingArguments
-
将 dlq 绑定到死信交换时应用的参数;用于指定要匹配的标头。 例如。
headers
deadLetterExchangeType
…dlqBindingArguments.x-match=any
…dlqBindingArguments.someHeader=someValue
默认值:空
- dlqDeadLetterExchange
-
如果声明了 DLQ,则为要分配给该队列的 DLX。
违约:
none
- dlqDeadLetterRoutingKey
-
如果声明了 DLQ,则为该队列分配一个死信路由键。
违约:
none
- dlq过期
-
删除未使用的死信队列之前的时间(以毫秒为单位)。
违约:
no expiration
- dlq懒惰
-
使用参数声明死信队列。 请参见“延迟队列”。 请考虑使用策略而不是此设置,因为使用策略允许在不删除队列的情况下更改设置。
x-queue-mode=lazy
违约:。
false
- dlqMaxLength
-
死信队列中的最大消息数。
违约:
no limit
- dlqMaxLengthBytes
-
来自所有消息的死信队列中的最大总字节数。
违约:
no limit
- dlqMaxPriority
-
死信队列中消息的最大优先级 (0-255)。
违约:
none
- dlqOverflowBehavior 函数
-
当或超出时要采取的操作;currently or 但请参阅 RabbitMQ 文档。
dlqMaxLength
dlqMaxLengthBytes
drop-head
reject-publish
违约:
none
- dlqQuorum.deliveryLimit
-
When ,设置一个传递限制,超过该限制后,消息将被丢弃或死信。
quorum.enabled=true
Default: none - 将应用代理默认值。
- dlqQuorum.enabled 已启用
-
如果为 true,则创建 quorum 死信队列,而不是 Classic 队列。
默认值:false
- dlqQuorum.initialQuorumSize
-
当 时,设置初始仲裁大小。
quorum.enabled=true
Default: none - 将应用代理默认值。
- dlqSingleActiveConsumer
-
设置为 true 可将 queue 属性设置为 true。
x-single-active-consumer
违约:
false
- dlqTtl
-
声明时应用于死信队列的默认生存时间(以毫秒为单位)。
违约:
no limit
- durableSubscription (耐用订阅)
-
订阅是否应为持久订阅。 仅在 设置时有效。
group
违约:。
true
- exchange自动删除
-
如果为 true,则是否应自动删除交换(即,在删除最后一个队列后删除)。
declareExchange
违约:。
true
- 交换耐用
-
如果为 true,则交换是否应该是持久的(即,它在代理重新启动后仍然存在)。
declareExchange
违约:。
true
- exchangeType
-
交换类型: , , 或用于非分区目标和 , headers 或用于分区目标。
direct
fanout
headers
topic
direct
topic
违约:。
topic
- 独家
-
是否创建独占消费者。 当 为 时,并发性应为 1。 通常在需要严格排序时使用,但允许热备用实例在发生故障后接管。 请参阅 ,它控制备用实例尝试使用的频率。 使用 RabbitMQ 3.8 或更高版本时,请考虑改用。
true
recoveryInterval
singleActiveConsumer
违约:。
false
- 到期
-
删除未使用的队列之前的时间(以毫秒为单位)。
违约:
no expiration
- failedDeclarationRetryInterval
-
如果队列缺失,则尝试从队列中使用之间的间隔(以毫秒为单位)。
默认值:5000
- frameMaxHeadroom
-
将堆栈跟踪添加到 DLQ 消息标头时要为其他标头保留的字节数。 所有标头都必须适合 broker 上配置的大小。 堆栈跟踪可能很大;如果 size 加上 this 属性超过,则堆栈跟踪将被截断。 将写入 WARN 日志;考虑通过捕获异常并引发具有较小堆栈跟踪的异常来增加或减少堆栈跟踪。
frame_max
frame_max
frame_max
默认值:20000
- headerPatterns 的
-
要从入站消息映射的标头的模式。
默认值:(所有标头)。
['*']
- 懒惰
-
使用参数声明队列。 请参见“延迟队列”。 请考虑使用策略而不是此设置,因为使用策略允许在不删除队列的情况下更改设置。
x-queue-mode=lazy
违约:。
false
- max并发
-
消费者的最大数量。 当 为 时不支持。
containerType
direct
违约:。
1
- maxLength
-
队列中的最大消息数。
违约:
no limit
- maxLengthBytes
-
队列中所有消息的最大总字节数。
违约:
no limit
- 最大优先级
-
队列中消息的最大优先级 (0-255)。
违约:
none
- missingQueues致命
-
当找不到队列时,是否将该条件视为 fatal 并停止监听器容器。 默认为,以便容器不断尝试从队列中使用 — 例如,当使用集群并且托管非 HA 队列的节点关闭时。
false
违约:
false
- overflowBehavior
-
当或超出时要采取的操作;currently or 但请参阅 RabbitMQ 文档。
maxLength
maxLengthBytes
drop-head
reject-publish
违约:
none
- 预取
-
预取计数。
违约:。
1
- 前缀
-
要添加到 和 队列名称的前缀。
destination
默认值: “”。
- queueBindingArguments
-
将队列绑定到 exchange 时应用的参数;用于指定要匹配的标头。 例如。
headers
exchangeType
…queueBindingArguments.x-match=any
…queueBindingArguments.someHeader=someValue
默认值:空
- queueDeclarationRetries
-
如果队列缺失,则重试从队列中消费的次数。 仅当 为 时才相关。 否则,容器将无限期地重试。 当 为 时不支持。
missingQueuesFatal
true
containerType
direct
违约:
3
- queueNameGroupOnly
-
如果为 true,则从名称等于 . 否则,队列名称为 。 例如,当使用 Spring Cloud Stream 从现有 RabbitMQ 队列中使用时,这非常有用。
group
destination.group
默认值:false。
- quorum.deliveryLimit
-
When ,设置一个传递限制,超过该限制后,消息将被丢弃或死信。
quorum.enabled=true
Default: none - 将应用代理默认值。
- 法定人数已启用
-
如果为 true,则创建 quorum 队列而不是 Classic 队列。
默认值:false
- quorum.initialQuorumSize
-
当 时,设置初始仲裁大小。
quorum.enabled=true
Default: none - 将应用代理默认值。
- recoveryInterval 的
-
连接恢复尝试之间的间隔(以毫秒为单位)。
违约:。
5000
- requeueRejected 的
-
当 retry 被禁用时,是否应将投放失败重新排队,还是 。
republishToDlq
false
违约:。
false
- republishDeliveryMode
-
When is 指定重新发布的消息的传送模式。
republishToDlq
true
违约:
DeliveryMode.PERSISTENT
- republishToDlq
-
默认情况下,在重试用尽后失败的消息将被拒绝。 如果配置了死信队列 (DLQ),则 RabbitMQ 会将失败的消息(未更改)路由到 DLQ。 如果设置为 ,则 Binder 会将失败的消息重新发布到 DLQ,其中包含其他标头,包括异常消息和来自最终失败原因的堆栈跟踪。 另请参阅 frameMaxHeadroom 属性。
true
默认值:false
- singleActiveConsumer
-
设置为 true 可将 queue 属性设置为 true。
x-single-active-consumer
违约:
false
- 交易
-
是否使用事务处理通道。
违约:。
false
- ttl
-
声明时应用于队列的默认生存时间(以毫秒为单位)。
违约:
no limit
- txSize (tx大小)
-
ack 之间的投放数。 当 为 时不支持。
containerType
direct
违约:。
1
3.3. 高级侦听器容器配置
要设置未作为 Binder 或 Binding 属性公开的侦听器容器属性,请将 type 的单个 bean 添加到应用程序上下文中。
将设置 Binder 和 binding 属性,然后调用定制器。
定制器 ( method) 提供队列名称以及使用者组作为参数。ListenerContainerCustomizer
configure()
3.4. 高级队列/交换/绑定配置
RabbitMQ 团队会不时添加新功能,这些功能通过在声明队列等时设置一些参数来启用。
通常,此类功能是通过添加适当的属性在 Binder 中启用的,但这在当前版本中可能无法立即使用。
从版本 3.0.1 开始,您现在可以将 bean 添加到应用程序上下文中,以便在执行声明之前修改 () 或 )。
这允许您添加 Binder 当前不直接支持的参数。DeclarableCustomizer
Declarable
Queue
Exchange
Binding
3.5. 接收批量消息
通常,如果 producer 绑定具有(请参阅 Rabbit Producer Properties),或者消息是由 ,则批处理的元素将作为对侦听器方法的单独调用返回。
从版本 3.0 开始,如果设置为 ,则任何此类批处理都可以作为 to listener 方法表示。batch-enabled=true
BatchingRabbitTemplate
List<?>
spring.cloud.stream.bindings.<name>.consumer.batch-mode
true
3.6. Rabbit Producer 属性
以下属性仅适用于 Rabbit 生产者,并且必须以 .spring.cloud.stream.rabbit.bindings.<channelName>.producer.
但是,如果需要将同一组属性应用于大多数绑定,则
避免重复,Spring Cloud Stream 支持为所有通道设置值,
格式为 .spring.cloud.stream.rabbit.default.<property>=<value>
此外,请记住,绑定特定属性将覆盖其在默认值中的等效属性。
- autoBindDlq
-
是否自动声明 DLQ 并将其绑定到 Binder DLX。
违约:。
false
- batching已启用
-
是否启用生产者消息批处理。 消息根据以下属性(在此列表中的以下三个条目中描述)分批到一条消息中:'batchSize'、 和 . 有关更多信息,请参阅 批处理 。 另请参阅接收 Batched Messages。
batchBufferLimit
batchTimeout
违约:。
false
- 批量大小
-
启用批处理时要缓冲的消息数。
违约:。
100
- batchBufferLimit
-
启用批处理时的最大缓冲区大小。
违约:。
10000
- batch超时
-
启用批处理时的批处理超时。
违约:。
5000
- bindingRoutingKey
-
用于将队列绑定到 exchange 的路由密钥(如果为 is )。 可以是多个键 - 请参阅 。 对于分区目标, 附加到每个键。 仅在提供时适用,并且仅适用于这些组。
bindQueue
true
bindingRoutingKeyDelimiter
-n
requiredGroups
违约:。
#
- bindingRoutingKeyDelimiter
-
当 this 不为 null 时,'bindingRoutingKey' 被视为由此值分隔的键列表;通常使用逗号。 仅在提供时适用,并且仅适用于这些组。
requiredGroups
违约:。
null
- bindQueue (绑定队列)
-
是否声明队列并将其绑定到目标 Exchange。 如果您已设置自己的基础设施,并且之前已创建并绑定队列,请将其设置为 。 仅在提供时适用,并且仅适用于这些组。
false
requiredGroups
违约:。
true
- 压缩
-
发送时是否应压缩数据。
违约:。
false
- confirmAckChannel 确认
-
when is true,向其发送肯定投放确认的渠道(又名 publisher confirms)。 如果通道不存在,则使用此名称注册 a。 必须将连接工厂配置为启用发布者确认。
errorChannelEnabled
DirectChannel
默认值:(丢弃 ack)。
nullChannel
- deadLetterQueueName (死信队列名称)
-
DLQ 的名称 仅在提供时适用,并且仅适用于这些组。
requiredGroups
违约:
prefix+destination.dlq
- 死信交换
-
要分配给队列的 DLX。 仅当 为 时才相关。 仅在提供时适用,然后仅应用于这些组。
autoBindDlq
true
requiredGroups
默认值: 'prefix+DLX'
- deadLetterExchange类型
-
要分配给队列的 DLX 的类型。 仅当 为 时才相关。 仅在提供时适用,然后仅应用于这些组。
autoBindDlq
true
requiredGroups
默认值: 'direct'
- deadLetterRoutingKey
-
要分配给队列的死信路由键。 仅当 为 时才相关。 仅在提供时适用,然后仅应用于这些组。
autoBindDlq
true
requiredGroups
违约:
destination
- 声明 Dlx
-
是否声明目标的死信交换。 仅当 为 时才相关。 如果您有预配置的 DLX,则设置为 。 仅在提供时适用,然后仅应用于这些组。
autoBindDlq
true
false
requiredGroups
违约:。
true
- declareExchange
-
是否声明目标的交换。
违约:。
true
- delay表达式
-
一个 SPEL 表达式,用于评估要应用于消息的延迟( header)。 如果交换不是延迟消息交换,则它不起作用。
x-delay
默认值:未设置标头。
x-delay
- delayedExchange (延迟交换)
-
是否将交换声明为 . 需要 broker 上的 delayed message exchange 插件。 该参数设置为 .
Delayed Message Exchange
x-delayed-type
exchangeType
违约:。
false
- deliveryMode 交付模式
-
交货模式。
违约:。
PERSISTENT
- dlqBindingArguments
-
将 dlq 绑定到死信交换时应用的参数;用于指定要匹配的标头。 例如。 仅在提供时适用,然后仅应用于这些组。
headers
deadLetterExchangeType
…dlqBindingArguments.x-match=any
…dlqBindingArguments.someHeader=someValue
requiredGroups
默认值:空
- dlqDeadLetterExchange
-
声明 DLQ 时,要分配给该队列的 DLX。 仅在提供时适用,然后仅应用于这些组。
requiredGroups
违约:
none
- dlqDeadLetterRoutingKey
-
声明 DLQ 时,要分配给该队列的死信路由键。 仅在提供时适用,然后仅应用于这些组。
requiredGroups
违约:
none
- dlq过期
-
删除未使用的死信队列之前的时间(以毫秒为单位)。 仅在提供时适用,然后仅应用于这些组。
requiredGroups
违约:
no expiration
- dlq懒惰
-
使用参数声明死信队列。 请参见“延迟队列”。 请考虑使用策略而不是此设置,因为使用策略允许在不删除队列的情况下更改设置。 仅在提供时适用,然后仅应用于这些组。
x-queue-mode=lazy
requiredGroups
- dlqMaxLength
-
死信队列中的最大消息数。 仅在提供时适用,然后仅应用于这些组。
requiredGroups
违约:
no limit
- dlqMaxLengthBytes
-
来自所有消息的死信队列中的最大总字节数。 仅在提供时适用,然后仅应用于这些组。
requiredGroups
违约:
no limit
- dlqMaxPriority
-
死信队列中消息的最大优先级 (0-255) 仅在提供时适用,然后仅应用于这些组。
requiredGroups
违约:
none
- dlqQuorum.deliveryLimit
-
When ,设置一个传递限制,超过该限制后,消息将被丢弃或死信。 仅在提供时适用,然后仅应用于这些组。
quorum.enabled=true
requiredGroups
Default: none - 将应用代理默认值。
- dlqQuorum.enabled 已启用
-
如果为 true,则创建 quorum 死信队列,而不是 Classic 队列。 仅在提供时适用,然后仅应用于这些组。
requiredGroups
默认值:false
- dlqQuorum.initialQuorumSize
-
当 时,设置初始仲裁大小。 仅在提供时适用,然后仅应用于这些组。
quorum.enabled=true
requiredGroups
Default: none - 将应用代理默认值。
- dlqSingleActiveConsumer
-
设置为 true 可将 queue 属性设置为 true。 仅在提供时适用,然后仅应用于这些组。
x-single-active-consumer
requiredGroups
违约:
false
- dlqTtl
-
声明时应用于死信队列的默认生存时间(以毫秒为单位)。 仅在提供时适用,然后仅应用于这些组。
requiredGroups
违约:
no limit
- exchange自动删除
-
如果是 ,是否应自动删除交换(在删除最后一个队列后将其删除)。
declareExchange
true
违约:。
true
- 交换耐用
-
如果是,则交换是否应该是持久的(在代理重新启动后继续存在)。
declareExchange
true
违约:。
true
- exchangeType
-
交换类型: , , or 用于非分区目标 和 , or 用于分区目标。
direct
fanout
headers
topic
direct
headers
topic
违约:。
topic
- 到期
-
删除未使用的队列之前的时间 (以毫秒为单位)。 仅在提供时适用,然后仅应用于这些组。
requiredGroups
违约:
no expiration
- headerPatterns 的
-
要映射到出站消息的标头的模式。
默认值:(所有标头)。
['*']
- 懒惰
-
使用参数声明队列。 请参见“延迟队列”。 请考虑使用策略而不是此设置,因为使用策略允许在不删除队列的情况下更改设置。 仅在提供时适用,然后仅应用于这些组。
x-queue-mode=lazy
requiredGroups
违约:。
false
- maxLength
-
队列中的最大消息数。 仅在提供时适用,然后仅应用于这些组。
requiredGroups
违约:
no limit
- maxLengthBytes
-
队列中所有消息的最大总字节数。 仅在提供时适用,并且仅适用于这些组。
requiredGroups
违约:
no limit
- 最大优先级
-
队列中消息的最大优先级 (0-255)。 仅在提供时适用,并且仅适用于这些组。
requiredGroups
违约:
none
- 前缀
-
要添加到交易所名称的前缀。
destination
默认值: “”。
- queueBindingArguments
-
将队列绑定到 exchange 时应用的参数;用于指定要匹配的标头。 例如。 仅在提供时适用,然后仅应用于这些组。
headers
exchangeType
…queueBindingArguments.x-match=any
…queueBindingArguments.someHeader=someValue
requiredGroups
默认值:空
- queueNameGroupOnly
-
当 时,从名称等于 的队列中使用。 否则,队列名称为 。 例如,当使用 Spring Cloud Stream 从现有 RabbitMQ 队列中使用时,这非常有用。 仅在提供时适用,然后仅应用于这些组。
true
group
destination.group
requiredGroups
默认值:false。
- quorum.deliveryLimit
-
When ,设置一个传递限制,超过该限制后,消息将被丢弃或死信。 仅在提供时适用,然后仅应用于这些组。
quorum.enabled=true
requiredGroups
Default: none - 将应用代理默认值。
- 法定人数已启用
-
如果为 true,则创建 quorum 队列而不是 Classic 队列。 仅在提供时适用,然后仅应用于这些组。
requiredGroups
默认值:false
- quorum.initialQuorumSize
-
当 时,设置初始仲裁大小。 仅在提供时适用,然后仅应用于这些组。
quorum.enabled=true
requiredGroups
Default: none - 将应用代理默认值。
- routingKeyExpression 的
-
一个 SPEL 表达式,用于确定发布消息时要使用的路由键。 对于固定路由键,请使用文本表达式,例如在属性文件或 YAML 文件中。
routingKeyExpression='my.routingKey'
routingKeyExpression: '''my.routingKey'''
Default: or 用于分区目标。
destination
destination-<partition>
- singleActiveConsumer
-
设置为 true 可将 queue 属性设置为 true。 仅在提供时适用,然后仅应用于这些组。
x-single-active-consumer
requiredGroups
违约:
false
- 交易
-
是否使用事务处理通道。
违约:。
false
- ttl
-
声明时应用于队列的默认生存时间(以毫秒为单位)。 仅在提供时适用,然后仅应用于这些组。
requiredGroups
违约:
no limit
对于 RabbitMQ,内容类型标头可以由外部应用程序设置。 Spring Cloud Stream 支持将它们作为扩展内部协议的一部分,用于任何类型的传输,包括传输,例如 Kafka(0.11 之前),它们本身不支持 headers。 |
4. 使用现有队列/交易所
默认情况下,Binder 将自动配置主题交换,其名称派生自 destination binding property 的值。
如果未提供,则 destination 默认为绑定名称。
绑定使用者时,将自动为队列配置名称(如果指定了 binding 属性),或者在没有 .
队列将使用 “match-all” 通配符路由键 () 绑定到非分区绑定或分区绑定的交换。
默认情况下,前缀为空。
如果使用 指定了输出绑定,则将为每个组配置一个队列/绑定。<prefix><destination>
<prefix><destination>.<group>
group
group
#
<destination>-<instanceIndex>
String
requiredGroups
有许多特定于 rabbit 的绑定属性允许您修改此默认行为。
如果您有要使用的现有 exchange/queue,则可以按如下方式完全禁用自动配置,假设 exchange 已命名且队列已命名:myExchange
myQueue
-
spring.cloud.stream.bindings.<binding name>.destination=myExhange
-
spring.cloud.stream.bindings.<binding name>.group=myQueue
-
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindQueue=false
-
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.declareExchange=false
-
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.queueNameGroupOnly=true
如果您希望 Binders 预置 queue/exchange,但您希望使用此处讨论的默认值以外的其他功能来执行此操作,请使用以下属性。 有关更多信息,请参阅上面的属性文档。
-
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindingRoutingKey=myRoutingKey
-
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.exchangeType=<type>
-
spring.cloud.stream.rabbit.bindings.<binding name>.producer.routingKeyExpression='myRoutingKey'
在声明死信交换/队列时,会使用类似的属性,当 is 时。autoBindDlq
true
5. 使用 RabbitMQ Binder 重试
在 Binder 中启用重试后,侦听器容器线程将在配置的任何回退期间暂停。 当单个使用者需要严格排序时,这可能很重要。但是,对于其他使用案例,它会阻止在该线程上处理其他消息。 使用 Binder 重试的另一种方法是在死信队列 (DLQ) 上设置具有生存时间的死信,并在 DLQ 本身上设置死信配置。 有关此处讨论的属性的更多信息,请参阅“RabbitMQ Binder 属性”。 您可以使用以下示例配置来启用此功能:
-
设置为 . Binder 将创建一个 DLQ。 (可选)您可以在 中指定名称。
autoBindDlq
true
deadLetterQueueName
-
设置为您希望在两次重新投递之间等待的退后时间。
dlqTtl
-
将 设置为 default exchange。 来自 DLQ 的过期邮件将路由到原始队列,因为默认队列名称为 ()。 设置为默认 exchange 是通过设置无值的属性来实现的,如下一个示例所示。
dlqDeadLetterExchange
deadLetterRoutingKey
destination.group
要强制消息为死信,请抛出 an 或 set to (默认值) 并抛出 any 异常。AmqpRejectAndDontRequeueException
requeueRejected
true
循环无休止地继续,这对于暂时性问题来说很好,但你可能想在尝试几次后放弃。
幸运的是,RabbitMQ 提供了标头,它允许您确定已经发生了多少个周期。x-death
要在放弃后确认消息,请抛出 .ImmediateAcknowledgeAmqpException
5.1. 把它们放在一起
以下配置创建一个 exchange,其中 queue 绑定到具有通配符 routing key 的主题 exchange:myDestination
myDestination.consumerGroup
#
---
spring.cloud.stream.bindings.input.destination=myDestination
spring.cloud.stream.bindings.input.group=consumerGroup
#disable binder retries
spring.cloud.stream.bindings.input.consumer.max-attempts=1
#dlx/dlq setup
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=
---
此配置将创建一个绑定到路由键为 .
当邮件被拒绝时,它们将被路由到 DLQ。
5 秒后,消息过期,并使用队列名称作为路由键路由到原始队列,如以下示例所示:DLX
myDestination.consumerGroup
@SpringBootApplication
@EnableBinding(Sink.class)
public class XDeathApplication {
public static void main(String[] args) {
SpringApplication.run(XDeathApplication.class, args);
}
@StreamListener(Sink.INPUT)
public void listen(String in, @Header(name = "x-death", required = false) Map<?,?> death) {
if (death != null && death.get("count").equals(3L)) {
// giving up - don't send to DLX
throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
}
throw new AmqpRejectAndDontRequeueException("failed");
}
}
请注意,标头中的 count 属性是 .x-death
Long
6. 错误通道
从版本 1.3 开始,Binder 无条件地将异常发送到每个 Consumer 目标的错误通道,并且还可以配置为将异步 producer send failures发送到 error 通道。 有关更多信息,请参见“[spring-cloud-stream-overview-error-handling]”。
RabbitMQ 有两种类型的发送失败:
-
返回的消息、
-
否定确认 发行商确认。
后者很少见。 根据 RabbitMQ 文档,“[A nack] 只有在负责队列的 Erlang 进程中发生内部错误时才会交付。
除了启用生产者错误通道(如“[spring-cloud-stream-overview-error-handling]”中所述)之外,RabbitMQ Binder 仅在连接工厂配置适当的情况下才会向通道发送消息,如下所示:
-
ccf.setPublisherConfirms(true);
-
ccf.setPublisherReturns(true);
将 Spring Boot 配置用于连接工厂时,请设置以下属性:
-
spring.rabbitmq.publisher-confirms
-
spring.rabbitmq.publisher-returns
的有效负载 for a 返回的消息是 a 具有以下属性:ErrorMessage
ReturnedAmqpMessageException
-
failedMessage
:发送失败的 spring-messaging。Message<?>
-
amqpMessage
:原始 spring-amqp 。Message
-
replyCode
:一个整数值,指示失败的原因(例如,312 - 无路由)。 -
replyText
:指示失败原因的文本值(例如,)。NO_ROUTE
-
exchange
:消息发布到的 Exchange。 -
routingKey
:发布消息时使用的路由密钥。
对于否定确认的确认,有效负载是具有以下属性的 a:NackedAmqpMessageException
-
failedMessage
:发送失败的 spring-messaging。Message<?>
-
nackReason
:原因(如果可用 — 您可能需要检查代理日志以了解更多信息)。
不会自动处理这些异常(例如发送到死信队列)。 您可以在自己的 Spring 集成流中使用这些异常。
7. 死信队列处理
由于您无法预测用户希望如何处理死信消息,因此框架不提供任何标准机制来处理它们。
如果死信的原因是暂时的,您可能希望将消息路由回原始队列。
但是,如果问题是永久性问题,则可能会导致无限循环。
Spring 下面的 Boot 应用程序展示了如何将这些消息路由回原始队列,但在三次尝试后将它们移动到第三个“停车场”队列的示例。
第二个示例使用 RabbitMQ Delayed Message Exchange 为重新排队的消息引入延迟。
在此示例中,每次尝试的延迟都会增加。
这些示例使用 a 从 DLQ 接收消息。
您也可以在批处理中使用。@RabbitListener
RabbitTemplate.receive()
这些示例假定原始目标是 ,使用者组是 。so8400in
so8400
7.1. 未分区的目标
前两个示例适用于目标未分区的情况:
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
private static final String DELAY_EXCHANGE = "dlqReRouter";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
headers.put("x-delay", 5000 * retriesHeader);
this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public DirectExchange delayExchange() {
DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
exchange.setDelayed(true);
return exchange;
}
@Bean
public Binding bindOriginalToDelay() {
return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
7.2. 分区目的地
对于分区目标,所有分区都有一个 DLQ。我们从 Headers 中确定原始队列。
7.2.1. republishToDlq=false
When is 时,RabbitMQ 将消息发布到 DLX/DLQ,其标头包含有关原始目标的信息,如以下示例所示:republishToDlq
false
x-death
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_DEATH_HEADER = "x-death";
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@SuppressWarnings("unchecked")
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
List<Map<String, ?>> xDeath = (List<Map<String, ?>>) headers.get(X_DEATH_HEADER);
String exchange = (String) xDeath.get(0).get("exchange");
List<String> routingKeys = (List<String>) xDeath.get(0).get("routing-keys");
this.rabbitTemplate.send(exchange, routingKeys.get(0), failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
7.2.2. republishToDlq=true
When is ,重新发布 recoverer 会将原始 exchange 和 routing key 添加到 headers,如以下示例所示:republishToDlq
true
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;
private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
8. 使用 RabbitMQ Binder 进行分区
RabbitMQ 本身不支持分区。
有时,将数据发送到特定分区是有利的 — 例如,当您想要严格排序消息处理时,特定客户的所有消息都应发送到同一分区。
通过将每个分区的队列绑定到目标 Exchange 来提供分区。RabbitMessageChannelBinder
以下 Java 和 YAML 示例展示了如何配置 producer:
@SpringBootApplication
@EnableBinding(Source.class)
public class RabbitPartitionProducerApplication {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"abc1", "def1", "qux1",
"abc2", "def2", "qux2",
"abc3", "def3", "qux3",
"abc4", "def4", "qux4",
};
public static void main(String[] args) {
new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
.web(false)
.run(args);
}
@InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
public Message<?> generate() {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
}
}
spring:
cloud:
stream:
bindings:
output:
destination: partitioned.destination
producer:
partitioned: true
partition-key-expression: headers['partitionKey']
partition-count: 2
required-groups:
- myGroup
前面示例中的配置使用默认的分区 ()。
这可能会也可能不会提供适当平衡的算法,具体取决于键值。
您可以使用 or 属性覆盖此默认值。 仅当您需要在部署创建者时预置使用者队列时,该属性才是必需的。
否则,发送到分区的任何消息都将丢失,直到部署相应的使用者为止。 |
以下配置预置主题交换:
以下队列绑定到该 Exchange:
以下绑定将队列关联到交换:
以下 Java 和 YAML 示例延续了前面的示例,并展示了如何配置使用者:
@SpringBootApplication
@EnableBinding(Sink.class)
public class RabbitPartitionConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
.web(false)
.run(args);
}
@StreamListener(Sink.INPUT)
public void listen(@Payload String in, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
System.out.println(in + " received from queue " + queue);
}
}
spring:
cloud:
stream:
bindings:
input:
destination: partitioned.destination
group: myGroup
consumer:
partitioned: true
instance-index: 0
不支持动态扩展。
每个分区必须至少有一个使用者。
consumer's 用于指示使用的分区。
Cloud Foundry 等平台只能有一个实例具有 .RabbitMessageChannelBinder instanceIndex instanceIndex |