此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring AMQP 3.2.0spring-doc.cn

侦听器并发

SimpleMessageListenerContainer (简单消息监听器容器)

默认情况下,侦听器容器启动一个从队列接收消息的使用者。spring-doc.cn

在检查上一节中的表时,您可以看到许多控制并发的属性和特性。 最简单的是 ,它创建并发处理消息的 (固定) 数量的使用者。concurrentConsumersspring-doc.cn

在版本 1.3.0 之前,这是唯一可用的设置,必须停止并重新启动容器才能更改设置。spring-doc.cn

从 1.3.0 版本开始,您现在可以动态调整属性。 如果在容器运行时更改了此设置,则会根据需要添加或删除使用者,以适应新设置。concurrentConsumersspring-doc.cn

此外,还添加了一个名为 的新属性,容器会根据工作负载动态调整并发。 这与四个附加属性结合使用: 、 、 和 。 使用默认设置时,增加使用者的算法工作如下:maxConcurrentConsumersconsecutiveActiveTriggerstartConsumerMinIntervalconsecutiveIdleTriggerstopConsumerMinIntervalspring-doc.cn

如果尚未达到 ,并且现有使用者连续 10 个周期处于活动状态,并且自最后一个使用者启动以来已经过了至少 10 秒,则会启动新的使用者。 如果使用者在 * 毫秒内收到至少一条消息,则认为该使用者处于活动状态。maxConcurrentConsumersbatchSizereceiveTimeoutspring-doc.cn

使用默认设置时,减少使用者的算法工作如下:spring-doc.cn

如果 Running 的使用者数超过 AND,并且使用者检测到连续 10 次超时 (空闲),并且最后一个使用者至少在 60 秒前停止,则使用者将停止。 超时取决于 和 属性。 如果使用者在 * 毫秒内未收到任何消息,则认为该使用者处于空闲状态。 因此,使用默认超时 (1 秒) 和 4 时,在 40 秒的空闲时间后考虑停止使用者 (4 次超时对应于 1 次空闲检测)。concurrentConsumersreceiveTimeoutbatchSizebatchSizereceiveTimeoutbatchSizespring-doc.cn

实际上,只有当整个容器空闲一段时间时,才能停止消费者。 这是因为 broker 在所有活动使用者之间共享其工作。

每个使用者都使用一个通道,而不管配置的队列数量如何。spring-doc.cn

从版本 2.0 开始,可以使用属性设置 and 属性 — 例如,.concurrentConsumersmaxConcurrentConsumersconcurrency2-4spring-doc.cn

DirectMessageListenerContainer

使用此容器,并发性基于配置的队列和 。 每个队列的每个使用者都使用一个单独的通道,并发性由 rabbit 客户端库控制。 默认情况下,在撰写本文时,它使用线程池。consumersPerQueueDEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2spring-doc.cn

您可以配置 a 以提供所需的最大并发性。taskExecutorspring-doc.cn