此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Boot 3.3.1Spring中文文档

此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Boot 3.3.1Spring中文文档

高级消息队列协议 (AMQP) 是一种平台中立的线级协议,适用于面向消息的中间件。 Spring AMQP 项目将 Spring 的核心概念应用于基于 AMQP 的消息传递解决方案的开发。 Spring Boot 为通过 RabbitMQ 使用 AMQP 提供了多种便利,包括“入门”。spring-boot-starter-amqpSpring中文文档

RabbitMQ 支持

RabbitMQ 是基于 AMQP 协议的轻量级、可靠、可扩展、可移植的消息代理。 Spring 使用 RabbitMQ 通过 AMQP 协议进行通信。Spring中文文档

RabbitMQ 配置由 中的外部配置属性控制。 例如,您可以在以下位置声明以下部分:spring.rabbitmq.*application.propertiesSpring中文文档

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
spring:
  rabbitmq:
    host: "localhost"
    port: 5672
    username: "admin"
    password: "secret"

或者,您可以使用以下属性配置相同的连接:addressesSpring中文文档

spring.rabbitmq.addresses=amqp://admin:secret@localhost
spring:
  rabbitmq:
    addresses: "amqp://admin:secret@localhost"
以这种方式指定地址时,将忽略 和 属性。 如果地址使用该协议,则会自动启用 SSL 支持。hostportamqps

有关更多受支持的基于属性的配置选项,请参阅 RabbitProperties。 要配置 Spring AMQP 使用的 RabbitMQ 的较低级别详细信息,请定义一个 bean。ConnectionFactoryConnectionFactoryCustomizerSpring中文文档

如果上下文中存在 Bean,则将自动使用它来命名由自动配置的 .ConnectionNameStrategyCachingConnectionFactorySpring中文文档

要对 进行应用程序范围的附加定制,请使用 bean。RabbitTemplateRabbitTemplateCustomizerSpring中文文档

以这种方式指定地址时,将忽略 和 属性。 如果地址使用该协议,则会自动启用 SSL 支持。hostportamqps
有关更多详细信息,请参阅了解 RabbitMQ 使用的协议 AMQP

发送消息

Spring 和 是自动配置的,您可以将它们直接自动连接到您自己的 bean 中,如以下示例所示:AmqpTemplateAmqpAdminSpring中文文档

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final AmqpAdmin amqpAdmin;

	private final AmqpTemplate amqpTemplate;

	public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
		this.amqpAdmin = amqpAdmin;
		this.amqpTemplate = amqpTemplate;
	}

	// ...

	public void someMethod() {
		this.amqpAdmin.getQueueInfo("someQueue");
	}

	public void someOtherMethod() {
		this.amqpTemplate.convertAndSend("hello");
	}

}
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {

	// ...

	fun someMethod() {
		amqpAdmin.getQueueInfo("someQueue")
	}

	fun someOtherMethod() {
		amqpTemplate.convertAndSend("hello")
	}

}
RabbitMessagingTemplate 可以以类似的方式注入。 如果定义了 Bean,则会自动将其关联到自动配置的 .MessageConverterAmqpTemplate

如有必要,将自动使用定义为 Bean 的任何消息在 RabbitMQ 实例上声明相应的队列。org.springframework.amqp.core.QueueSpring中文文档

要重试操作,您可以在 上启用重试(例如,在代理连接丢失的情况下):AmqpTemplateSpring中文文档

spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
spring:
  rabbitmq:
    template:
      retry:
        enabled: true
        initial-interval: "2s"

默认情况下,重试处于禁用状态。 您还可以通过声明 Bean 以编程方式自定义。RetryTemplateRabbitRetryTemplateCustomizerSpring中文文档

如果您需要创建更多实例,或者想要覆盖默认值,Spring Boot 提供了一个 bean,您可以使用它与自动配置使用的工厂相同的设置来初始化 bean。RabbitTemplateRabbitTemplateConfigurerRabbitTemplateSpring中文文档

RabbitMessagingTemplate 可以以类似的方式注入。 如果定义了 Bean,则会自动将其关联到自动配置的 .MessageConverterAmqpTemplate

向流发送消息

若要向特定流发送消息,请指定流的名称,如以下示例所示:Spring中文文档

spring.rabbitmq.stream.name=my-stream
spring:
  rabbitmq:
    stream:
      name: "my-stream"

如果定义了 、 或 bean,则会自动将其关联到自动配置的 .MessageConverterStreamMessageConverterProducerCustomizerRabbitStreamTemplateSpring中文文档

如果您需要创建更多实例,或者想要覆盖默认值,Spring Boot 提供了一个 bean,您可以使用它与自动配置使用的工厂相同的设置来初始化 bean。RabbitStreamTemplateRabbitStreamTemplateConfigurerRabbitStreamTemplateSpring中文文档

接收消息

当 Rabbit 基础结构存在时,可以对任何 Bean 进行注释以创建侦听器端点。 如果未定义“否”,则会自动配置默认值,您可以使用该属性切换到直接容器。 如果定义了 a 或 bean,则它会自动与缺省工厂相关联。@RabbitListenerRabbitListenerContainerFactorySimpleRabbitListenerContainerFactoryspring.rabbitmq.listener.typeMessageConverterMessageRecovererSpring中文文档

以下示例组件在队列上创建侦听器终结点:someQueueSpring中文文档

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@RabbitListener(queues = "someQueue")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@RabbitListener(queues = ["someQueue"])
	fun processMessage(content: String?) {
		// ...
	}

}
有关更多详细信息,请参阅 @EnableRabbit 的 Javadoc

如果您需要创建更多实例,或者想要覆盖默认值,Spring Boot 会提供 a 和 a,您可以使用它们来初始化 a 和 a,其设置与自动配置使用的工厂相同。RabbitListenerContainerFactorySimpleRabbitListenerContainerFactoryConfigurerDirectRabbitListenerContainerFactoryConfigurerSimpleRabbitListenerContainerFactoryDirectRabbitListenerContainerFactorySpring中文文档

选择哪种容器类型并不重要。 这两个 Bean 由自动配置公开。

例如,以下配置类公开了另一个使用特定 :MessageConverterSpring中文文档

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {

	@Bean
	public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		ConnectionFactory connectionFactory = getCustomConnectionFactory();
		configurer.configure(factory, connectionFactory);
		factory.setMessageConverter(new MyMessageConverter());
		return factory;
	}

	private ConnectionFactory getCustomConnectionFactory() {
		return ...
	}

}
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {

	@Bean
	fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
		val factory = SimpleRabbitListenerContainerFactory()
		val connectionFactory = getCustomConnectionFactory()
		configurer.configure(factory, connectionFactory)
		factory.setMessageConverter(MyMessageConverter())
		return factory
	}

	fun getCustomConnectionFactory() : ConnectionFactory? {
		return ...
	}

}

然后,您可以在任何 -annotated 方法中使用工厂,如下所示:@RabbitListenerSpring中文文档

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@RabbitListener(queues = "someQueue", containerFactory = "myFactory")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
	fun processMessage(content: String?) {
		// ...
	}

}

您可以启用重试以处理侦听器引发异常的情况。 默认情况下,是使用,但您可以定义自己的。 当重试用尽时,消息将被拒绝,如果代理配置为这样做,则会丢弃或路由到死信交换。 默认情况下,重试处于禁用状态。 您还可以通过声明 Bean 以编程方式自定义。RejectAndDontRequeueRecovererMessageRecovererRetryTemplateRabbitRetryTemplateCustomizerSpring中文文档

默认情况下,如果禁用重试并且侦听器引发异常,则会无限期重试传递。 可以通过两种方式修改此行为:将属性设置为“尝试零重新传递”,或抛出 an 以指示应拒绝消息。 后者是在启用重试并达到最大传递尝试次数时使用的机制。defaultRequeueRejectedfalseAmqpRejectAndDontRequeueException
有关更多详细信息,请参阅 @EnableRabbit 的 Javadoc
选择哪种容器类型并不重要。 这两个 Bean 由自动配置公开。
默认情况下,如果禁用重试并且侦听器引发异常,则会无限期重试传递。 可以通过两种方式修改此行为:将属性设置为“尝试零重新传递”,或抛出 an 以指示应拒绝消息。 后者是在启用重试并达到最大传递尝试次数时使用的机制。defaultRequeueRejectedfalseAmqpRejectAndDontRequeueException