此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Boot 3.3.4! |
此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Boot 3.3.4! |
高级消息队列协议 (AMQP) 是一种平台中立的有线级协议,适用于面向消息的中间件。
Spring AMQP 项目将核心 Spring 概念应用于基于 AMQP 的消息传递解决方案的开发。
Spring Boot 为通过 RabbitMQ 使用 AMQP 提供了多种便利,包括Starters。spring-boot-starter-amqp
RabbitMQ 支持
RabbitMQ 是基于 AMQP 协议的轻量级、可靠、可扩展且可移植的消息代理。 Spring 使用 RabbitMQ 通过 AMQP 协议进行通信。
RabbitMQ 配置由 中的外部配置属性控制。
例如,您可以在 :spring.rabbitmq.*
application.properties
-
Properties
-
YAML
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
spring:
rabbitmq:
host: "localhost"
port: 5672
username: "admin"
password: "secret"
或者,您可以使用以下属性配置相同的连接:addresses
-
Properties
-
YAML
spring.rabbitmq.addresses=amqp://admin:secret@localhost
spring:
rabbitmq:
addresses: "amqp://admin:secret@localhost"
以这种方式指定地址时,将忽略 and 属性。
如果地址使用该协议,则会自动启用 SSL 支持。host port amqps |
有关支持的基于属性的配置选项的更多内容,请参阅。
要配置 Spring AMQP 使用的 RabbitMQ 的较低级别详细信息,请定义一个 bean。RabbitProperties
ConnectionFactory
ConnectionFactoryCustomizer
如果上下文中存在 bean,它将自动用于命名由自动配置的 .ConnectionNameStrategy
CachingConnectionFactory
要对 进行应用程序范围的加法定制,请使用 bean。RabbitTemplate
RabbitTemplateCustomizer
有关更多详细信息,请参阅了解 RabbitMQ 使用的协议 AMQP。 |
以这种方式指定地址时,将忽略 and 属性。
如果地址使用该协议,则会自动启用 SSL 支持。host port amqps |
有关更多详细信息,请参阅了解 RabbitMQ 使用的协议 AMQP。 |
发送消息
Spring 的 and 是自动配置的,你可以将它们直接自动连接到你自己的 bean 中,如以下示例所示:AmqpTemplate
AmqpAdmin
-
Java
-
Kotlin
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final AmqpAdmin amqpAdmin;
private final AmqpTemplate amqpTemplate;
public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
this.amqpAdmin = amqpAdmin;
this.amqpTemplate = amqpTemplate;
}
// ...
public void someMethod() {
this.amqpAdmin.getQueueInfo("someQueue");
}
public void someOtherMethod() {
this.amqpTemplate.convertAndSend("hello");
}
}
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {
// ...
fun someMethod() {
amqpAdmin.getQueueInfo("someQueue")
}
fun someOtherMethod() {
amqpTemplate.convertAndSend("hello")
}
}
RabbitMessagingTemplate 可以以类似的方式注射。
如果定义了 bean,则它会自动关联到 auto-configured .MessageConverter AmqpTemplate |
如有必要,将自动使用定义为 Bean 的任何 Bean 在 RabbitMQ 实例上声明相应的队列。org.springframework.amqp.core.Queue
要重试操作,您可以在 上启用重试(例如,在代理连接丢失的情况下):AmqpTemplate
-
Properties
-
YAML
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
spring:
rabbitmq:
template:
retry:
enabled: true
initial-interval: "2s"
默认情况下,重试处于禁用状态。
您还可以通过声明 bean 以编程方式自定义 bean。RetryTemplate
RabbitRetryTemplateCustomizer
如果你需要创建更多实例,或者想要覆盖默认值,Spring Boot 提供了一个 bean,你可以使用它来初始化一个,其设置与自动配置使用的工厂相同。RabbitTemplate
RabbitTemplateConfigurer
RabbitTemplate
RabbitMessagingTemplate 可以以类似的方式注射。
如果定义了 bean,则它会自动关联到 auto-configured .MessageConverter AmqpTemplate |
向流发送消息
要向特定流发送消息,请指定流的名称,如以下示例所示:
-
Properties
-
YAML
spring.rabbitmq.stream.name=my-stream
spring:
rabbitmq:
stream:
name: "my-stream"
如果定义了 , , 或 bean,则它会自动与自动配置的 .MessageConverter
StreamMessageConverter
ProducerCustomizer
RabbitStreamTemplate
如果你需要创建更多实例,或者想要覆盖默认值,Spring Boot 提供了一个 bean,你可以使用它来初始化一个,其设置与自动配置使用的工厂相同。RabbitStreamTemplate
RabbitStreamTemplateConfigurer
RabbitStreamTemplate
接收消息
当 Rabbit 基础结构存在时,可以使用任何 bean 进行 Comments 以创建侦听器端点。
如果未定义 no,则会自动配置 default,您可以使用该属性切换到直接容器。
如果定义了 a 或 a bean,则它将自动与默认工厂相关联。@RabbitListener
RabbitListenerContainerFactory
SimpleRabbitListenerContainerFactory
spring.rabbitmq.listener.type
MessageConverter
MessageRecoverer
以下示例组件在队列上创建一个侦听器终端节点:someQueue
-
Java
-
Kotlin
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@RabbitListener(queues = "someQueue")
public void processMessage(String content) {
// ...
}
}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@RabbitListener(queues = ["someQueue"])
fun processMessage(content: String?) {
// ...
}
}
有关更多详细信息,请参阅。@html |
如果你需要创建更多实例,或者想要覆盖默认值,Spring Boot 提供了a和a,你可以使用它来初始化a和a,其设置与自动配置使用的工厂相同。RabbitListenerContainerFactory
SimpleRabbitListenerContainerFactoryConfigurer
DirectRabbitListenerContainerFactoryConfigurer
SimpleRabbitListenerContainerFactory
DirectRabbitListenerContainerFactory
选择哪种容器类型并不重要。 这两个 bean 由自动配置公开。 |
例如,下面的配置类公开了另一个使用特定 :MessageConverter
-
Java
-
Kotlin
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {
@Bean
public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
ConnectionFactory connectionFactory = getCustomConnectionFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(new MyMessageConverter());
return factory;
}
private ConnectionFactory getCustomConnectionFactory() {
return ...
}
}
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {
@Bean
fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
val factory = SimpleRabbitListenerContainerFactory()
val connectionFactory = getCustomConnectionFactory()
configurer.configure(factory, connectionFactory)
factory.setMessageConverter(MyMessageConverter())
return factory
}
fun getCustomConnectionFactory() : ConnectionFactory? {
return ...
}
}
然后,您可以在任何带 -annotated 的方法中使用工厂,如下所示:@RabbitListener
-
Java
-
Kotlin
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@RabbitListener(queues = "someQueue", containerFactory = "myFactory")
public void processMessage(String content) {
// ...
}
}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
fun processMessage(content: String?) {
// ...
}
}
您可以启用重试来处理侦听器引发异常的情况。
默认情况下,使用,但您可以定义自己的 a。
当重试次数用尽时,消息将被拒绝,并被丢弃或路由到死信交换(如果代理配置为这样做)。
默认情况下,重试处于禁用状态。
您还可以通过声明 bean 以编程方式自定义 bean。RejectAndDontRequeueRecoverer
MessageRecoverer
RetryTemplate
RabbitRetryTemplateCustomizer
默认情况下,如果禁用重试并且侦听器引发异常,则会无限期地重试投放。
您可以通过两种方式修改此行为:将属性设置为 以便尝试零重新投放,或抛出 an 以指示应拒绝消息。
后者是启用重试并达到最大投放尝试次数时使用的机制。defaultRequeueRejected false AmqpRejectAndDontRequeueException |
有关更多详细信息,请参阅。@html |
选择哪种容器类型并不重要。 这两个 bean 由自动配置公开。 |
默认情况下,如果禁用重试并且侦听器引发异常,则会无限期地重试投放。
您可以通过两种方式修改此行为:将属性设置为 以便尝试零重新投放,或抛出 an 以指示应拒绝消息。
后者是启用重试并达到最大投放尝试次数时使用的机制。defaultRequeueRejected false AmqpRejectAndDontRequeueException |