参考指南

本指南介绍了 Spring Cloud Stream Binder 的 RabbitMQ 实现。 它包含有关其设计、使用和配置选项的信息,以及有关 Stream Cloud Stream 概念如何映射到 RabbitMQ 特定构造的信息。spring-doc.cadn.net.cn

1. 用途

要使用 RabbitMQ Binders,可以使用以下 Maven 坐标将其添加到 Spring Cloud Stream 应用程序中:spring-doc.cadn.net.cn

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

或者,您可以使用 Spring Cloud Stream RabbitMQ Starter,如下所示:spring-doc.cadn.net.cn

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2. RabbitMQ Binder 概述

以下简化图显示了 RabbitMQ Binder 的运行方式:spring-doc.cadn.net.cn

rabbit binder
图 1.RabbitMQ 活页夹

默认情况下,RabbitMQ Binder 实现将每个目标映射到TopicExchange. 对于每个使用者组,一个Queue绑定到TopicExchange. 每个 Consumer 实例都有对应的 RabbitMQConsumerinstance 为其组的Queue. 对于分区的创建者和使用者,队列以分区索引为后缀,并使用分区索引作为路由键。 对于匿名使用者(没有group属性),则使用自动删除队列(具有随机的唯一名称)。spring-doc.cadn.net.cn

通过使用可选的autoBindDlq选项中,您可以配置 Binder 以创建和配置死信队列 (DLQ)(和死信交换DLX以及路由基础结构)。 默认情况下,死信队列具有目标的名称,并附加.dlq. 如果启用了重试 (maxAttempts > 1),失败的消息将在重试用尽后传送到 DLQ。 如果禁用重试 (maxAttempts = 1),您应该设置requeueRejectedfalse(默认值),以便将失败的消息路由到 DLQ,而不是重新排队。 另外republishToDlq使 Binder 将失败的消息发布到 DLQ(而不是拒绝它)。 此功能允许其他信息(例如x-exception-stacktraceheader) 添加到 Headers 中的消息中。 请参阅frameMaxHeadroom财产了解有关截断堆栈跟踪的信息。 此选项不需要启用 retry。 您只需尝试一次即可重新发布失败的消息。 从版本 1.2 开始,您可以配置重新发布的消息的传递模式。 请参阅republishDeliveryMode财产.spring-doc.cadn.net.cn

如果流侦听器抛出ImmediateAcknowledgeAmqpException,则绕过 DLQ,并简单地丢弃消息。 从版本 2.1 开始,无论republishToDlq;以前,只有当republishToDlqfalse.spring-doc.cadn.net.cn

设置requeueRejectedtrue(使用republishToDlq=false) 导致消息重新排队并不断重新传送,这可能不是您想要的,除非失败的原因是暂时的。 通常,您应该通过在 Binder 中设置maxAttempts设置为大于 1 或通过设置republishToDlqtrue.

有关这些属性的更多信息,请参阅 RabbitMQ Binder 属性spring-doc.cadn.net.cn

该框架不提供任何标准机制来使用死信消息(或将它们重新路由回主队列)。 死信队列处理中介绍了一些选项。spring-doc.cadn.net.cn

当在 Spring Cloud Stream 应用程序中使用多个 RabbitMQ 绑定器时,禁用“RabbitAutoConfiguration”以避免相同的配置非常重要RabbitAutoConfiguration应用于两个活页夹。 您可以使用@SpringBootApplication注解。

从版本 2.0 开始,RabbitMessageChannelBinderRabbitTemplate.userPublisherConnectionproperty 设置为true这样,非事务性创建者就可以避免使用者上出现死锁,如果缓存的连接由于代理上的内存警报而被阻止,则可能会发生这种情况。spring-doc.cadn.net.cn

目前,multiplexConsumer(单个使用者侦听多个队列)仅支持消息驱动的使用者;轮询的使用者只能从单个队列中检索消息。

3. 配置选项

本节包含特定于 RabbitMQ Binder 和绑定通道的设置。spring-doc.cadn.net.cn

有关常规绑定配置选项和属性,请参阅 Spring Cloud Stream 核心文档spring-doc.cadn.net.cn

3.1. RabbitMQ Binder 属性

默认情况下,RabbitMQ Binder 使用 Spring Boot 的ConnectionFactory. 同时,它支持 RabbitMQ 的所有 Spring Boot 配置选项。 (有关参考,请参阅 Spring Boot 文档)。 RabbitMQ 配置选项使用spring.rabbitmq前缀。spring-doc.cadn.net.cn

除了 Spring Boot 选项之外,RabbitMQ Binder 还支持以下属性:spring-doc.cadn.net.cn

spring.cloud.stream.rabbit.binder.admin地址

以逗号分隔的 RabbitMQ 管理插件 URL 列表。 仅在nodes包含多个条目。 此列表中的每个条目都必须在spring.rabbitmq.addresses. 仅当使用 RabbitMQ 集群并希望从托管队列的节点使用时才需要。 有关更多信息,请参见队列关联和 LocalizedQueueConnectionFactoryspring-doc.cadn.net.cn

默认值:空。spring-doc.cadn.net.cn

spring.cloud.stream.rabbit.binder.nodes

以逗号分隔的 RabbitMQ 节点名称列表。 当有多个条目时,用于查找队列所在的服务器地址。 此列表中的每个条目都必须在spring.rabbitmq.addresses. 仅当使用 RabbitMQ 集群并希望从托管队列的节点使用时才需要。 有关更多信息,请参见队列关联和 LocalizedQueueConnectionFactoryspring-doc.cadn.net.cn

默认值:空。spring-doc.cadn.net.cn

spring.cloud.stream.rabbit.binder.compressionLevel

压缩绑定的压缩级别。 看java.util.zip.Deflater.spring-doc.cadn.net.cn

违约:1(BEST_LEVEL)。spring-doc.cadn.net.cn

spring.cloud.stream.binder.connection-name-prefix

用于命名此 Binder 创建的连接的连接名称前缀。 名称是这个前缀,后跟#n哪里n每次打开新连接时递增。spring-doc.cadn.net.cn

默认值:none (Spring AMQP 默认值)。spring-doc.cadn.net.cn

3.2. RabbitMQ Consumer 属性

以下属性仅适用于 Rabbit 使用者,并且必须以spring.cloud.stream.rabbit.bindings.<channelName>.consumer..spring-doc.cadn.net.cn

但是,如果需要将同一组属性应用于大多数绑定,则 避免重复,Spring Cloud Stream 支持为所有通道设置值, 格式为spring.cloud.stream.rabbit.default.<property>=<value>.spring-doc.cadn.net.cn

此外,请记住,绑定特定属性将覆盖其在默认值中的等效属性。spring-doc.cadn.net.cn

acknowledgeMode

确认模式。spring-doc.cadn.net.cn

违约:AUTO.spring-doc.cadn.net.cn

anonymousGroupPrefix 的

当 binding 没有groupproperty,则会将匿名的自动删除队列绑定到目标 Exchange。 此类队列的默认命名策略会导致一个名为anonymous.<base64 representation of a UUID>. 设置此属性可将前缀更改为默认值以外的其他名称。spring-doc.cadn.net.cn

违约:anonymous..spring-doc.cadn.net.cn

autoBindDlq

是否自动声明 DLQ 并将其绑定到 Binder DLX。spring-doc.cadn.net.cn

违约:false.spring-doc.cadn.net.cn

bindingRoutingKey

用于将队列绑定到 exchange 的路由密钥(如果bindQueuetrue). 可以是多个键 - 请参阅bindingRoutingKeyDelimiter. 对于分区目标,-<instanceIndex>附加到每个键。spring-doc.cadn.net.cn

违约:。#spring-doc.cadn.net.cn

bindingRoutingKeyDelimiter

当 this 不为 null 时,'bindingRoutingKey' 被视为由此值分隔的键列表;通常使用逗号。spring-doc.cadn.net.cn

违约:null.spring-doc.cadn.net.cn

bindQueue (绑定队列)

是否声明队列并将其绑定到目标 Exchange。 将其设置为false如果您已设置自己的基础设施,并且之前已创建并绑定队列。spring-doc.cadn.net.cn

违约:true.spring-doc.cadn.net.cn

consumerTag前缀

用于创建 Consumer 标签;将附加为#n哪里n创建的每个使用者的增量。 例:${spring.application.name}-${spring.cloud.stream.bindings.input.group}-${spring.cloud.stream.instance-index}.spring-doc.cadn.net.cn

默认值:none - 代理将生成随机的消费者标签。spring-doc.cadn.net.cn

containerType 容器类型

选择要使用的侦听器容器类型。 有关更多信息,请参见 Spring AMQP 文档中的 选择容器spring-doc.cadn.net.cn

违约:simplespring-doc.cadn.net.cn

deadLetterQueueName (死信队列名称)

DLQ 的名称spring-doc.cadn.net.cn

违约:prefix+destination.dlqspring-doc.cadn.net.cn

死信交换

要分配给队列的 DLX。 仅在以下情况下相关autoBindDlqtrue.spring-doc.cadn.net.cn

默认值: 'prefix+DLX'spring-doc.cadn.net.cn

deadLetterExchange类型

要分配给队列的 DLX 的类型。 仅在以下情况下相关autoBindDlqtrue.spring-doc.cadn.net.cn

默认值: 'direct'spring-doc.cadn.net.cn

deadLetterRoutingKey

要分配给队列的死信路由键。 仅在以下情况下相关autoBindDlqtrue.spring-doc.cadn.net.cn

违约:destinationspring-doc.cadn.net.cn

声明 Dlx

是否声明目标的死信交换。 仅在以下情况下相关autoBindDlqtrue. 设置为false如果您有预配置的 DLX.spring-doc.cadn.net.cn

违约:true.spring-doc.cadn.net.cn

declareExchange

是否声明目标的交换。spring-doc.cadn.net.cn

违约:true.spring-doc.cadn.net.cn

delayedExchange (延迟交换)

是否将交易所声明为Delayed Message Exchange. 需要 broker 上的 delayed message exchange 插件。 这x-delayed-type参数设置为exchangeType.spring-doc.cadn.net.cn

违约:false.spring-doc.cadn.net.cn

dlqBindingArguments

将 dlq 绑定到死信交换时应用的参数;用于headers deadLetterExchangeType指定要匹配的标头。 例如…​dlqBindingArguments.x-match=any,…​dlqBindingArguments.someHeader=someValue.spring-doc.cadn.net.cn

默认值:空spring-doc.cadn.net.cn

dlqDeadLetterExchange

如果声明了 DLQ,则为要分配给该队列的 DLX。spring-doc.cadn.net.cn

违约:nonespring-doc.cadn.net.cn

dlqDeadLetterRoutingKey

如果声明了 DLQ,则为该队列分配一个死信路由键。spring-doc.cadn.net.cn

违约:nonespring-doc.cadn.net.cn

dlq过期

删除未使用的死信队列之前的时间(以毫秒为单位)。spring-doc.cadn.net.cn

违约:no expirationspring-doc.cadn.net.cn

dlq懒惰

使用x-queue-mode=lazy论点。 请参见“延迟队列”。 请考虑使用策略而不是此设置,因为使用策略允许在不删除队列的情况下更改设置。spring-doc.cadn.net.cn

违约:false.spring-doc.cadn.net.cn

dlqMaxLength

死信队列中的最大消息数。spring-doc.cadn.net.cn

违约:no limitspring-doc.cadn.net.cn

dlqMaxLengthBytes

来自所有消息的死信队列中的最大总字节数。spring-doc.cadn.net.cn

违约:no limitspring-doc.cadn.net.cn

dlqMaxPriority

死信队列中消息的最大优先级 (0-255)。spring-doc.cadn.net.cn

违约:nonespring-doc.cadn.net.cn

dlqOverflowBehavior 函数

在以下情况下要执行的作dlqMaxLengthdlqMaxLengthBytes超出;现在drop-headreject-publish但请参阅 RabbitMQ 文档。spring-doc.cadn.net.cn

违约:nonespring-doc.cadn.net.cn

dlqQuorum.deliveryLimit

什么时候quorum.enabled=true,设置一个传递限制,超过该限制后,消息将被丢弃或死信。spring-doc.cadn.net.cn

Default: none - 将应用代理默认值。spring-doc.cadn.net.cn

dlqQuorum.enabled 已启用

如果为 true,则创建 quorum 死信队列,而不是 Classic 队列。spring-doc.cadn.net.cn

默认值:falsespring-doc.cadn.net.cn

dlqQuorum.initialQuorumSize

什么时候quorum.enabled=true中,设置初始仲裁大小。spring-doc.cadn.net.cn

Default: none - 将应用代理默认值。spring-doc.cadn.net.cn

dlqSingleActiveConsumer

设置为 true 可将x-single-active-consumerqueue 属性设置为 true。spring-doc.cadn.net.cn

违约:falsespring-doc.cadn.net.cn

dlqTtl

声明时应用于死信队列的默认生存时间(以毫秒为单位)。spring-doc.cadn.net.cn

违约:no limitspring-doc.cadn.net.cn

durableSubscription (耐用订阅)

订阅是否应为持久订阅。 仅在以下情况下有效groupspring-doc.cadn.net.cn

违约:true.spring-doc.cadn.net.cn

exchange自动删除

如果declareExchange为 true,则是否应自动删除 Exchange(即,在删除最后一个队列后删除)。spring-doc.cadn.net.cn

违约:true.spring-doc.cadn.net.cn

交换耐用

如果declareExchange为 true,则 Exchange 是否应为 durable(即,它在代理重启后继续存在)。spring-doc.cadn.net.cn

违约:true.spring-doc.cadn.net.cn

exchangeType

交易所类型:direct,fanout,headerstopic对于非分区目标和direct、headers 或topic对于分区目标。spring-doc.cadn.net.cn

违约:topic.spring-doc.cadn.net.cn

独家

是否创建独占消费者。 当 Concurrency 为 1 时true. 通常在需要严格排序时使用,但允许热备用实例在发生故障后接管。 看recoveryInterval,它控制备用实例尝试使用的频率。 考虑使用singleActiveConsumer而是在使用 RabbitMQ 3.8 或更高版本时。spring-doc.cadn.net.cn

违约:false.spring-doc.cadn.net.cn

到期

删除未使用的队列之前的时间(以毫秒为单位)。spring-doc.cadn.net.cn

违约:no expirationspring-doc.cadn.net.cn

failedDeclarationRetryInterval

如果队列缺失,则尝试从队列中使用之间的间隔(以毫秒为单位)。spring-doc.cadn.net.cn

默认值:5000spring-doc.cadn.net.cn

frameMaxHeadroom

将堆栈跟踪添加到 DLQ 消息头时要为其他标头保留的字节数。 所有标头都必须适合frame_maxsize 配置。 堆栈跟踪可能很大;如果 size 加上 this 属性超过frame_max则堆栈跟踪将被截断。 将写入 WARN 日志;考虑增加frame_max或者通过捕获异常并引发具有较小堆栈跟踪的异常来减少堆栈跟踪。spring-doc.cadn.net.cn

默认值:20000spring-doc.cadn.net.cn

headerPatterns 的

要从入站消息映射的标头的模式。spring-doc.cadn.net.cn

默认值:(所有标头)。['*']spring-doc.cadn.net.cn

懒惰

使用x-queue-mode=lazy论点。 请参见“延迟队列”。 请考虑使用策略而不是此设置,因为使用策略允许在不删除队列的情况下更改设置。spring-doc.cadn.net.cn

违约:false.spring-doc.cadn.net.cn

max并发

消费者的最大数量。 不支持containerTypedirect.spring-doc.cadn.net.cn

maxLength

队列中的最大消息数。spring-doc.cadn.net.cn

违约:no limitspring-doc.cadn.net.cn

maxLengthBytes

队列中所有消息的最大总字节数。spring-doc.cadn.net.cn

违约:no limitspring-doc.cadn.net.cn

最大优先级

队列中消息的最大优先级 (0-255)。spring-doc.cadn.net.cn

违约:nonespring-doc.cadn.net.cn

missingQueues致命

当找不到队列时,是否将该条件视为 fatal 并停止监听器容器。 默认为false以便容器不断尝试从队列中使用 — 例如,当使用集群并且托管非 HA 队列的节点关闭时。spring-doc.cadn.net.cn

违约:falsespring-doc.cadn.net.cn

overflowBehavior

在以下情况下要执行的作maxLengthmaxLengthBytes超出;现在drop-headreject-publish但请参阅 RabbitMQ 文档。spring-doc.cadn.net.cn

违约:nonespring-doc.cadn.net.cn

预取

预取计数。spring-doc.cadn.net.cn

前缀

要添加到destination和队列。spring-doc.cadn.net.cn

默认值: “”。spring-doc.cadn.net.cn

queueBindingArguments

将队列绑定到 exchange 时应用的参数;用于headers exchangeType指定要匹配的标头。 例如…​queueBindingArguments.x-match=any,…​queueBindingArguments.someHeader=someValue.spring-doc.cadn.net.cn

默认值:空spring-doc.cadn.net.cn

queueDeclarationRetries

如果队列缺失,则重试从队列中消费的次数。 仅当missingQueuesFataltrue. 否则,容器将无限期地重试。 不支持containerTypedirect.spring-doc.cadn.net.cn

queueNameGroupOnly

如果为 true,则从名称等于group. 否则,队列名称为destination.group. 例如,当使用 Spring Cloud Stream 从现有 RabbitMQ 队列中使用时,这非常有用。spring-doc.cadn.net.cn

默认值:false。spring-doc.cadn.net.cn

quorum.deliveryLimit

什么时候quorum.enabled=true,设置一个传递限制,超过该限制后,消息将被丢弃或死信。spring-doc.cadn.net.cn

Default: none - 将应用代理默认值。spring-doc.cadn.net.cn

法定人数已启用

如果为 true,则创建 quorum 队列而不是 Classic 队列。spring-doc.cadn.net.cn

默认值:falsespring-doc.cadn.net.cn

quorum.initialQuorumSize

什么时候quorum.enabled=true中,设置初始仲裁大小。spring-doc.cadn.net.cn

Default: none - 将应用代理默认值。spring-doc.cadn.net.cn

recoveryInterval 的

连接恢复尝试之间的间隔(以毫秒为单位)。spring-doc.cadn.net.cn

违约:5000.spring-doc.cadn.net.cn

requeueRejected 的

禁用重试时是否应将投放失败重新排队,或者republishToDlqfalse.spring-doc.cadn.net.cn

违约:false.spring-doc.cadn.net.cn

republishDeliveryMode

什么时候republishToDlqtrue指定重新发布的消息的传递模式。spring-doc.cadn.net.cn

违约:DeliveryMode.PERSISTENTspring-doc.cadn.net.cn

republishToDlq

默认情况下,在重试用尽后失败的消息将被拒绝。 如果配置了死信队列 (DLQ),则 RabbitMQ 会将失败的消息(未更改)路由到 DLQ。 如果设置为true,Binder 会将失败的消息重新发布到 DLQ,其中包含其他标头,包括异常消息和来自最终失败原因的堆栈跟踪。 另请参阅 frameMaxHeadroom 属性spring-doc.cadn.net.cn

默认值:falsespring-doc.cadn.net.cn

singleActiveConsumer

设置为 true 可将x-single-active-consumerqueue 属性设置为 true。spring-doc.cadn.net.cn

违约:falsespring-doc.cadn.net.cn

交易

是否使用事务处理通道。spring-doc.cadn.net.cn

违约:false.spring-doc.cadn.net.cn

ttl

声明时应用于队列的默认生存时间(以毫秒为单位)。spring-doc.cadn.net.cn

违约:no limitspring-doc.cadn.net.cn

txSize (tx大小)

ack 之间的投放数。 不支持containerTypedirect.spring-doc.cadn.net.cn

3.3. 高级侦听器容器配置

要设置未作为 Binder 或绑定属性公开的侦听器容器属性,请添加ListenerContainerCustomizer添加到应用程序上下文中。 将设置 Binder 和 binding 属性,然后调用定制器。 定制器 (configure()method) 提供队列名称和 Consumer Group 作为参数。spring-doc.cadn.net.cn

3.4. 高级队列/交换/绑定配置

RabbitMQ 团队会不时添加新功能,这些功能通过在声明队列等时设置一些参数来启用。 通常,此类功能是通过添加适当的属性在 Binder 中启用的,但这在当前版本中可能无法立即使用。 从版本 3.0.1 开始,您现在可以添加DeclarableCustomizerbean 添加到应用程序上下文中以修改Declarable (Queue,ExchangeBinding) 执行声明。 这允许您添加 Binder 当前不直接支持的参数。spring-doc.cadn.net.cn

3.5. 接收批量消息

通常,如果 producer 绑定具有batch-enabled=true(请参阅 Rabbit Producer 属性),或者消息由BatchingRabbitTemplate,则批处理的元素将作为对 listener 方法的单独调用返回。 从版本 3.0 开始,任何此类批处理都可以显示为List<?>如果spring.cloud.stream.bindings.<name>.consumer.batch-mode设置为true.spring-doc.cadn.net.cn

3.6. Rabbit Producer 属性

以下属性仅适用于 Rabbit 生产者,并且必须以spring.cloud.stream.rabbit.bindings.<channelName>.producer..spring-doc.cadn.net.cn

但是,如果需要将同一组属性应用于大多数绑定,则 避免重复,Spring Cloud Stream 支持为所有通道设置值, 格式为spring.cloud.stream.rabbit.default.<property>=<value>.spring-doc.cadn.net.cn

此外,请记住,绑定特定属性将覆盖其在默认值中的等效属性。spring-doc.cadn.net.cn

autoBindDlq

是否自动声明 DLQ 并将其绑定到 Binder DLX。spring-doc.cadn.net.cn

违约:false.spring-doc.cadn.net.cn

batching已启用

是否启用生产者消息批处理。 根据以下属性(在此列表中的以下三个条目中描述),消息将批处理为一条消息:'batchSize'、batchBufferLimitbatchTimeout. 有关更多信息,请参阅 批处理 。 另请参阅接收 Batched Messagesspring-doc.cadn.net.cn

违约:false.spring-doc.cadn.net.cn

批量大小

启用批处理时要缓冲的消息数。spring-doc.cadn.net.cn

违约:100.spring-doc.cadn.net.cn

batchBufferLimit

启用批处理时的最大缓冲区大小。spring-doc.cadn.net.cn

违约:10000.spring-doc.cadn.net.cn

batch超时

启用批处理时的批处理超时。spring-doc.cadn.net.cn

违约:5000.spring-doc.cadn.net.cn

bindingRoutingKey

用于将队列绑定到 exchange 的路由密钥(如果bindQueuetrue). 可以是多个键 - 请参阅bindingRoutingKeyDelimiter. 对于分区目标,-n附加到每个键。 仅适用于requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

违约:。#spring-doc.cadn.net.cn

bindingRoutingKeyDelimiter

当 this 不为 null 时,'bindingRoutingKey' 被视为由此值分隔的键列表;通常使用逗号。 仅适用于requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

违约:null.spring-doc.cadn.net.cn

bindQueue (绑定队列)

是否声明队列并将其绑定到目标 Exchange。 将其设置为false如果您已设置自己的基础设施,并且之前已创建并绑定队列。 仅适用于requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

违约:true.spring-doc.cadn.net.cn

压缩

发送时是否应压缩数据。spring-doc.cadn.net.cn

违约:false.spring-doc.cadn.net.cn

confirmAckChannel 确认

什么时候errorChannelEnabled为 true,则是向其发送肯定投放确认的渠道(也称为 Publisher confirms)。 如果通道不存在,则DirectChannel将使用此名称注册。 必须将连接工厂配置为启用发布者确认。spring-doc.cadn.net.cn

违约:nullChannel(丢弃 ACK)。spring-doc.cadn.net.cn

deadLetterQueueName (死信队列名称)

DLQ 的名称 仅适用于requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

违约:prefix+destination.dlqspring-doc.cadn.net.cn

死信交换

要分配给队列的 DLX。 仅当autoBindDlqtrue. 仅在以下情况下适用requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

默认值: 'prefix+DLX'spring-doc.cadn.net.cn

deadLetterExchange类型

要分配给队列的 DLX 的类型。 仅在以下情况下相关autoBindDlqtrue. 仅在以下情况下适用requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

默认值: 'direct'spring-doc.cadn.net.cn

deadLetterRoutingKey

要分配给队列的死信路由键。 仅当autoBindDlqtrue. 仅在以下情况下适用requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

违约:destinationspring-doc.cadn.net.cn

声明 Dlx

是否声明目标的死信交换。 仅在以下情况下相关autoBindDlqtrue. 设置为false如果您有预配置的 DLX. 仅在以下情况下适用requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

违约:true.spring-doc.cadn.net.cn

declareExchange

是否声明目标的交换。spring-doc.cadn.net.cn

违约:true.spring-doc.cadn.net.cn

delay表达式

一个 SPEL 表达式,用于评估要应用于消息的延迟 (x-delay标头)。 如果交换不是延迟消息交换,则它不起作用。spring-doc.cadn.net.cn

默认值:否x-delayheader 的spring-doc.cadn.net.cn

delayedExchange (延迟交换)

是否将交易所声明为Delayed Message Exchange. 需要 broker 上的 delayed message exchange 插件。 这x-delayed-type参数设置为exchangeType.spring-doc.cadn.net.cn

违约:false.spring-doc.cadn.net.cn

deliveryMode 交付模式

交货模式。spring-doc.cadn.net.cn

违约:PERSISTENT.spring-doc.cadn.net.cn

dlqBindingArguments

将 dlq 绑定到死信交换时应用的参数;用于headers deadLetterExchangeType指定要匹配的标头。 例如…​dlqBindingArguments.x-match=any,…​dlqBindingArguments.someHeader=someValue. 仅在以下情况下适用requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

默认值:空spring-doc.cadn.net.cn

dlqDeadLetterExchange

声明 DLQ 时,要分配给该队列的 DLX。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

违约:nonespring-doc.cadn.net.cn

dlqDeadLetterRoutingKey

声明 DLQ 时,要分配给该队列的死信路由键。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

违约:nonespring-doc.cadn.net.cn

dlq过期

删除未使用的死信队列之前的时间(以毫秒为单位)。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

违约:no expirationspring-doc.cadn.net.cn

dlq懒惰

使用x-queue-mode=lazy论点。 请参见“延迟队列”。 请考虑使用策略而不是此设置,因为使用策略允许在不删除队列的情况下更改设置。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

dlqMaxLength

死信队列中的最大消息数。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

违约:no limitspring-doc.cadn.net.cn

dlqMaxLengthBytes

来自所有消息的死信队列中的最大总字节数。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

违约:no limitspring-doc.cadn.net.cn

dlqMaxPriority

死信队列中消息的最大优先级 (0-255) 仅在以下情况下适用requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

违约:nonespring-doc.cadn.net.cn

dlqQuorum.deliveryLimit

什么时候quorum.enabled=true,设置一个传递限制,超过该限制后,消息将被丢弃或死信。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

Default: none - 将应用代理默认值。spring-doc.cadn.net.cn

dlqQuorum.enabled 已启用

如果为 true,则创建 quorum 死信队列,而不是 Classic 队列。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

默认值:falsespring-doc.cadn.net.cn

dlqQuorum.initialQuorumSize

什么时候quorum.enabled=true中,设置初始仲裁大小。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

Default: none - 将应用代理默认值。spring-doc.cadn.net.cn

dlqSingleActiveConsumer

设置为 true 可将x-single-active-consumerqueue 属性设置为 true。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

违约:falsespring-doc.cadn.net.cn

dlqTtl

声明时应用于死信队列的默认生存时间(以毫秒为单位)。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

违约:no limitspring-doc.cadn.net.cn

exchange自动删除

如果declareExchangetrue,是否应自动删除交换(在删除最后一个队列后将其删除)。spring-doc.cadn.net.cn

违约:true.spring-doc.cadn.net.cn

交换耐用

如果declareExchangetrue,则 Exchange 是否应为 Durable (在代理重启后继续存在)。spring-doc.cadn.net.cn

违约:true.spring-doc.cadn.net.cn

exchangeType

交易所类型:direct,fanout,headerstopic对于非分区目标和direct,headerstopic对于分区目标。spring-doc.cadn.net.cn

违约:topic.spring-doc.cadn.net.cn

到期

删除未使用的队列之前的时间 (以毫秒为单位)。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

违约:no expirationspring-doc.cadn.net.cn

headerPatterns 的

要映射到出站消息的标头的模式。spring-doc.cadn.net.cn

默认值:(所有标头)。['*']spring-doc.cadn.net.cn

懒惰

使用x-queue-mode=lazy论点。 请参见“延迟队列”。 请考虑使用策略而不是此设置,因为使用策略允许在不删除队列的情况下更改设置。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

违约:false.spring-doc.cadn.net.cn

maxLength

队列中的最大消息数。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

违约:no limitspring-doc.cadn.net.cn

maxLengthBytes

队列中所有消息的最大总字节数。 仅适用于requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

违约:no limitspring-doc.cadn.net.cn

最大优先级

队列中消息的最大优先级 (0-255)。 仅适用于requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

违约:nonespring-doc.cadn.net.cn

前缀

要添加到destination交换。spring-doc.cadn.net.cn

默认值: “”。spring-doc.cadn.net.cn

queueBindingArguments

将队列绑定到 exchange 时应用的参数;用于headers exchangeType指定要匹配的标头。 例如…​queueBindingArguments.x-match=any,…​queueBindingArguments.someHeader=someValue. 仅在以下情况下适用requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

默认值:空spring-doc.cadn.net.cn

queueNameGroupOnly

什么时候true,从名称等于group. 否则,队列名称为destination.group. 例如,当使用 Spring Cloud Stream 从现有 RabbitMQ 队列中使用时,这非常有用。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

默认值:false。spring-doc.cadn.net.cn

quorum.deliveryLimit

什么时候quorum.enabled=true,设置一个传递限制,超过该限制后,消息将被丢弃或死信。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

Default: none - 将应用代理默认值。spring-doc.cadn.net.cn

法定人数已启用

如果为 true,则创建 quorum 队列而不是 Classic 队列。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

默认值:falsespring-doc.cadn.net.cn

quorum.initialQuorumSize

什么时候quorum.enabled=true中,设置初始仲裁大小。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

Default: none - 将应用代理默认值。spring-doc.cadn.net.cn

routingKeyExpression 的

一个 SPEL 表达式,用于确定发布消息时要使用的路由键。 对于固定路由密钥,请使用文本表达式,例如routingKeyExpression='my.routingKey'在属性文件中或routingKeyExpression: '''my.routingKey'''在 YAML 文件中。spring-doc.cadn.net.cn

违约:destinationdestination-<partition>对于分区目标。spring-doc.cadn.net.cn

singleActiveConsumer

设置为 true 可将x-single-active-consumerqueue 属性设置为 true。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

违约:falsespring-doc.cadn.net.cn

交易

是否使用事务处理通道。spring-doc.cadn.net.cn

违约:false.spring-doc.cadn.net.cn

ttl

声明时应用于队列的默认生存时间(以毫秒为单位)。 仅在以下情况下适用requiredGroups,然后仅提供给这些组。spring-doc.cadn.net.cn

违约:no limitspring-doc.cadn.net.cn

对于 RabbitMQ,内容类型标头可以由外部应用程序设置。 Spring Cloud Stream 支持将它们作为扩展内部协议的一部分,用于任何类型的传输,包括传输,例如 Kafka(0.11 之前),它们本身不支持 headers。

4. 使用现有队列/交易所

默认情况下,Binder 将自动配置主题交换,其名称派生自 destination binding 属性的值<prefix><destination>. 如果未提供,则 destination 默认为绑定名称。 绑定 Consumer 时,队列会自动预置名称<prefix><destination>.<group>(如果groupbinding 属性)或匿名自动删除队列(如果没有group. 队列将使用 “match-all” 通配符路由键 () 绑定到交换,以实现非分区绑定,或者#<destination>-<instanceIndex>对于分区绑定。 前缀为空String默认情况下。 如果使用requiredGroups,将为每个组配置一个队列/绑定。spring-doc.cadn.net.cn

有许多特定于 rabbit 的绑定属性允许您修改此默认行为。spring-doc.cadn.net.cn

如果您有要使用的现有 Exchange/队列,则可以按如下方式完全禁用自动供应,假设该 Exchange 名为myExchange队列名为myQueue:spring-doc.cadn.net.cn

如果您希望 Binders 预置 queue/exchange,但您希望使用此处讨论的默认值以外的其他功能来执行此作,请使用以下属性。 有关更多信息,请参阅上面的属性文档。spring-doc.cadn.net.cn

  • spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindingRoutingKey=myRoutingKeyspring-doc.cadn.net.cn

  • spring.cloud.stream.rabbit.bindings.<binding name>.consumer.exchangeType=<type>spring-doc.cadn.net.cn

  • spring.cloud.stream.rabbit.bindings.<binding name>.producer.routingKeyExpression='myRoutingKey'spring-doc.cadn.net.cn

在声明死信交换/队列时,也有类似的属性,当autoBindDlqtrue.spring-doc.cadn.net.cn

5. 使用 RabbitMQ Binder 重试

在 Binder 中启用重试后,侦听器容器线程将在配置的任何回退期间暂停。 当单个使用者需要严格排序时,这可能很重要。但是,对于其他使用案例,它会阻止在该线程上处理其他消息。 使用 Binder 重试的另一种方法是在死信队列 (DLQ) 上设置具有生存时间的死信,并在 DLQ 本身上设置死信配置。 有关此处讨论的属性的更多信息,请参阅“RabbitMQ Binder 属性”。 您可以使用以下示例配置来启用此功能:spring-doc.cadn.net.cn

  • 设置autoBindDlqtrue. Binder 将创建一个 DLQ。 (可选)您可以在deadLetterQueueName.spring-doc.cadn.net.cn

  • 设置dlqTtl设置为您希望在重新投递之间等待的 back off 时间。spring-doc.cadn.net.cn

  • dlqDeadLetterExchange添加到默认交易所。 来自 DLQ 的过期邮件将路由到原始队列,因为默认的deadLetterRoutingKey是队列名称 (destination.group). 设置为默认 exchange 是通过设置无值的属性来实现的,如下一个示例所示。spring-doc.cadn.net.cn

要强制消息为死信,请抛出AmqpRejectAndDontRequeueException或设置requeueRejectedtrue(默认值)并引发任何异常。spring-doc.cadn.net.cn

循环无休止地继续,这对于暂时性问题来说很好,但你可能想在尝试几次后放弃。 幸运的是,RabbitMQ 提供了x-death标头,用于确定已发生的周期数。spring-doc.cadn.net.cn

要在放弃后确认消息,请抛出一个ImmediateAcknowledgeAmqpException.spring-doc.cadn.net.cn

5.1. 把它们放在一起

以下配置将创建一个 ExchangemyDestination带队列myDestination.consumerGroup绑定到具有通配符路由密钥的主题交换:#spring-doc.cadn.net.cn

---
spring.cloud.stream.bindings.input.destination=myDestination
spring.cloud.stream.bindings.input.group=consumerGroup
#disable binder retries
spring.cloud.stream.bindings.input.consumer.max-attempts=1
#dlx/dlq setup
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=
---

此配置将创建一个绑定到直接交换 (DLX),其中路由密钥为myDestination.consumerGroup. 当邮件被拒绝时,它们将被路由到 DLQ。 5 秒后,消息过期,并使用队列名称作为路由键路由到原始队列,如以下示例所示:spring-doc.cadn.net.cn

Spring Boot 应用程序
@SpringBootApplication
@EnableBinding(Sink.class)
public class XDeathApplication {

    public static void main(String[] args) {
        SpringApplication.run(XDeathApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in, @Header(name = "x-death", required = false) Map<?,?> death) {
        if (death != null && death.get("count").equals(3L)) {
            // giving up - don't send to DLX
            throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
        }
        throw new AmqpRejectAndDontRequeueException("failed");
    }

}

请注意,x-deathheader 是一个Long.spring-doc.cadn.net.cn

6. 错误通道

从版本 1.3 开始,Binder 无条件地将异常发送到每个 Consumer 目标的错误通道,并且还可以配置为将异步 producer send failures发送到 error 通道。 有关更多信息,请参见“[spring-cloud-stream-overview-error-handling]”。spring-doc.cadn.net.cn

RabbitMQ 有两种类型的发送失败:spring-doc.cadn.net.cn

后者很少见。 根据 RabbitMQ 文档,“[A nack] 只有在负责队列的 Erlang 进程中发生内部错误时才会交付。spring-doc.cadn.net.cn

除了启用生产者错误通道(如“[spring-cloud-stream-overview-error-handling]”中所述)之外,RabbitMQ Binder 仅在连接工厂配置适当的情况下才会向通道发送消息,如下所示:spring-doc.cadn.net.cn

将 Spring Boot 配置用于连接工厂时,请设置以下属性:spring-doc.cadn.net.cn

ErrorMessage对于返回的消息,是一个ReturnedAmqpMessageException具有以下属性:spring-doc.cadn.net.cn

对于否定确认的确认,有效负载是NackedAmqpMessageException具有以下属性:spring-doc.cadn.net.cn

不会自动处理这些异常(例如发送到死信队列)。 您可以在自己的 Spring 集成流中使用这些异常。spring-doc.cadn.net.cn

7. 死信队列处理

由于您无法预测用户希望如何处理死信消息,因此框架不提供任何标准机制来处理它们。 如果死信的原因是暂时的,您可能希望将消息路由回原始队列。 但是,如果问题是永久性问题,则可能会导致无限循环。 Spring 下面的 Boot 应用程序展示了如何将这些消息路由回原始队列,但在三次尝试后将它们移动到第三个“停车场”队列的示例。 第二个示例使用 RabbitMQ Delayed Message Exchange 为重新排队的消息引入延迟。 在此示例中,每次尝试的延迟都会增加。 这些示例使用@RabbitListener以接收来自 DLQ 的消息。 您还可以使用RabbitTemplate.receive()在批处理中。spring-doc.cadn.net.cn

这些示例假定原始目标是so8400in且 Consumer 组为so8400.spring-doc.cadn.net.cn

7.1. 未分区的目标

前两个示例适用于目标未分区的情况:spring-doc.cadn.net.cn

@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
            this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}
@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    private static final String DELAY_EXCHANGE = "dlqReRouter";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            headers.put(X_RETRIES_HEADER, retriesHeader + 1);
            headers.put("x-delay", 5000 * retriesHeader);
            this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public DirectExchange delayExchange() {
        DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Binding bindOriginalToDelay() {
        return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}

7.2. 分区目的地

对于分区目标,所有分区都有一个 DLQ。我们从 Headers 中确定原始队列。spring-doc.cadn.net.cn

7.2.1.republishToDlq=false

什么时候republishToDlqfalse时,RabbitMQ 会将消息发布到 DLX/DLQ,其中包含x-death标头,其中包含有关原始目标的信息,如以下示例所示:spring-doc.cadn.net.cn

@SpringBootApplication
public class ReRouteDlqApplication {

	private static final String ORIGINAL_QUEUE = "so8400in.so8400";

	private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

	private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

	private static final String X_DEATH_HEADER = "x-death";

	private static final String X_RETRIES_HEADER = "x-retries";

	public static void main(String[] args) throws Exception {
		ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
		System.out.println("Hit enter to terminate");
		System.in.read();
		context.close();
	}

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@SuppressWarnings("unchecked")
	@RabbitListener(queues = DLQ)
	public void rePublish(Message failedMessage) {
		Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
		Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
		if (retriesHeader == null) {
			retriesHeader = Integer.valueOf(0);
		}
		if (retriesHeader < 3) {
			headers.put(X_RETRIES_HEADER, retriesHeader + 1);
			List<Map<String, ?>> xDeath = (List<Map<String, ?>>) headers.get(X_DEATH_HEADER);
			String exchange = (String) xDeath.get(0).get("exchange");
			List<String> routingKeys = (List<String>) xDeath.get(0).get("routing-keys");
			this.rabbitTemplate.send(exchange, routingKeys.get(0), failedMessage);
		}
		else {
			this.rabbitTemplate.send(PARKING_LOT, failedMessage);
		}
	}

	@Bean
	public Queue parkingLot() {
		return new Queue(PARKING_LOT);
	}

}

7.2.2.republishToDlq=true

什么时候republishToDlqtrue中,重新发布 Recoverer 会将原始 Exchange 和 Routing 键添加到 Headers,如以下示例所示:spring-doc.cadn.net.cn

@SpringBootApplication
public class ReRouteDlqApplication {

	private static final String ORIGINAL_QUEUE = "so8400in.so8400";

	private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

	private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

	private static final String X_RETRIES_HEADER = "x-retries";

	private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;

	private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;

	public static void main(String[] args) throws Exception {
		ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
		System.out.println("Hit enter to terminate");
		System.in.read();
		context.close();
	}

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@RabbitListener(queues = DLQ)
	public void rePublish(Message failedMessage) {
		Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
		Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
		if (retriesHeader == null) {
			retriesHeader = Integer.valueOf(0);
		}
		if (retriesHeader < 3) {
			headers.put(X_RETRIES_HEADER, retriesHeader + 1);
			String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
			String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
			this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
		}
		else {
			this.rabbitTemplate.send(PARKING_LOT, failedMessage);
		}
	}

	@Bean
	public Queue parkingLot() {
		return new Queue(PARKING_LOT);
	}

}

8. 使用 RabbitMQ Binder 进行分区

RabbitMQ 本身不支持分区。spring-doc.cadn.net.cn

有时,将数据发送到特定分区是有利的 — 例如,当您想要严格排序消息处理时,特定客户的所有消息都应发送到同一分区。spring-doc.cadn.net.cn

RabbitMessageChannelBinder通过将每个分区的队列绑定到目标 Exchange 来提供分区。spring-doc.cadn.net.cn

以下 Java 和 YAML 示例展示了如何配置 producer:spring-doc.cadn.net.cn

制作人
@SpringBootApplication
@EnableBinding(Source.class)
public class RabbitPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "abc1", "def1", "qux1",
            "abc2", "def2", "qux2",
            "abc3", "def3", "qux3",
            "abc4", "def4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
    public Message<?> generate() {
        String value = data[RANDOM.nextInt(data.length)];
        System.out.println("Sending: " + value);
        return MessageBuilder.withPayload(value)
                .setHeader("partitionKey", value)
                .build();
    }

}
application.yml
    spring:
      cloud:
        stream:
          bindings:
            output:
              destination: partitioned.destination
              producer:
                partitioned: true
                partition-key-expression: headers['partitionKey']
                partition-count: 2
                required-groups:
                - myGroup

前面示例中的配置使用默认分区 (key.hashCode() % partitionCount). 这可能会也可能不会提供适当平衡的算法,具体取决于键值。 您可以使用partitionSelectorExpressionpartitionSelectorClass性能。spring-doc.cadn.net.cn

required-groups仅当您需要在部署创建者时预置使用者队列时,才需要属性。 否则,发送到分区的任何消息都将丢失,直到部署相应的使用者为止。spring-doc.cadn.net.cn

以下配置预置主题交换:spring-doc.cadn.net.cn

零件更换

以下队列绑定到该 Exchange:spring-doc.cadn.net.cn

Part 队列

以下绑定将队列关联到交换:spring-doc.cadn.net.cn

零件绑定

以下 Java 和 YAML 示例延续了前面的示例,并展示了如何配置使用者:spring-doc.cadn.net.cn

消费者
@SpringBootApplication
@EnableBinding(Sink.class)
public class RabbitPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
            .web(false)
            .run(args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(@Payload String in, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
        System.out.println(in + " received from queue " + queue);
    }

}
application.yml
    spring:
      cloud:
        stream:
          bindings:
            input:
              destination: partitioned.destination
              group: myGroup
              consumer:
                partitioned: true
                instance-index: 0
RabbitMessageChannelBinder不支持动态扩展。 每个分区必须至少有一个使用者。 消费者的instanceIndex用于指示消耗的分区。 Cloud Foundry 等平台只能有一个实例,其中instanceIndex.