参考指南

本指南介绍了 Spring Cloud Stream Binder 的 RabbitMQ 实现。 它包含有关其设计、使用和配置选项的信息,以及有关 Stream Cloud Stream 概念如何映射到 RabbitMQ 特定构造的信息。

1. 用途

要使用 RabbitMQ Binders,可以使用以下 Maven 坐标将其添加到 Spring Cloud Stream 应用程序中:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

或者,您可以使用 Spring Cloud Stream RabbitMQ Starter,如下所示:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2. RabbitMQ Binder 概述

以下简化图显示了 RabbitMQ Binder 的运行方式:

rabbit binder
图 1.RabbitMQ 活页夹

默认情况下,RabbitMQ Binder 实现将每个目标映射到TopicExchange. 对于每个使用者组,一个Queue绑定到TopicExchange. 每个 Consumer 实例都有对应的 RabbitMQConsumerinstance 为其组的Queue. 对于分区的创建者和使用者,队列以分区索引为后缀,并使用分区索引作为路由键。 对于匿名使用者(没有group属性),则使用自动删除队列(具有随机的唯一名称)。

通过使用可选的autoBindDlq选项中,您可以配置 Binder 以创建和配置死信队列 (DLQ)(和死信交换DLX以及路由基础结构)。 默认情况下,死信队列具有目标的名称,并附加.dlq. 如果启用了重试 (maxAttempts > 1),失败的消息将在重试用尽后传送到 DLQ。 如果禁用重试 (maxAttempts = 1),您应该设置requeueRejectedfalse(默认值),以便将失败的消息路由到 DLQ,而不是重新排队。 另外republishToDlq使 Binder 将失败的消息发布到 DLQ(而不是拒绝它)。 此功能允许其他信息(例如x-exception-stacktraceheader) 添加到 Headers 中的消息中。 请参阅frameMaxHeadroom财产了解有关截断堆栈跟踪的信息。 此选项不需要启用 retry。 您只需尝试一次即可重新发布失败的消息。 从版本 1.2 开始,您可以配置重新发布的消息的传递模式。 请参阅republishDeliveryMode财产.

如果流侦听器抛出ImmediateAcknowledgeAmqpException,则绕过 DLQ,并简单地丢弃消息。 从版本 2.1 开始,无论republishToDlq;以前,只有当republishToDlqfalse.

设置requeueRejectedtrue(使用republishToDlq=false) 导致消息重新排队并不断重新传送,这可能不是您想要的,除非失败的原因是暂时的。 通常,您应该通过在 Binder 中设置maxAttempts设置为大于 1 或通过设置republishToDlqtrue.

有关这些属性的更多信息,请参阅 RabbitMQ Binder 属性

该框架不提供任何标准机制来使用死信消息(或将它们重新路由回主队列)。 死信队列处理中介绍了一些选项。

当在 Spring Cloud Stream 应用程序中使用多个 RabbitMQ 绑定器时,禁用“RabbitAutoConfiguration”以避免相同的配置非常重要RabbitAutoConfiguration应用于两个活页夹。 您可以使用@SpringBootApplication注解。

从版本 2.0 开始,RabbitMessageChannelBinderRabbitTemplate.userPublisherConnectionproperty 设置为true这样,非事务性创建者就可以避免使用者上出现死锁,如果缓存的连接由于代理上的内存警报而被阻止,则可能会发生这种情况。

目前,multiplexConsumer(单个使用者侦听多个队列)仅支持消息驱动的使用者;轮询的使用者只能从单个队列中检索消息。

3. 配置选项

本节包含特定于 RabbitMQ Binder 和绑定通道的设置。

有关常规绑定配置选项和属性,请参阅 Spring Cloud Stream 核心文档

3.1. RabbitMQ Binder 属性

默认情况下,RabbitMQ Binder 使用 Spring Boot 的ConnectionFactory. 同时,它支持 RabbitMQ 的所有 Spring Boot 配置选项。 (有关参考,请参阅 Spring Boot 文档)。 RabbitMQ 配置选项使用spring.rabbitmq前缀。

除了 Spring Boot 选项之外,RabbitMQ Binder 还支持以下属性:

spring.cloud.stream.rabbit.binder.admin地址

以逗号分隔的 RabbitMQ 管理插件 URL 列表。 仅在nodes包含多个条目。 此列表中的每个条目都必须在spring.rabbitmq.addresses. 仅当使用 RabbitMQ 集群并希望从托管队列的节点使用时才需要。 有关更多信息,请参见队列关联和 LocalizedQueueConnectionFactory

默认值:空。

spring.cloud.stream.rabbit.binder.nodes

以逗号分隔的 RabbitMQ 节点名称列表。 当有多个条目时,用于查找队列所在的服务器地址。 此列表中的每个条目都必须在spring.rabbitmq.addresses. 仅当使用 RabbitMQ 集群并希望从托管队列的节点使用时才需要。 有关更多信息,请参见队列关联和 LocalizedQueueConnectionFactory

默认值:空。

spring.cloud.stream.rabbit.binder.compressionLevel

压缩绑定的压缩级别。 看java.util.zip.Deflater.

违约:1(BEST_LEVEL)。

spring.cloud.stream.binder.connection-name-prefix

用于命名此 Binder 创建的连接的连接名称前缀。 名称是这个前缀,后跟#n哪里n每次打开新连接时递增。

默认值:none (Spring AMQP 默认值)。

3.2. RabbitMQ Consumer 属性

以下属性仅适用于 Rabbit 使用者,并且必须以spring.cloud.stream.rabbit.bindings.<channelName>.consumer..

但是,如果需要将同一组属性应用于大多数绑定,则 避免重复,Spring Cloud Stream 支持为所有通道设置值, 格式为spring.cloud.stream.rabbit.default.<property>=<value>.

此外,请记住,绑定特定属性将覆盖其在默认值中的等效属性。

acknowledgeMode

确认模式。

违约:AUTO.

anonymousGroupPrefix 的

当 binding 没有groupproperty,则会将匿名的自动删除队列绑定到目标 Exchange。 此类队列的默认命名策略会导致一个名为anonymous.<base64 representation of a UUID>. 设置此属性可将前缀更改为默认值以外的其他名称。

违约:anonymous..

autoBindDlq

是否自动声明 DLQ 并将其绑定到 Binder DLX。

违约:false.

bindingRoutingKey

用于将队列绑定到 exchange 的路由密钥(如果bindQueuetrue). 可以是多个键 - 请参阅bindingRoutingKeyDelimiter. 对于分区目标,-<instanceIndex>附加到每个键。

违约:。#

bindingRoutingKeyDelimiter

当 this 不为 null 时,'bindingRoutingKey' 被视为由此值分隔的键列表;通常使用逗号。

违约:null.

bindQueue (绑定队列)

是否声明队列并将其绑定到目标 Exchange。 将其设置为false如果您已设置自己的基础设施,并且之前已创建并绑定队列。

违约:true.

consumerTag前缀

用于创建 Consumer 标签;将附加为#n哪里n创建的每个使用者的增量。 例:${spring.application.name}-${spring.cloud.stream.bindings.input.group}-${spring.cloud.stream.instance-index}.

默认值:none - 代理将生成随机的消费者标签。

containerType 容器类型

选择要使用的侦听器容器类型。 有关更多信息,请参见 Spring AMQP 文档中的 选择容器

违约:simple

deadLetterQueueName (死信队列名称)

DLQ 的名称

违约:prefix+destination.dlq

死信交换

要分配给队列的 DLX。 仅在以下情况下相关autoBindDlqtrue.

默认值: 'prefix+DLX'

deadLetterExchange类型

要分配给队列的 DLX 的类型。 仅在以下情况下相关autoBindDlqtrue.

默认值: 'direct'

deadLetterRoutingKey

要分配给队列的死信路由键。 仅在以下情况下相关autoBindDlqtrue.

违约:destination

声明 Dlx

是否声明目标的死信交换。 仅在以下情况下相关autoBindDlqtrue. 设置为false如果您有预配置的 DLX.

违约:true.

declareExchange

是否声明目标的交换。

违约:true.

delayedExchange (延迟交换)

是否将交易所声明为Delayed Message Exchange. 需要 broker 上的 delayed message exchange 插件。 这x-delayed-type参数设置为exchangeType.

违约:false.

dlqBindingArguments

将 dlq 绑定到死信交换时应用的参数;用于headers deadLetterExchangeType指定要匹配的标头。 例如…​dlqBindingArguments.x-match=any,…​dlqBindingArguments.someHeader=someValue.

默认值:空

dlqDeadLetterExchange

如果声明了 DLQ,则为要分配给该队列的 DLX。

违约:none

dlqDeadLetterRoutingKey

如果声明了 DLQ,则为该队列分配一个死信路由键。

违约:none

dlq过期

删除未使用的死信队列之前的时间(以毫秒为单位)。

违约:no expiration

dlq懒惰

使用x-queue-mode=lazy论点。 请参见“延迟队列”。 请考虑使用策略而不是此设置,因为使用策略允许在不删除队列的情况下更改设置。

违约:false.

dlqMaxLength

死信队列中的最大消息数。

违约:no limit

dlqMaxLengthBytes

来自所有消息的死信队列中的最大总字节数。

违约:no limit

dlqMaxPriority

死信队列中消息的最大优先级 (0-255)。

违约:none

dlqOverflowBehavior 函数

在以下情况下要执行的作dlqMaxLengthdlqMaxLengthBytes超出;现在drop-headreject-publish但请参阅 RabbitMQ 文档。

违约:none

dlqQuorum.deliveryLimit

什么时候quorum.enabled=true,设置一个传递限制,超过该限制后,消息将被丢弃或死信。

Default: none - 将应用代理默认值。

dlqQuorum.enabled 已启用

如果为 true,则创建 quorum 死信队列,而不是 Classic 队列。

默认值:false

dlqQuorum.initialQuorumSize

什么时候quorum.enabled=true中,设置初始仲裁大小。

Default: none - 将应用代理默认值。

dlqSingleActiveConsumer

设置为 true 可将x-single-active-consumerqueue 属性设置为 true。

违约:false

dlqTtl

声明时应用于死信队列的默认生存时间(以毫秒为单位)。

违约:no limit

durableSubscription (耐用订阅)

订阅是否应为持久订阅。 仅在以下情况下有效group

违约:true.

exchange自动删除

如果declareExchange为 true,则是否应自动删除 Exchange(即,在删除最后一个队列后删除)。

违约:true.

交换耐用

如果declareExchange为 true,则 Exchange 是否应为 durable(即,它在代理重启后继续存在)。

违约:true.

exchangeType

交易所类型:direct,fanout,headerstopic对于非分区目标和direct、headers 或topic对于分区目标。

违约:topic.

独家

是否创建独占消费者。 当 Concurrency 为 1 时true. 通常在需要严格排序时使用,但允许热备用实例在发生故障后接管。 看recoveryInterval,它控制备用实例尝试使用的频率。 考虑使用singleActiveConsumer而是在使用 RabbitMQ 3.8 或更高版本时。

违约:false.

到期

删除未使用的队列之前的时间(以毫秒为单位)。

违约:no expiration

failedDeclarationRetryInterval

如果队列缺失,则尝试从队列中使用之间的间隔(以毫秒为单位)。

默认值:5000

frameMaxHeadroom

将堆栈跟踪添加到 DLQ 消息头时要为其他标头保留的字节数。 所有标头都必须适合frame_maxsize 配置。 堆栈跟踪可能很大;如果 size 加上 this 属性超过frame_max则堆栈跟踪将被截断。 将写入 WARN 日志;考虑增加frame_max或者通过捕获异常并引发具有较小堆栈跟踪的异常来减少堆栈跟踪。

默认值:20000

headerPatterns 的

要从入站消息映射的标头的模式。

默认值:(所有标头)。['*']

懒惰

使用x-queue-mode=lazy论点。 请参见“延迟队列”。 请考虑使用策略而不是此设置,因为使用策略允许在不删除队列的情况下更改设置。

违约:false.

max并发

消费者的最大数量。 不支持containerTypedirect.

maxLength

队列中的最大消息数。

违约:no limit

maxLengthBytes

队列中所有消息的最大总字节数。

违约:no limit

最大优先级

队列中消息的最大优先级 (0-255)。

违约:none

missingQueues致命

当找不到队列时,是否将该条件视为 fatal 并停止监听器容器。 默认为false以便容器不断尝试从队列中使用 — 例如,当使用集群并且托管非 HA 队列的节点关闭时。

违约:false

overflowBehavior

在以下情况下要执行的作maxLengthmaxLengthBytes超出;现在drop-headreject-publish但请参阅 RabbitMQ 文档。

违约:none

预取

预取计数。

前缀

要添加到destination和队列。

默认值: “”。

queueBindingArguments

将队列绑定到 exchange 时应用的参数;用于headers exchangeType指定要匹配的标头。 例如…​queueBindingArguments.x-match=any,…​queueBindingArguments.someHeader=someValue.

默认值:空

queueDeclarationRetries

如果队列缺失,则重试从队列中消费的次数。 仅当missingQueuesFataltrue. 否则,容器将无限期地重试。 不支持containerTypedirect.

queueNameGroupOnly

如果为 true,则从名称等于group. 否则,队列名称为destination.group. 例如,当使用 Spring Cloud Stream 从现有 RabbitMQ 队列中使用时,这非常有用。

默认值:false。

quorum.deliveryLimit

什么时候quorum.enabled=true,设置一个传递限制,超过该限制后,消息将被丢弃或死信。

Default: none - 将应用代理默认值。

法定人数已启用

如果为 true,则创建 quorum 队列而不是 Classic 队列。

默认值:false

quorum.initialQuorumSize

什么时候quorum.enabled=true中,设置初始仲裁大小。

Default: none - 将应用代理默认值。

recoveryInterval 的

连接恢复尝试之间的间隔(以毫秒为单位)。

违约:5000.

requeueRejected 的

禁用重试时是否应将投放失败重新排队,或者republishToDlqfalse.

违约:false.

republishDeliveryMode

什么时候republishToDlqtrue指定重新发布的消息的传递模式。

违约:DeliveryMode.PERSISTENT

republishToDlq

默认情况下,在重试用尽后失败的消息将被拒绝。 如果配置了死信队列 (DLQ),则 RabbitMQ 会将失败的消息(未更改)路由到 DLQ。 如果设置为true,Binder 会将失败的消息重新发布到 DLQ,其中包含其他标头,包括异常消息和来自最终失败原因的堆栈跟踪。 另请参阅 frameMaxHeadroom 属性

默认值:false

singleActiveConsumer

设置为 true 可将x-single-active-consumerqueue 属性设置为 true。

违约:false

交易

是否使用事务处理通道。

违约:false.

ttl

声明时应用于队列的默认生存时间(以毫秒为单位)。

违约:no limit

txSize (tx大小)

ack 之间的投放数。 不支持containerTypedirect.

3.3. 高级侦听器容器配置

要设置未作为 Binder 或绑定属性公开的侦听器容器属性,请添加ListenerContainerCustomizer添加到应用程序上下文中。 将设置 Binder 和 binding 属性,然后调用定制器。 定制器 (configure()method) 提供队列名称和 Consumer Group 作为参数。

3.4. 高级队列/交换/绑定配置

RabbitMQ 团队会不时添加新功能,这些功能通过在声明队列等时设置一些参数来启用。 通常,此类功能是通过添加适当的属性在 Binder 中启用的,但这在当前版本中可能无法立即使用。 从版本 3.0.1 开始,您现在可以添加DeclarableCustomizerbean 添加到应用程序上下文中以修改Declarable (Queue,ExchangeBinding) 执行声明。 这允许您添加 Binder 当前不直接支持的参数。

3.5. 接收批量消息

通常,如果 producer 绑定具有batch-enabled=true(请参阅 Rabbit Producer 属性),或者消息由BatchingRabbitTemplate,则批处理的元素将作为对 listener 方法的单独调用返回。 从版本 3.0 开始,任何此类批处理都可以显示为List<?>如果spring.cloud.stream.bindings.<name>.consumer.batch-mode设置为true.

3.6. Rabbit Producer 属性

以下属性仅适用于 Rabbit 生产者,并且必须以spring.cloud.stream.rabbit.bindings.<channelName>.producer..

但是,如果需要将同一组属性应用于大多数绑定,则 避免重复,Spring Cloud Stream 支持为所有通道设置值, 格式为spring.cloud.stream.rabbit.default.<property>=<value>.

此外,请记住,绑定特定属性将覆盖其在默认值中的等效属性。

autoBindDlq

是否自动声明 DLQ 并将其绑定到 Binder DLX。

违约:false.

batching已启用

是否启用生产者消息批处理。 根据以下属性(在此列表中的以下三个条目中描述),消息将批处理为一条消息:'batchSize'、batchBufferLimitbatchTimeout. 有关更多信息,请参阅 批处理 。 另请参阅接收 Batched Messages

违约:false.

批量大小

启用批处理时要缓冲的消息数。

违约:100.

batchBufferLimit

启用批处理时的最大缓冲区大小。

违约:10000.

batch超时

启用批处理时的批处理超时。

违约:5000.

bindingRoutingKey

用于将队列绑定到 exchange 的路由密钥(如果bindQueuetrue). 可以是多个键 - 请参阅bindingRoutingKeyDelimiter. 对于分区目标,-n附加到每个键。 仅适用于requiredGroups,然后仅提供给这些组。

违约:。#

bindingRoutingKeyDelimiter

当 this 不为 null 时,'bindingRoutingKey' 被视为由此值分隔的键列表;通常使用逗号。 仅适用于requiredGroups,然后仅提供给这些组。

违约:null.

bindQueue (绑定队列)

是否声明队列并将其绑定到目标 Exchange。 将其设置为false如果您已设置自己的基础设施,并且之前已创建并绑定队列。 仅适用于requiredGroups,然后仅提供给这些组。

违约:true.

压缩

发送时是否应压缩数据。

违约:false.

confirmAckChannel 确认

什么时候errorChannelEnabled为 true,则是向其发送肯定投放确认的渠道(也称为 Publisher confirms)。 如果通道不存在,则DirectChannel将使用此名称注册。 必须将连接工厂配置为启用发布者确认。

违约:nullChannel(丢弃 ACK)。

deadLetterQueueName (死信队列名称)

DLQ 的名称 仅适用于requiredGroups,然后仅提供给这些组。

违约:prefix+destination.dlq

死信交换

要分配给队列的 DLX。 仅当autoBindDlqtrue. 仅在以下情况下适用requiredGroups,然后仅提供给这些组。

默认值: 'prefix+DLX'

deadLetterExchange类型

要分配给队列的 DLX 的类型。 仅在以下情况下相关autoBindDlqtrue. 仅在以下情况下适用requiredGroups,然后仅提供给这些组。

默认值: 'direct'

deadLetterRoutingKey

要分配给队列的死信路由键。 仅当autoBindDlqtrue. 仅在以下情况下适用requiredGroups,然后仅提供给这些组。

违约:destination

声明 Dlx

是否声明目标的死信交换。 仅在以下情况下相关autoBindDlqtrue. 设置为false如果您有预配置的 DLX. 仅在以下情况下适用requiredGroups,然后仅提供给这些组。

违约:true.

declareExchange

是否声明目标的交换。

违约:true.

delay表达式

一个 SPEL 表达式,用于评估要应用于消息的延迟 (x-delay标头)。 如果交换不是延迟消息交换,则它不起作用。

默认值:否x-delayheader 的

delayedExchange (延迟交换)

是否将交易所声明为Delayed Message Exchange. 需要 broker 上的 delayed message exchange 插件。 这x-delayed-type参数设置为exchangeType.

违约:false.

deliveryMode 交付模式

交货模式。

违约:PERSISTENT.

dlqBindingArguments

将 dlq 绑定到死信交换时应用的参数;用于headers deadLetterExchangeType指定要匹配的标头。 例如…​dlqBindingArguments.x-match=any,…​dlqBindingArguments.someHeader=someValue. 仅在以下情况下适用requiredGroups,然后仅提供给这些组。

默认值:空

dlqDeadLetterExchange

声明 DLQ 时,要分配给该队列的 DLX。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。

违约:none

dlqDeadLetterRoutingKey

声明 DLQ 时,要分配给该队列的死信路由键。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。

违约:none

dlq过期

删除未使用的死信队列之前的时间(以毫秒为单位)。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。

违约:no expiration

dlq懒惰

使用x-queue-mode=lazy论点。 请参见“延迟队列”。 请考虑使用策略而不是此设置,因为使用策略允许在不删除队列的情况下更改设置。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。

dlqMaxLength

死信队列中的最大消息数。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。

违约:no limit

dlqMaxLengthBytes

来自所有消息的死信队列中的最大总字节数。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。

违约:no limit

dlqMaxPriority

死信队列中消息的最大优先级 (0-255) 仅在以下情况下适用requiredGroups,然后仅提供给这些组。

违约:none

dlqQuorum.deliveryLimit

什么时候quorum.enabled=true,设置一个传递限制,超过该限制后,消息将被丢弃或死信。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。

Default: none - 将应用代理默认值。

dlqQuorum.enabled 已启用

如果为 true,则创建 quorum 死信队列,而不是 Classic 队列。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。

默认值:false

dlqQuorum.initialQuorumSize

什么时候quorum.enabled=true中,设置初始仲裁大小。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。

Default: none - 将应用代理默认值。

dlqSingleActiveConsumer

设置为 true 可将x-single-active-consumerqueue 属性设置为 true。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。

违约:false

dlqTtl

声明时应用于死信队列的默认生存时间(以毫秒为单位)。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。

违约:no limit

exchange自动删除

如果declareExchangetrue,是否应自动删除交换(在删除最后一个队列后将其删除)。

违约:true.

交换耐用

如果declareExchangetrue,则 Exchange 是否应为 Durable (在代理重启后继续存在)。

违约:true.

exchangeType

交易所类型:direct,fanout,headerstopic对于非分区目标和direct,headerstopic对于分区目标。

违约:topic.

到期

删除未使用的队列之前的时间 (以毫秒为单位)。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。

违约:no expiration

headerPatterns 的

要映射到出站消息的标头的模式。

默认值:(所有标头)。['*']

懒惰

使用x-queue-mode=lazy论点。 请参见“延迟队列”。 请考虑使用策略而不是此设置,因为使用策略允许在不删除队列的情况下更改设置。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。

违约:false.

maxLength

队列中的最大消息数。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。

违约:no limit

maxLengthBytes

队列中所有消息的最大总字节数。 仅适用于requiredGroups,然后仅提供给这些组。

违约:no limit

最大优先级

队列中消息的最大优先级 (0-255)。 仅适用于requiredGroups,然后仅提供给这些组。

违约:none

前缀

要添加到destination交换。

默认值: “”。

queueBindingArguments

将队列绑定到 exchange 时应用的参数;用于headers exchangeType指定要匹配的标头。 例如…​queueBindingArguments.x-match=any,…​queueBindingArguments.someHeader=someValue. 仅在以下情况下适用requiredGroups,然后仅提供给这些组。

默认值:空

queueNameGroupOnly

什么时候true,从名称等于group. 否则,队列名称为destination.group. 例如,当使用 Spring Cloud Stream 从现有 RabbitMQ 队列中使用时,这非常有用。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。

默认值:false。

quorum.deliveryLimit

什么时候quorum.enabled=true,设置一个传递限制,超过该限制后,消息将被丢弃或死信。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。

Default: none - 将应用代理默认值。

法定人数已启用

如果为 true,则创建 quorum 队列而不是 Classic 队列。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。

默认值:false

quorum.initialQuorumSize

什么时候quorum.enabled=true中,设置初始仲裁大小。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。

Default: none - 将应用代理默认值。

routingKeyExpression 的

一个 SPEL 表达式,用于确定发布消息时要使用的路由键。 对于固定路由密钥,请使用文本表达式,例如routingKeyExpression='my.routingKey'在属性文件中或routingKeyExpression: '''my.routingKey'''在 YAML 文件中。

违约:destinationdestination-<partition>对于分区目标。

singleActiveConsumer

设置为 true 可将x-single-active-consumerqueue 属性设置为 true。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。

违约:false

交易

是否使用事务处理通道。

违约:false.

ttl

声明时应用于队列的默认生存时间(以毫秒为单位)。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。

违约:no limit

对于 RabbitMQ,内容类型标头可以由外部应用程序设置。 Spring Cloud Stream 支持将它们作为扩展内部协议的一部分,用于任何类型的传输,包括传输,例如 Kafka(0.11 之前),它们本身不支持 headers。

4. 使用现有队列/交易所

默认情况下,Binder 将自动配置主题交换,其名称派生自 destination binding 属性的值<prefix><destination>. 如果未提供,则 destination 默认为绑定名称。 绑定 Consumer 时,队列会自动预置名称<prefix><destination>.<group>(如果groupbinding 属性)或匿名自动删除队列(如果没有group. 队列将使用 “match-all” 通配符路由键 () 绑定到交换,以实现非分区绑定,或者#<destination>-<instanceIndex>对于分区绑定。 前缀为空String默认情况下。 如果使用requiredGroups,将为每个组配置一个队列/绑定。

有许多特定于 rabbit 的绑定属性允许您修改此默认行为。

如果您有要使用的现有 Exchange/队列,则可以按如下方式完全禁用自动供应,假设该 Exchange 名为myExchange队列名为myQueue:

如果您希望 Binders 预置 queue/exchange,但您希望使用此处讨论的默认值以外的其他功能来执行此作,请使用以下属性。 有关更多信息,请参阅上面的属性文档。

  • spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindingRoutingKey=myRoutingKey

  • spring.cloud.stream.rabbit.bindings.<binding name>.consumer.exchangeType=<type>

  • spring.cloud.stream.rabbit.bindings.<binding name>.producer.routingKeyExpression='myRoutingKey'

在声明死信交换/队列时,也有类似的属性,当autoBindDlqtrue.

5. 使用 RabbitMQ Binder 重试

在 Binder 中启用重试后,侦听器容器线程将在配置的任何回退期间暂停。 当单个使用者需要严格排序时,这可能很重要。但是,对于其他使用案例,它会阻止在该线程上处理其他消息。 使用 Binder 重试的另一种方法是在死信队列 (DLQ) 上设置具有生存时间的死信,并在 DLQ 本身上设置死信配置。 有关此处讨论的属性的更多信息,请参阅“RabbitMQ Binder 属性”。 您可以使用以下示例配置来启用此功能:

  • 设置autoBindDlqtrue. Binder 将创建一个 DLQ。 (可选)您可以在deadLetterQueueName.

  • 设置dlqTtl设置为您希望在重新投递之间等待的 back off 时间。

  • dlqDeadLetterExchange添加到默认交易所。 来自 DLQ 的过期邮件将路由到原始队列,因为默认的deadLetterRoutingKey是队列名称 (destination.group). 设置为默认 exchange 是通过设置无值的属性来实现的,如下一个示例所示。

要强制消息为死信,请抛出AmqpRejectAndDontRequeueException或设置requeueRejectedtrue(默认值)并引发任何异常。

循环无休止地继续,这对于暂时性问题来说很好,但你可能想在尝试几次后放弃。 幸运的是,RabbitMQ 提供了x-death标头,用于确定已发生的周期数。

要在放弃后确认消息,请抛出一个ImmediateAcknowledgeAmqpException.

5.1. 把它们放在一起

以下配置将创建一个 ExchangemyDestination带队列myDestination.consumerGroup绑定到具有通配符路由密钥的主题交换:#

---
spring.cloud.stream.bindings.input.destination=myDestination
spring.cloud.stream.bindings.input.group=consumerGroup
#disable binder retries
spring.cloud.stream.bindings.input.consumer.max-attempts=1
#dlx/dlq setup
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=
---

此配置将创建一个绑定到直接交换 (DLX),其中路由密钥为myDestination.consumerGroup. 当邮件被拒绝时,它们将被路由到 DLQ。 5 秒后,消息过期,并使用队列名称作为路由键路由到原始队列,如以下示例所示:

Spring Boot 应用程序
@SpringBootApplication
@EnableBinding(Sink.class)
public class XDeathApplication {

    public static void main(String[] args) {
        SpringApplication.run(XDeathApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in, @Header(name = "x-death", required = false) Map<?,?> death) {
        if (death != null && death.get("count").equals(3L)) {
            // giving up - don't send to DLX
            throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
        }
        throw new AmqpRejectAndDontRequeueException("failed");
    }

}

请注意,x-deathheader 是一个Long.

6. 错误通道

从版本 1.3 开始,Binder 无条件地将异常发送到每个 Consumer 目标的错误通道,并且还可以配置为将异步 producer send failures发送到 error 通道。 有关更多信息,请参见“[spring-cloud-stream-overview-error-handling]”。

RabbitMQ 有两种类型的发送失败:

后者很少见。 根据 RabbitMQ 文档,“[A nack] 只有在负责队列的 Erlang 进程中发生内部错误时才会交付。

除了启用生产者错误通道(如“[spring-cloud-stream-overview-error-handling]”中所述)之外,RabbitMQ Binder 仅在连接工厂配置适当的情况下才会向通道发送消息,如下所示:

将 Spring Boot 配置用于连接工厂时,请设置以下属性:

ErrorMessage对于返回的消息,是一个ReturnedAmqpMessageException具有以下属性:

对于否定确认的确认,有效负载是NackedAmqpMessageException具有以下属性:

不会自动处理这些异常(例如发送到死信队列)。 您可以在自己的 Spring 集成流中使用这些异常。

7. 死信队列处理

由于您无法预测用户希望如何处理死信消息,因此框架不提供任何标准机制来处理它们。 如果死信的原因是暂时的,您可能希望将消息路由回原始队列。 但是,如果问题是永久性问题,则可能会导致无限循环。 Spring 下面的 Boot 应用程序展示了如何将这些消息路由回原始队列,但在三次尝试后将它们移动到第三个“停车场”队列的示例。 第二个示例使用 RabbitMQ Delayed Message Exchange 为重新排队的消息引入延迟。 在此示例中,每次尝试的延迟都会增加。 这些示例使用@RabbitListener以接收来自 DLQ 的消息。 您还可以使用RabbitTemplate.receive()在批处理中。

这些示例假定原始目标是so8400in且 Consumer 组为so8400.

7.1. 未分区的目标

前两个示例适用于目标未分区的情况:

@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
            this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}
@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    private static final String DELAY_EXCHANGE = "dlqReRouter";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            headers.put(X_RETRIES_HEADER, retriesHeader + 1);
            headers.put("x-delay", 5000 * retriesHeader);
            this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public DirectExchange delayExchange() {
        DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Binding bindOriginalToDelay() {
        return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}

7.2. 分区目的地

对于分区目标,所有分区都有一个 DLQ。我们从 Headers 中确定原始队列。

7.2.1.republishToDlq=false

什么时候republishToDlqfalse时,RabbitMQ 会将消息发布到 DLX/DLQ,其中包含x-death标头,其中包含有关原始目标的信息,如以下示例所示:

@SpringBootApplication
public class ReRouteDlqApplication {

	private static final String ORIGINAL_QUEUE = "so8400in.so8400";

	private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

	private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

	private static final String X_DEATH_HEADER = "x-death";

	private static final String X_RETRIES_HEADER = "x-retries";

	public static void main(String[] args) throws Exception {
		ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
		System.out.println("Hit enter to terminate");
		System.in.read();
		context.close();
	}

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@SuppressWarnings("unchecked")
	@RabbitListener(queues = DLQ)
	public void rePublish(Message failedMessage) {
		Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
		Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
		if (retriesHeader == null) {
			retriesHeader = Integer.valueOf(0);
		}
		if (retriesHeader < 3) {
			headers.put(X_RETRIES_HEADER, retriesHeader + 1);
			List<Map<String, ?>> xDeath = (List<Map<String, ?>>) headers.get(X_DEATH_HEADER);
			String exchange = (String) xDeath.get(0).get("exchange");
			List<String> routingKeys = (List<String>) xDeath.get(0).get("routing-keys");
			this.rabbitTemplate.send(exchange, routingKeys.get(0), failedMessage);
		}
		else {
			this.rabbitTemplate.send(PARKING_LOT, failedMessage);
		}
	}

	@Bean
	public Queue parkingLot() {
		return new Queue(PARKING_LOT);
	}

}

7.2.2.republishToDlq=true

什么时候republishToDlqtrue中,重新发布 Recoverer 会将原始 Exchange 和 Routing 键添加到 Headers,如以下示例所示:

@SpringBootApplication
public class ReRouteDlqApplication {

	private static final String ORIGINAL_QUEUE = "so8400in.so8400";

	private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

	private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

	private static final String X_RETRIES_HEADER = "x-retries";

	private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;

	private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;

	public static void main(String[] args) throws Exception {
		ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
		System.out.println("Hit enter to terminate");
		System.in.read();
		context.close();
	}

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@RabbitListener(queues = DLQ)
	public void rePublish(Message failedMessage) {
		Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
		Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
		if (retriesHeader == null) {
			retriesHeader = Integer.valueOf(0);
		}
		if (retriesHeader < 3) {
			headers.put(X_RETRIES_HEADER, retriesHeader + 1);
			String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
			String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
			this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
		}
		else {
			this.rabbitTemplate.send(PARKING_LOT, failedMessage);
		}
	}

	@Bean
	public Queue parkingLot() {
		return new Queue(PARKING_LOT);
	}

}

8. 使用 RabbitMQ Binder 进行分区

RabbitMQ 本身不支持分区。

有时,将数据发送到特定分区是有利的 — 例如,当您想要严格排序消息处理时,特定客户的所有消息都应发送到同一分区。

RabbitMessageChannelBinder通过将每个分区的队列绑定到目标 Exchange 来提供分区。

以下 Java 和 YAML 示例展示了如何配置 producer:

制作人
@SpringBootApplication
@EnableBinding(Source.class)
public class RabbitPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "abc1", "def1", "qux1",
            "abc2", "def2", "qux2",
            "abc3", "def3", "qux3",
            "abc4", "def4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
    public Message<?> generate() {
        String value = data[RANDOM.nextInt(data.length)];
        System.out.println("Sending: " + value);
        return MessageBuilder.withPayload(value)
                .setHeader("partitionKey", value)
                .build();
    }

}
application.yml
    spring:
      cloud:
        stream:
          bindings:
            output:
              destination: partitioned.destination
              producer:
                partitioned: true
                partition-key-expression: headers['partitionKey']
                partition-count: 2
                required-groups:
                - myGroup

前面示例中的配置使用默认分区 (key.hashCode() % partitionCount). 这可能会也可能不会提供适当平衡的算法,具体取决于键值。 您可以使用partitionSelectorExpressionpartitionSelectorClass性能。

required-groups仅当您需要在部署创建者时预置使用者队列时,才需要属性。 否则,发送到分区的任何消息都将丢失,直到部署相应的使用者为止。

以下配置预置主题交换:

零件更换

以下队列绑定到该 Exchange:

Part 队列

以下绑定将队列关联到交换:

零件绑定

以下 Java 和 YAML 示例延续了前面的示例,并展示了如何配置使用者:

消费者
@SpringBootApplication
@EnableBinding(Sink.class)
public class RabbitPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
            .web(false)
            .run(args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(@Payload String in, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
        System.out.println(in + " received from queue " + queue);
    }

}
application.yml
    spring:
      cloud:
        stream:
          bindings:
            input:
              destination: partitioned.destination
              group: myGroup
              consumer:
                partitioned: true
                instance-index: 0
RabbitMessageChannelBinder不支持动态扩展。 每个分区必须至少有一个使用者。 消费者的instanceIndex用于指示消耗的分区。 Cloud Foundry 等平台只能有一个实例,其中instanceIndex.

APP信息