消息
1. JMS的
该接口提供了一种创建用于与 JMS 代理交互的标准方法。
尽管 Spring 需要与 JMS 一起使用,但您通常不需要自己直接使用它,而是可以依赖更高级别的消息传递抽象。
(有关详细信息,请参阅 Spring Framework 参考文档的相关部分。
Spring Boot 还自动配置了发送和接收消息所需的基础设施。jakarta.jms.ConnectionFactory
jakarta.jms.Connection
ConnectionFactory
1.1. ActiveMQ “Classic” 支持
当 ActiveMQ “Classic” 在 Classpath 上可用时, Spring Boot 可以配置 .ConnectionFactory
如果您使用 ,则提供了连接到 ActiveMQ “Classic” 实例所需的依赖项,以及与 JMS 集成的 Spring 基础架构。spring-boot-starter-activemq |
ActiveMQ “Classic” 配置由 中的外部配置属性控制。
默认情况下,ActiveMQ “Classic” 自动配置为使用 TCP 传输,默认情况下连接到 .以下示例显示如何更改默认代理 URL:spring.activemq.*
tcp://localhost:61616
spring.activemq.broker-url=tcp://192.168.1.210:9876
spring.activemq.user=admin
spring.activemq.password=secret
spring:
activemq:
broker-url: "tcp://192.168.1.210:9876"
user: "admin"
password: "secret"
默认情况下, a 使用合理的设置包装本机,您可以通过外部配置属性控制这些设置:CachingConnectionFactory
ConnectionFactory
spring.jms.*
spring.jms.cache.session-cache-size=5
spring:
jms:
cache:
session-cache-size: 5
如果您更愿意使用本机池,可以通过添加依赖项并相应地配置来实现,如以下示例所示:org.messaginghub:pooled-jms
JmsPoolConnectionFactory
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
spring:
activemq:
pool:
enabled: true
max-connections: 50
有关更多支持的选项,请参阅 ActiveMQProperties 。
您还可以注册任意数量的 bean 来实现更高级的自定义。ActiveMQConnectionFactoryCustomizer |
默认情况下,ActiveMQ “Classic” 会创建一个目标(如果尚不存在),以便根据提供的名称解析目标。
1.2. ActiveMQ Artemis 支持
Spring Boot 可以在检测到 ActiveMQ Artemis 在 Classpath 上可用时自动配置。
如果存在代理,则会自动启动并配置嵌入式代理(除非已明确设置 mode 属性)。
支持的模式是(明确表示需要嵌入式代理,如果代理在 Classpath 上不可用,则应该发生错误)和(使用传输协议连接到代理)。
配置后者后, Spring Boot 会配置一个,该代理使用默认设置连接到本地计算机上运行的代理。ConnectionFactory
embedded
native
netty
ConnectionFactory
如果您使用 ,则提供了连接到现有 ActiveMQ Artemis 实例所需的依赖项,以及与 JMS 集成的 Spring 基础架构。
通过添加到应用程序,可以使用嵌入式模式。spring-boot-starter-artemis org.apache.activemq:artemis-jakarta-server |
ActiveMQ Artemis 配置由 中的外部配置属性控制。
例如,您可以在 :spring.artemis.*
application.properties
spring.artemis.mode=native
spring.artemis.broker-url=tcp://192.168.1.210:9876
spring.artemis.user=admin
spring.artemis.password=secret
spring:
artemis:
mode: native
broker-url: "tcp://192.168.1.210:9876"
user: "admin"
password: "secret"
在嵌入代理时,您可以选择是否要启用持久性并列出应可用的目标。
这些可以指定为逗号分隔的列表,以便使用默认选项创建它们,或者您可以分别为高级队列和主题配置定义 或 类型的 bean。org.apache.activemq.artemis.jms.server.config.JMSQueueConfiguration
org.apache.activemq.artemis.jms.server.config.TopicConfiguration
默认情况下, a 使用合理的设置包装本机,您可以通过外部配置属性控制这些设置:CachingConnectionFactory
ConnectionFactory
spring.jms.*
spring.jms.cache.session-cache-size=5
spring:
jms:
cache:
session-cache-size: 5
如果您更愿意使用本机池,可以通过添加依赖项并相应地配置来实现,如以下示例所示:org.messaginghub:pooled-jms
JmsPoolConnectionFactory
spring.artemis.pool.enabled=true
spring.artemis.pool.max-connections=50
spring:
artemis:
pool:
enabled: true
max-connections: 50
有关更多支持的选项,请参阅 ArtemisProperties
。
不涉及 JNDI 查找,并且使用 ActiveMQ Artemis 配置中的属性或通过配置提供的名称根据其名称解析目标。name
1.3. 使用 JNDI ConnectionFactory
如果您在应用程序服务器中运行应用程序,则 Spring Boot 会尝试使用 JNDI 查找 JMS。
默认情况下,将选中 和 location。
如果需要指定备用位置,则可以使用该属性,如以下示例所示:ConnectionFactory
java:/JmsXA
java:/XAConnectionFactory
spring.jms.jndi-name
spring.jms.jndi-name=java:/MyConnectionFactory
spring:
jms:
jndi-name: "java:/MyConnectionFactory"
1.4. 发送消息
Spring 的 bean 是自动配置的,你可以将其直接自动连接到你自己的 bean 中,如以下示例所示:JmsTemplate
@Component
public class MyBean {
private final JmsTemplate jmsTemplate;
public MyBean(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
}
@Component
class MyBean(private val jmsTemplate: JmsTemplate) {
}
JmsMessagingTemplate 可以以类似的方式注入。
如果定义了 a 或 a bean,则它会自动关联到 auto-configured 的 。DestinationResolver MessageConverter JmsTemplate |
1.5. 接收消息
当 JMS 基础结构存在时,可以使用任何 bean 进行注释以创建侦听器端点。
如果未定义 no,则会自动配置默认 1 个。
如果定义了 a 、 a 或 a bean,则它们将自动与默认工厂关联。@JmsListener
JmsListenerContainerFactory
DestinationResolver
MessageConverter
jakarta.jms.ExceptionListener
默认情况下,默认工厂是事务性的。
如果您在存在 a 的基础设施中运行,则默认情况下它与侦听器容器相关联。
否则,将启用该标志。
在后一种情况下,您可以通过添加侦听器方法(或其委托)将本地数据存储事务与传入消息的处理相关联。
这可确保在本地事务完成后确认传入消息。
这还包括发送已在同一 JMS 会话上执行的响应消息。JtaTransactionManager
sessionTransacted
@Transactional
以下组件在目标上创建一个侦听器终端节点:someQueue
@Component
public class MyBean {
@JmsListener(destination = "someQueue")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@JmsListener(destination = "someQueue")
fun processMessage(content: String?) {
// ...
}
}
有关更多详细信息,请参阅 @EnableJms 的 Javadoc。 |
如果你需要创建更多实例,或者想要覆盖默认值,Spring Boot 提供了一个,你可以使用它来初始化一个,其设置与自动配置的设置相同。JmsListenerContainerFactory
DefaultJmsListenerContainerFactoryConfigurer
DefaultJmsListenerContainerFactory
例如,以下示例公开了另一个使用特定 :MessageConverter
@Configuration(proxyBeanMethods = false)
public class MyJmsConfiguration {
@Bean
public DefaultJmsListenerContainerFactory myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
ConnectionFactory connectionFactory = getCustomConnectionFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(new MyMessageConverter());
return factory;
}
private ConnectionFactory getCustomConnectionFactory() {
return ...
}
}
@Configuration(proxyBeanMethods = false)
class MyJmsConfiguration {
@Bean
fun myFactory(configurer: DefaultJmsListenerContainerFactoryConfigurer): DefaultJmsListenerContainerFactory {
val factory = DefaultJmsListenerContainerFactory()
val connectionFactory = getCustomConnectionFactory()
configurer.configure(factory, connectionFactory)
factory.setMessageConverter(MyMessageConverter())
return factory
}
fun getCustomConnectionFactory() : ConnectionFactory? {
return ...
}
}
然后,您可以在任何带 -annotated 的方法中使用工厂,如下所示:@JmsListener
@Component
public class MyBean {
@JmsListener(destination = "someQueue", containerFactory = "myFactory")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@JmsListener(destination = "someQueue", containerFactory = "myFactory")
fun processMessage(content: String?) {
// ...
}
}
2. AMQP
高级消息队列协议 (AMQP) 是一种平台中立的有线级协议,适用于面向消息的中间件。
Spring AMQP 项目将核心 Spring 概念应用于基于 AMQP 的消息传递解决方案的开发。
Spring Boot 为通过 RabbitMQ 使用 AMQP 提供了多种便利,包括“Starter”。spring-boot-starter-amqp
2.1. RabbitMQ 支持
RabbitMQ 是基于 AMQP 协议的轻量级、可靠、可扩展且可移植的消息代理。 Spring 使用 RabbitMQ 通过 AMQP 协议进行通信。
RabbitMQ 配置由 中的外部配置属性控制。
例如,您可以在 :spring.rabbitmq.*
application.properties
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
spring:
rabbitmq:
host: "localhost"
port: 5672
username: "admin"
password: "secret"
或者,您可以使用以下属性配置相同的连接:addresses
spring.rabbitmq.addresses=amqp://admin:secret@localhost
spring:
rabbitmq:
addresses: "amqp://admin:secret@localhost"
以这种方式指定地址时,将忽略 and 属性。
如果地址使用该协议,则会自动启用 SSL 支持。host port amqps |
有关更多受支持的基于属性的配置选项,请参见 RabbitProperties
。
要配置 Spring AMQP 使用的 RabbitMQ 的较低级别详细信息,请定义一个 bean。ConnectionFactory
ConnectionFactoryCustomizer
如果上下文中存在 bean,它将自动用于命名由自动配置的 .ConnectionNameStrategy
CachingConnectionFactory
要对 进行应用程序范围的加法定制,请使用 bean。RabbitTemplate
RabbitTemplateCustomizer
有关更多详细信息,请参阅了解 RabbitMQ 使用的协议 AMQP。 |
2.2. 发送消息
Spring 的 and 是自动配置的,你可以将它们直接自动连接到你自己的 bean 中,如以下示例所示:AmqpTemplate
AmqpAdmin
@Component
public class MyBean {
private final AmqpAdmin amqpAdmin;
private final AmqpTemplate amqpTemplate;
public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
this.amqpAdmin = amqpAdmin;
this.amqpTemplate = amqpTemplate;
}
}
@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {
}
RabbitMessagingTemplate 可以以类似的方式注入。
如果定义了 bean,则它会自动关联到 auto-configured .MessageConverter AmqpTemplate |
如有必要,将自动使用定义为 Bean 的任何 Bean 在 RabbitMQ 实例上声明相应的队列。org.springframework.amqp.core.Queue
要重试操作,您可以在 上启用重试(例如,在代理连接丢失的情况下):AmqpTemplate
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
spring:
rabbitmq:
template:
retry:
enabled: true
initial-interval: "2s"
默认情况下,重试处于禁用状态。
您还可以通过声明 bean 以编程方式自定义 bean。RetryTemplate
RabbitRetryTemplateCustomizer
如果你需要创建更多实例,或者想要覆盖默认值,Spring Boot 提供了一个 bean,你可以使用它来初始化一个,其设置与自动配置使用的工厂相同。RabbitTemplate
RabbitTemplateConfigurer
RabbitTemplate
2.3. 向流发送消息
要向特定流发送消息,请指定流的名称,如以下示例所示:
spring.rabbitmq.stream.name=my-stream
spring:
rabbitmq:
stream:
name: "my-stream"
如果定义了 , , 或 bean,则它会自动与自动配置的 .MessageConverter
StreamMessageConverter
ProducerCustomizer
RabbitStreamTemplate
如果你需要创建更多实例,或者想要覆盖默认值,Spring Boot 提供了一个 bean,你可以使用它来初始化一个,其设置与自动配置使用的工厂相同。RabbitStreamTemplate
RabbitStreamTemplateConfigurer
RabbitStreamTemplate
2.4. 接收消息
当 Rabbit 基础结构存在时,可以使用任何 bean 进行 Comments 以创建侦听器端点。
如果未定义 no,则会自动配置 default,您可以使用该属性切换到直接容器。
如果定义了 a 或 a bean,则它将自动与默认工厂相关联。@RabbitListener
RabbitListenerContainerFactory
SimpleRabbitListenerContainerFactory
spring.rabbitmq.listener.type
MessageConverter
MessageRecoverer
以下示例组件在队列上创建一个侦听器终端节点:someQueue
@Component
public class MyBean {
@RabbitListener(queues = "someQueue")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@RabbitListener(queues = ["someQueue"])
fun processMessage(content: String?) {
// ...
}
}
有关更多详细信息,请参阅 @EnableRabbit 的 Javadoc。 |
如果你需要创建更多实例,或者想要覆盖默认值,Spring Boot 提供了a和a,你可以使用它来初始化a和a,其设置与自动配置使用的工厂相同。RabbitListenerContainerFactory
SimpleRabbitListenerContainerFactoryConfigurer
DirectRabbitListenerContainerFactoryConfigurer
SimpleRabbitListenerContainerFactory
DirectRabbitListenerContainerFactory
选择哪种容器类型并不重要。 这两个 bean 由自动配置公开。 |
例如,下面的配置类公开了另一个使用特定 :MessageConverter
@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {
@Bean
public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
ConnectionFactory connectionFactory = getCustomConnectionFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(new MyMessageConverter());
return factory;
}
private ConnectionFactory getCustomConnectionFactory() {
return ...
}
}
@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {
@Bean
fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
val factory = SimpleRabbitListenerContainerFactory()
val connectionFactory = getCustomConnectionFactory()
configurer.configure(factory, connectionFactory)
factory.setMessageConverter(MyMessageConverter())
return factory
}
fun getCustomConnectionFactory() : ConnectionFactory? {
return ...
}
}
然后,您可以在任何带 -annotated 的方法中使用工厂,如下所示:@RabbitListener
@Component
public class MyBean {
@RabbitListener(queues = "someQueue", containerFactory = "myFactory")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
fun processMessage(content: String?) {
// ...
}
}
您可以启用重试来处理侦听器引发异常的情况。
默认情况下,使用,但您可以定义自己的 a。
当重试次数用尽时,消息将被拒绝,并被丢弃或路由到死信交换(如果代理配置为这样做)。
默认情况下,重试处于禁用状态。
您还可以通过声明 bean 以编程方式自定义 bean。RejectAndDontRequeueRecoverer
MessageRecoverer
RetryTemplate
RabbitRetryTemplateCustomizer
默认情况下,如果禁用重试并且侦听器引发异常,则会无限期地重试投放。
您可以通过两种方式修改此行为:将属性设置为 以便尝试零重新投放,或抛出 an 以指示应拒绝消息。
后者是启用重试并达到最大投放尝试次数时使用的机制。defaultRequeueRejected false AmqpRejectAndDontRequeueException |
3. Apache Kafka 支持
通过提供项目的自动配置来支持 Apache Kafka。spring-kafka
Kafka 配置由 中的外部配置属性控制。
例如,您可以在 :spring.kafka.*
application.properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring:
kafka:
bootstrap-servers: "localhost:9092"
consumer:
group-id: "myGroup"
要在启动时创建主题,请添加 类型的 bean 。
如果主题已存在,则忽略该 Bean。NewTopic |
有关更多支持的选项,请参阅 KafkaProperties
。
3.1. 发送消息
Spring 的 bean 是自动配置的,你可以直接在自己的 bean 中自动装配它,如以下示例所示:KafkaTemplate
@Component
public class MyBean {
private final KafkaTemplate<String, String> kafkaTemplate;
public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
}
@Component
class MyBean(private val kafkaTemplate: KafkaTemplate<String, String>) {
}
如果定义了属性,则会自动配置 a。
此外,如果定义了 bean,则它会自动关联到 auto-configured 的 .spring.kafka.producer.transaction-id-prefix KafkaTransactionManager RecordMessageConverter KafkaTemplate |
3.2. 接收消息
当存在 Apache Kafka 基础结构时,可以使用任何 bean 进行注释以创建侦听器终端节点。
如果未定义 no,则会自动使用 中定义的键配置默认 。@KafkaListener
KafkaListenerContainerFactory
spring.kafka.listener.*
以下组件在主题上创建侦听器终端节点:someTopic
@Component
public class MyBean {
@KafkaListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@KafkaListener(topics = ["someTopic"])
fun processMessage(content: String?) {
// ...
}
}
如果定义了 Bean,则它会自动与容器工厂关联。
同样,如果定义了 , , 或 bean,则它会自动与默认工厂关联。KafkaTransactionManager
RecordFilterStrategy
CommonErrorHandler
AfterRollbackProcessor
ConsumerAwareRebalanceListener
根据侦听器类型,a 或 bean 与默认工厂相关联。
如果批处理侦听器仅存在 bean,则将其包装在 .RecordMessageConverter
BatchMessageConverter
RecordMessageConverter
BatchMessageConverter
必须标记自定义,因为它通常引用自动配置的 bean。ChainedKafkaTransactionManager @Primary KafkaTransactionManager |
3.3. Kafka 流
Spring for Apache Kafka 提供了一个工厂 Bean 来创建对象并管理其流的生命周期。
Spring Boot 只要在 Classpath 上,并且 Kafka Streams 由 Comments 启用,它就会自动配置所需的 bean。StreamsBuilder
KafkaStreamsConfiguration
kafka-streams
@EnableKafkaStreams
启用 Kafka Streams 意味着必须设置应用程序 ID 和引导服务器。
前者可以使用 ,如果未设置,则默认为 。
后者可以全局设置,也可以仅针对流专门覆盖。spring.kafka.streams.application-id
spring.application.name
使用专用属性可以使用多个其他属性;可以使用命名空间设置其他任意 Kafka 属性。
有关更多信息,另请参阅其他 Kafka 属性。spring.kafka.streams.properties
要使用工厂 Bean,请连接到 u,如以下示例所示:StreamsBuilder
@Bean
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {
@Bean
public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
return stream;
}
private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
return new KeyValue<>(key, value.toUpperCase(Locale.getDefault()));
}
}
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
class MyKafkaStreamsConfiguration {
@Bean
fun kStream(streamsBuilder: StreamsBuilder): KStream<Int, String> {
val stream = streamsBuilder.stream<Int, String>("ks1In")
stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), JsonSerde()))
return stream
}
private fun uppercaseValue(key: Int, value: String): KeyValue<Int?, String?> {
return KeyValue(key, value.uppercase())
}
}
默认情况下,对象管理的流会自动启动。
您可以使用 属性 自定义此行为。StreamBuilder
spring.kafka.streams.auto-startup
3.4. 其他 Kafka 属性
自动配置支持的属性显示在附录的 “集成属性” 部分中。 请注意,在大多数情况下,这些属性(带连字符或 camelCase)直接映射到 Apache Kafka 点分隔属性。 有关详细信息,请参阅 Apache Kafka 文档。
名称中不包含客户端类型 (、 、 或 ) 的属性被视为通用属性,并适用于所有客户端。
如果需要,可以覆盖一个或多个客户端类型的大多数公共属性。producer
consumer
admin
streams
Apache Kafka 指定重要性为 HIGH、MEDIUM 或 LOW 的属性。 Spring Boot 自动配置支持所有 HIGH 重要性属性,一些选定的 MEDIUM 和 LOW 属性,以及任何没有默认值的属性。
只有 Kafka 支持的属性子集可以直接通过该类使用。
如果要使用不直接支持的其他属性配置各个客户端类型,请使用以下属性:KafkaProperties
spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
spring:
kafka:
properties:
"[prop.one]": "first"
admin:
properties:
"[prop.two]": "second"
consumer:
properties:
"[prop.three]": "third"
producer:
properties:
"[prop.four]": "fourth"
streams:
properties:
"[prop.five]": "fifth"
这会将公共 Kafka 属性设置为(适用于创建者、使用者、管理员和流),将 admin 属性设置为,将使用者属性设置为,将 producer 属性设置为,将 streams 属性设置为。prop.one
first
prop.two
second
prop.three
third
prop.four
fourth
prop.five
fifth
您还可以按如下方式配置 Spring Kafka:JsonDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
spring:
kafka:
consumer:
value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
properties:
"[spring.json.value.default.type]": "com.example.Invoice"
"[spring.json.trusted.packages]": "com.example.main,com.example.another"
同样,您可以禁用在 Headers 中发送类型信息的默认行为:JsonSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
spring:
kafka:
producer:
value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
properties:
"[spring.json.add.type.headers]": false
以这种方式设置的属性将覆盖 Spring Boot 显式支持的任何配置项。 |
3.5. 使用嵌入式 Kafka 进行测试
Spring for Apache Kafka 提供了一种使用嵌入式 Apache Kafka 代理测试项目的便捷方法。
要使用此功能,请使用 from the module 注释测试类。
有关更多信息,请参阅 Spring for Apache Kafka 参考手册。@EmbeddedKafka
spring-kafka-test
要使 Spring Boot 自动配置与上述嵌入式 Apache Kafka 代理一起使用,您需要将嵌入式代理地址的系统属性(由 填充 )重新映射到 Apache Kafka 的 Spring Boot 配置属性中。
有几种方法可以做到这一点:EmbeddedKafkaBroker
-
在测试类中提供一个 system 属性,用于将嵌入式代理地址映射到其中:
spring.kafka.bootstrap-servers
static {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
init {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers")
}
-
在注释上配置属性名称:
@EmbeddedKafka
@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {
// ...
}
@SpringBootTest
@EmbeddedKafka(topics = ["someTopic"], bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {
// ...
}
-
在配置属性中使用占位符:
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
spring:
kafka:
bootstrap-servers: "${spring.embedded.kafka.brokers}"
4. Apache Pulsar 支持
通过提供 Spring for Apache Pulsar 项目的自动配置来支持 Apache Pulsar。
Spring Boot 将在 classpath 上时自动配置和注册经典(命令式)Spring for Apache Pulsar 组件。
当 在 Classpath 上时,它将对响应式组件执行相同的操作。org.springframework.pulsar:spring-pulsar
org.springframework.pulsar:spring-pulsar-reactive
有 和 “Starters” 分别用于方便地收集用于命令式和反应式使用的依赖项。spring-boot-starter-pulsar
spring-boot-starter-pulsar-reactive
4.1. 连接到 Pulsar
当您使用 Pulsar Starters时, Spring Boot 将自动配置并注册一个 bean。PulsarClient
默认情况下,应用程序会尝试连接到 位于 的本地 Pulsar 实例。
这可以通过将属性设置为其他值来调整。pulsar://localhost:6650
spring.pulsar.client.service-url
该值必须是有效的 Pulsar 协议 URL |
您可以通过指定任何带前缀的应用程序属性来配置客户端。spring.pulsar.client.*
如果需要对配置进行更多控制,请考虑注册一个或多个 bean。PulsarClientBuilderCustomizer
4.1.1. 身份验证
要连接到需要身份验证的 Pulsar 集群,你需要通过设置插件所需的 the 和任何参数来指定要使用的身份验证插件。
您可以将参数设置为参数名称到参数值的映射。
以下示例显示如何配置插件。pluginClassName
AuthenticationOAuth2
spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param[issuerUrl]=https://auth.server.cloud/
spring.pulsar.client.authentication.param[privateKey]=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
param:
issuerUrl: https://auth.server.cloud/
privateKey: file:///Users/some-key.json
audience: urn:sn:acme:dev:my-instance
您需要确保 下定义的名称与 auth 插件(通常是驼峰式大小写)所期望的名称完全匹配。
Spring Boot 不会尝试对这些条目进行任何类型的松散绑定。 例如,如果要为 auth 插件配置颁发者 URL,则必须使用 。
如果使用其他形式(如 或 ),则设置将不会应用于插件。 这种缺乏松散的绑定还使得对身份验证参数使用环境变量成问题,因为在转换过程中会丢失区分大小写。 如果你对参数使用环境变量,则需要按照 Spring for Apache Pulsar 参考文档中的这些步骤进行操作,使其正常工作。 |
4.2. 连接到 Pulsar Reactors
激活 Reactive 自动配置后, Spring Boot 将自动配置并注册一个 bean。ReactivePulsarClient
会适配前面描述的 .
因此,请按照上一节配置 使用的 。ReactivePulsarClient
PulsarClient
PulsarClient
ReactivePulsarClient
4.3. 连接到 Pulsar Administration
Spring for Apache Pulsar 的客户端也是自动配置的。PulsarAdministration
默认情况下,应用程序会尝试连接到 位于 的本地 Pulsar 实例。
这可以通过在表单中将属性设置为其他值来调整。http://localhost:8080
spring.pulsar.admin.service-url
(http|https)://<host>:<port>
如果需要对配置进行更多控制,请考虑注册一个或多个 bean。PulsarAdminBuilderCustomizer
4.4. 发送消息
Spring 的 是自动配置的,你可以使用它来发送消息,如以下示例所示:PulsarTemplate
@Component
public class MyBean {
private final PulsarTemplate<String> pulsarTemplate;
public MyBean(PulsarTemplate<String> pulsarTemplate) {
this.pulsarTemplate = pulsarTemplate;
}
public void someMethod() throws PulsarClientException {
this.pulsarTemplate.send("someTopic", "Hello");
}
}
@Component
class MyBean(private val pulsarTemplate: PulsarTemplate<String>) {
@Throws(PulsarClientException::class)
fun someMethod() {
pulsarTemplate.send("someTopic", "Hello")
}
}
它依赖于 a 来创建底层的 Pulsar 生产者。
Spring Boot 自动配置还提供了这个 producer 工厂,默认情况下,它会缓存它创建的 producer。
您可以通过指定任何 和 前缀应用程序属性来配置创建者出厂设置和缓存设置。PulsarTemplate
PulsarProducerFactory
spring.pulsar.producer.*
spring.pulsar.producer.cache.*
如果需要对生产者工厂配置进行更多控制,请考虑注册一个或多个 bean。
这些定制器将应用于所有创建的生产者。
您也可以传入 when sending a message 以仅影响当前生产者。ProducerBuilderCustomizer
ProducerBuilderCustomizer
如果您需要对正在发送的消息进行更多控制,则可以在发送消息时传入 。TypedMessageBuilderCustomizer
4.5. 响应式发送消息
激活 Reactive 自动配置后, Spring 的自动配置是自动配置的,你可以使用它来发送消息,如以下示例所示:ReactivePulsarTemplate
@Component
public class MyBean {
private final ReactivePulsarTemplate<String> pulsarTemplate;
public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) {
this.pulsarTemplate = pulsarTemplate;
}
public void someMethod() {
this.pulsarTemplate.send("someTopic", "Hello").subscribe();
}
}
@Component
class MyBean(private val pulsarTemplate: ReactivePulsarTemplate<String>) {
fun someMethod() {
pulsarTemplate.send("someTopic", "Hello").subscribe()
}
}
这依赖于 a 来实际创建底层发件人。
Spring Boot 自动配置还提供了这个发送者工厂,默认情况下,它会缓存它创建的生产者。
您可以通过指定任何 和 前缀应用程序属性来配置发送程序出厂设置和缓存设置。ReactivePulsarTemplate
ReactivePulsarSenderFactory
spring.pulsar.producer.*
spring.pulsar.producer.cache.*
如果您需要对发送方工厂配置进行更多控制,请考虑注册一个或多个 bean。
这些定制器将应用于所有创建的发件人。
你也可以传入 when sending a message 来只影响当前发件人。ReactiveMessageSenderBuilderCustomizer
ReactiveMessageSenderBuilderCustomizer
如果您需要对正在发送的消息进行更多控制,则可以在发送消息时传入 。MessageSpecBuilderCustomizer
4.6. 接收消息
当 Apache Pulsar 基础设施存在时,任何 bean 都可以进行注释以创建侦听器端点。
以下组件在主题上创建侦听器终端节点:@PulsarListener
someTopic
@Component
public class MyBean {
@PulsarListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@PulsarListener(topics = ["someTopic"])
fun processMessage(content: String?) {
// ...
}
}
Spring Boot 自动配置提供了所有必要的组件,例如它用来构建底层 Pulsar 消费者的消费者工厂。
您可以通过指定任何 和 前缀应用程序属性来配置这些组件。PulsarListener
PulsarListenerContainerFactory
spring.pulsar.listener.*
spring.pulsar.consumer.*
如果需要对 Consumer Factory 配置进行更多控制,请考虑注册一个或多个 bean。
这些定制器将应用于工厂创建的所有使用者,因此也应用于所有实例。
您还可以通过设置注释的属性来自定义单个侦听器。ConsumerBuilderCustomizer
@PulsarListener
consumerCustomizer
@PulsarListener
4.7. 响应式接收消息
当 Apache Pulsar 基础设施存在并激活 Reactive 自动配置时,任何 bean 都可以被注释以创建反应式侦听器端点。
以下组件在主题上创建反应式侦听器终端节点:@ReactivePulsarListener
someTopic
@Component
public class MyBean {
@ReactivePulsarListener(topics = "someTopic")
public Mono<Void> processMessage(String content) {
// ...
return Mono.empty();
}
}
@Component
class MyBean {
@ReactivePulsarListener(topics = ["someTopic"])
fun processMessage(content: String?): Mono<Void> {
// ...
return Mono.empty()
}
}
Spring Boot 自动配置提供了所有必要的组件,例如它用来构建底层反应式 Pulsar 消费者的消费者工厂。
你可以通过指定任何 和 spring.pulsar.consumer.
前缀的应用程序属性来配置这些组件。ReactivePulsarListener
ReactivePulsarListenerContainerFactory
spring.pulsar.listener.
如果需要对 Consumer Factory 配置进行更多控制,请考虑注册一个或多个 bean。
这些定制器将应用于工厂创建的所有使用者,因此也应用于所有实例。
您还可以通过设置注释的属性来自定义单个侦听器。ReactiveMessageConsumerBuilderCustomizer
@ReactivePulsarListener
consumerCustomizer
@ReactivePulsarListener
4.8. 读取消息
Pulsar 读取器接口使应用程序能够手动管理游标。 当您使用读取器连接到主题时,您需要指定读取器在连接到主题时从哪条消息开始读取。
当存在 Apache Pulsar 基础设施时,任何 bean 都可以被注释以使用读取器来消费消息。
以下组件创建一个读取器终端节点,该终端节点从主题的开头开始读取消息:@PulsarReader
someTopic
@Component
public class MyBean {
@PulsarReader(topics = "someTopic", startMessageId = "earliest")
public void processMessage(String content) {
// ...
}
}
@Component
class MyBean {
@PulsarReader(topics = ["someTopic"], startMessageId = "earliest")
fun processMessage(content: String?) {
// ...
}
}
它依赖于 a 来创建底层的 Pulsar 读取器。
Spring Boot 自动配置提供了这个读取器工厂,可以通过设置任何带前缀的应用程序属性来自定义。@PulsarReader
PulsarReaderFactory
spring.pulsar.reader.*
如果需要对 reader 出厂配置进行更多控制,请考虑注册一个或多个 bean。
这些定制器应用于工厂创建的所有读取器,因此应用于所有实例。
您还可以通过设置注释的属性来自定义单个侦听器。ReaderBuilderCustomizer
@PulsarReader
readerCustomizer
@PulsarReader
4.9. 响应式读取消息
当 Apache Pulsar 基础设施存在并激活 Reactive 自动配置时,会提供 Spring 的,你可以使用它来创建一个 reader,以便以 reactive 方式读取消息。
以下组件使用提供的工厂创建一个读取器,并从主题中读取 5 分钟前的一条消息:ReactivePulsarReaderFactory
someTopic
@Component
public class MyBean {
private final ReactivePulsarReaderFactory<String> pulsarReaderFactory;
public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) {
this.pulsarReaderFactory = pulsarReaderFactory;
}
public void someMethod() {
ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder
.topic("someTopic")
.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)));
Mono<Message<String>> message = this.pulsarReaderFactory
.createReader(Schema.STRING, List.of(readerBuilderCustomizer))
.readOne();
// ...
}
}
@Component
class MyBean(private val pulsarReaderFactory: ReactivePulsarReaderFactory<String>) {
fun someMethod() {
val readerBuilderCustomizer = ReactiveMessageReaderBuilderCustomizer {
readerBuilder: ReactiveMessageReaderBuilder<String> ->
readerBuilder
.topic("someTopic")
.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)))
}
val message = pulsarReaderFactory
.createReader(Schema.STRING, listOf(readerBuilderCustomizer))
.readOne()
// ...
}
}
Spring Boot 自动配置提供了这个读取器工厂,可以通过设置任何带前缀的应用程序属性来自定义。spring.pulsar.reader.*
如果您需要对 reader 出厂配置进行更多控制,请考虑在使用工厂创建 reader 时传入一个或多个实例。ReactiveMessageReaderBuilderCustomizer
如果需要对 reader 出厂配置进行更多控制,请考虑注册一个或多个 bean。
这些定制器将应用于所有创建的读取器。
您还可以在创建读取器时传递一个或多个,以仅将自定义项应用于创建的读取器。ReactiveMessageReaderBuilderCustomizer
ReactiveMessageReaderBuilderCustomizer
有关上述任何组件的更多详细信息以及发现其他可用功能,请参阅 Spring for Apache Pulsar 参考文档。 |
4.10. Pulsar 的其他属性
自动配置支持的属性显示在附录的“集成属性”部分中。 请注意,在大多数情况下,这些属性(带 hyphened 或 camelCase)直接映射到 Apache Pulsar 配置属性。 有关详细信息,请参阅 Apache Pulsar 文档。
只有 Pulsar 支持的 property 子集可以直接通过 class 获得。
如果您希望使用不直接支持的其他属性来优化自动配置的组件,则可以使用上述每个组件支持的定制器。PulsarProperties
5. RS锁
RSocket 是一种用于字节流传输的二进制协议。 它通过单个连接上的异步消息传递来实现对称交互模型。
Spring Framework 的模块在客户端和服务器端为 RSocket 请求者和响应者提供支持。
有关更多详细信息,包括 RSocket 协议的概述,请参见 Spring Framework 参考的 RSocket 部分。spring-messaging
5.1. RSocket 策略自动配置
Spring Boot 自动配置一个 bean,该 bean 提供编码和解码 RSocket 有效负载所需的所有基础结构。
默认情况下,自动配置将尝试配置以下内容(按顺序):RSocketStrategies
-
使用 Jackson 的 CBOR 编解码器
-
使用 Jackson 的 JSON 编解码器
Starter 提供了这两个依赖项。
请参阅 Jackson 支持部分以了解有关自定义可能性的更多信息。spring-boot-starter-rsocket
开发人员可以通过创建实现该接口的 bean 来自定义组件。
请注意,这很重要,因为它决定了编解码器的顺序。RSocketStrategies
RSocketStrategiesCustomizer
@Order
5.2. RSocket 服务器自动配置
Spring Boot 提供 RSocket 服务器自动配置。
所需的依赖项由 .spring-boot-starter-rsocket
Spring Boot 允许从 WebFlux 服务器通过 WebSocket 公开 RSocket,或者建立独立的 RSocket 服务器。 这取决于应用程序的类型及其配置。
对于 WebFlux 应用程序(类型 ),仅当以下属性匹配时,RSocket 服务器才会插入 Web 服务器:WebApplicationType.REACTIVE
spring.rsocket.server.mapping-path=/rsocket
spring.rsocket.server.transport=websocket
spring:
rsocket:
server:
mapping-path: "/rsocket"
transport: "websocket"
只有 Reactor Netty 才支持将 RSocket 插入 Web 服务器,因为 RSocket 本身就是使用该库构建的。 |
或者,RSocket TCP 或 websocket 服务器作为独立的嵌入式服务器启动。 除了依赖项要求之外,唯一需要的配置是为该服务器定义一个端口:
spring.rsocket.server.port=9898
spring:
rsocket:
server:
port: 9898
5.3. Spring Messaging RSocket 支持
Spring Boot 将为 RSocket 自动配置 Spring Messaging 基础设施。
这意味着 Spring Boot 将创建一个 bean,该 bean 将处理对应用程序的 RSocket 请求。RSocketMessageHandler
5.4. 使用 RSocketRequester 调用 RSocket 服务
一旦在服务器和客户端之间建立了通道,任何一方都可以向另一方发送或接收请求。RSocket
作为服务器,您可以在 RSocket 的任何处理程序方法上注入实例。
作为客户端,您需要先配置并建立 RSocket 连接。
Spring Boot 使用预期的编解码器自动配置此类情况并应用任何 bean。RSocketRequester
@Controller
RSocketRequester.Builder
RSocketConnectorConfigurer
该实例是一个原型 bean,这意味着每个注入点都会为您提供一个新的实例。
这是有意为之的,因为此构建器是有状态的,您不应使用同一实例创建具有不同设置的请求者。RSocketRequester.Builder
以下代码显示了一个典型的示例:
@Service
public class MyService {
private final RSocketRequester rsocketRequester;
public MyService(RSocketRequester.Builder rsocketRequesterBuilder) {
this.rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898);
}
public Mono<User> someRSocketCall(String name) {
return this.rsocketRequester.route("user").data(name).retrieveMono(User.class);
}
}
@Service
class MyService(rsocketRequesterBuilder: RSocketRequester.Builder) {
private val rsocketRequester: RSocketRequester
init {
rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898)
}
fun someRSocketCall(name: String): Mono<User> {
return rsocketRequester.route("user").data(name).retrieveMono(
User::class.java
)
}
}
6. Spring 集成
Spring Boot 为使用 Spring Integration 提供了多种便利,包括“Starter”。
Spring 集成通过消息传递以及其他传输(如 HTTP、TCP 等)提供抽象。
如果 Spring 集成在你的 Classpath 上可用,则通过 Comments 对其进行初始化。spring-boot-starter-integration
@EnableIntegration
Spring 集成轮询逻辑依赖于自动配置的TaskScheduler
。
默认值(每秒轮询无限数量的消息)可以使用配置属性进行自定义。PollerMetadata
spring.integration.poller.*
Spring Boot 还配置了一些由其他 Spring 集成模块的存在触发的功能。
如果 也在 Classpath 上,则通过 JMX 发布消息处理统计信息。
如果可用,则可以在启动时创建默认数据库架构,如以下行所示:spring-integration-jmx
spring-integration-jdbc
spring.integration.jdbc.initialize-schema=always
spring:
integration:
jdbc:
initialize-schema: "always"
如果可用,开发人员可以使用属性配置 RSocket 服务器,并让它使用 or 组件来处理传入的 RSocket 消息。
这个基础设施可以处理 Spring 集成 RSocket 通道适配器和处理程序(给定已配置)。spring-integration-rsocket
"spring.rsocket.server.*"
IntegrationRSocketEndpoint
RSocketOutboundGateway
@MessageMapping
"spring.integration.rsocket.server.message-mapping-enabled"
Spring Boot 还可以使用配置属性自动配置:ClientRSocketConnector
# Connecting to a RSocket server over TCP
spring.integration.rsocket.client.host=example.org
spring.integration.rsocket.client.port=9898
# Connecting to a RSocket server over TCP
spring:
integration:
rsocket:
client:
host: "example.org"
port: 9898
# Connecting to a RSocket Server over WebSocket
spring.integration.rsocket.client.uri=ws://example.org
# Connecting to a RSocket Server over WebSocket
spring:
integration:
rsocket:
client:
uri: "ws://example.org"
有关更多详细信息,请参阅 IntegrationAutoConfiguration
和 IntegrationProperties
类。
7. 网络套接字
Spring Boot 为嵌入式 Tomcat、Jetty 和 Undertow 提供 WebSockets 自动配置。 如果将 war 文件部署到独立容器,则 Spring Boot 假定该容器负责其 WebSocket 支持的配置。
Spring Framework 为 MVC Web 应用程序提供了丰富的 WebSocket 支持,可以通过该模块轻松访问。spring-boot-starter-websocket
WebSocket 支持也可用于反应式 Web 应用程序,并且需要包括 WebSocket API 以及:spring-boot-starter-webflux
<dependency>
<groupId>jakarta.websocket</groupId>
<artifactId>jakarta.websocket-api</artifactId>
</dependency>