消息

1. JMS的

该接口提供了一种创建用于与 JMS 代理交互的标准方法。 尽管 Spring 需要与 JMS 一起使用,但您通常不需要自己直接使用它,而是可以依赖更高级别的消息传递抽象。 (有关详细信息,请参阅 Spring Framework 参考文档的相关部分。 Spring Boot 还自动配置了发送和接收消息所需的基础设施。javax.jms.ConnectionFactoryjavax.jms.ConnectionConnectionFactoryspring-doc.cn

1.1. ActiveMQ “Classic” 支持

ActiveMQ “Classic” 在 Classpath 上可用时, Spring Boot 还可以配置 . 如果存在代理,则会自动启动并配置嵌入式代理(前提是未通过配置指定代理 URL,并且未在配置中禁用嵌入式代理)。ConnectionFactoryspring-doc.cn

如果您使用 ,则提供了连接或嵌入 ActiveMQ “Classic” 实例所需的依赖项,以及与 JMS 集成的 Spring 基础架构。spring-boot-starter-activemq

ActiveMQ “Classic” 配置由 中的外部配置属性控制。spring.activemq.*spring-doc.cn

默认情况下,ActiveMQ “Classic” 自动配置为使用 VM 传输,该传输将启动嵌入在同一 JVM 实例中的代理。spring-doc.cn

您可以通过配置属性来禁用嵌入式代理,如以下示例所示:spring.activemq.in-memoryspring-doc.cn

性能
spring.activemq.in-memory=false
Yaml
spring:
  activemq:
    in-memory: false

如果您配置代理 URL,嵌入式代理也将被禁用,如以下示例所示:spring-doc.cn

性能
spring.activemq.broker-url=tcp://192.168.1.210:9876
spring.activemq.user=admin
spring.activemq.password=secret
Yaml
spring:
  activemq:
    broker-url: "tcp://192.168.1.210:9876"
    user: "admin"
    password: "secret"

如果您想完全控制嵌入式代理,请参阅 ActiveMQ“Classic”文档以了解更多信息。spring-doc.cn

默认情况下, a 使用合理的设置包装本机,您可以通过外部配置属性控制这些设置:CachingConnectionFactoryConnectionFactoryspring.jms.*spring-doc.cn

性能
spring.jms.cache.session-cache-size=5
Yaml
spring:
  jms:
    cache:
      session-cache-size: 5

如果您更愿意使用本机池,可以通过添加依赖项并相应地配置来实现,如以下示例所示:org.messaginghub:pooled-jmsJmsPoolConnectionFactoryspring-doc.cn

性能
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
Yaml
spring:
  activemq:
    pool:
      enabled: true
      max-connections: 50
有关更多支持的选项,请参阅 ActiveMQProperties 。 您还可以注册任意数量的 bean 来实现更高级的自定义。ActiveMQConnectionFactoryCustomizer

默认情况下,ActiveMQ “Classic” 会创建一个目标(如果尚不存在),以便根据提供的名称解析目标。spring-doc.cn

1.2. ActiveMQ Artemis 支持

Spring Boot 可以在检测到 ActiveMQ Artemis 在 Classpath 上可用时自动配置。 如果存在代理,则会自动启动并配置嵌入式代理(除非已明确设置 mode 属性)。 支持的模式是(明确表示需要嵌入式代理,如果代理在 Classpath 上不可用,则应该发生错误)和(使用传输协议连接到代理)。 配置后者后, Spring Boot 会配置一个,该代理使用默认设置连接到本地计算机上运行的代理。ConnectionFactoryembeddednativenettyConnectionFactoryspring-doc.cn

如果您使用 ,则提供了连接到现有 ActiveMQ Artemis 实例所需的依赖项,以及与 JMS 集成的 Spring 基础架构。 通过添加到应用程序,可以使用嵌入式模式。spring-boot-starter-artemisorg.apache.activemq:artemis-jms-server

ActiveMQ Artemis 配置由 中的外部配置属性控制。 例如,您可以在 :spring.artemis.*application.propertiesspring-doc.cn

性能
spring.artemis.mode=native
spring.artemis.broker-url=tcp://192.168.1.210:9876
spring.artemis.user=admin
spring.artemis.password=secret
Yaml
spring:
  artemis:
    mode: native
    broker-url: "tcp://192.168.1.210:9876"
    user: "admin"
    password: "secret"

在嵌入代理时,您可以选择是否要启用持久性并列出应可用的目标。 这些可以指定为逗号分隔的列表,以便使用默认选项创建它们,或者您可以分别为高级队列和主题配置定义 或 类型的 bean。org.apache.activemq.artemis.jms.server.config.JMSQueueConfigurationorg.apache.activemq.artemis.jms.server.config.TopicConfigurationspring-doc.cn

默认情况下, a 使用合理的设置包装本机,您可以通过外部配置属性控制这些设置:CachingConnectionFactoryConnectionFactoryspring.jms.*spring-doc.cn

性能
spring.jms.cache.session-cache-size=5
Yaml
spring:
  jms:
    cache:
      session-cache-size: 5

如果您更愿意使用本机池,可以通过添加依赖项并相应地配置来实现,如以下示例所示:org.messaginghub:pooled-jmsJmsPoolConnectionFactoryspring-doc.cn

性能
spring.artemis.pool.enabled=true
spring.artemis.pool.max-connections=50
Yaml
spring:
  artemis:
    pool:
      enabled: true
      max-connections: 50

有关更多支持的选项,请参阅 ArtemisPropertiesspring-doc.cn

不涉及 JNDI 查找,并且使用 ActiveMQ Artemis 配置中的属性或通过配置提供的名称根据其名称解析目标。namespring-doc.cn

1.3. 使用 JNDI ConnectionFactory

如果您在应用程序服务器中运行应用程序,则 Spring Boot 会尝试使用 JNDI 查找 JMS。 默认情况下,将选中 和 location。 如果需要指定备用位置,则可以使用该属性,如以下示例所示:ConnectionFactoryjava:/JmsXAjava:/XAConnectionFactoryspring.jms.jndi-namespring-doc.cn

性能
spring.jms.jndi-name=java:/MyConnectionFactory
Yaml
spring:
  jms:
    jndi-name: "java:/MyConnectionFactory"

1.4. 发送消息

Spring 的 bean 是自动配置的,你可以将其直接自动连接到你自己的 bean 中,如以下示例所示:JmsTemplatespring-doc.cn

Java
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final JmsTemplate jmsTemplate;

    public MyBean(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    // ...

    public void someMethod() {
        this.jmsTemplate.convertAndSend("hello");
    }

}
Kotlin
import org.springframework.jms.core.JmsTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val jmsTemplate: JmsTemplate) {

    // ...

    fun someMethod() {
        jmsTemplate.convertAndSend("hello")
    }

}
JmsMessagingTemplate可以以类似的方式注入。 如果定义了 a 或 a bean,则它会自动关联到 auto-configured 的 。DestinationResolverMessageConverterJmsTemplate

1.5. 接收消息

当 JMS 基础结构存在时,可以使用任何 bean 进行注释以创建侦听器端点。 如果未定义 no,则会自动配置默认 1 个。 如果定义了 a 、 a 或 a bean,则它们将自动与默认工厂关联。@JmsListenerJmsListenerContainerFactoryDestinationResolverMessageConverterjavax.jms.ExceptionListenerspring-doc.cn

默认情况下,默认工厂是事务性的。 如果您在存在 a 的基础设施中运行,则默认情况下它与侦听器容器相关联。 否则,将启用该标志。 在后一种情况下,您可以通过添加侦听器方法(或其委托)将本地数据存储事务与传入消息的处理相关联。 这可确保在本地事务完成后确认传入消息。 这还包括发送已在同一 JMS 会话上执行的响应消息。JtaTransactionManagersessionTransacted@Transactionalspring-doc.cn

以下组件在目标上创建一个侦听器终端节点:someQueuespring-doc.cn

Java
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @JmsListener(destination = "someQueue")
    public void processMessage(String content) {
        // ...
    }

}
Kotlin
import org.springframework.jms.annotation.JmsListener
import org.springframework.stereotype.Component

@Component
class MyBean {

    @JmsListener(destination = "someQueue")
    fun processMessage(content: String?) {
        // ...
    }

}
有关更多详细信息,请参阅 @EnableJms 的 Javadoc

如果你需要创建更多实例,或者想要覆盖默认值,Spring Boot 提供了一个,你可以使用它来初始化一个,其设置与自动配置的设置相同。JmsListenerContainerFactoryDefaultJmsListenerContainerFactoryConfigurerDefaultJmsListenerContainerFactoryspring-doc.cn

例如,以下示例公开了另一个使用特定 :MessageConverterspring-doc.cn

Java
import javax.jms.ConnectionFactory;

import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

@Configuration(proxyBeanMethods = false)
public class MyJmsConfiguration {

    @Bean
    public DefaultJmsListenerContainerFactory myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        ConnectionFactory connectionFactory = getCustomConnectionFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(new MyMessageConverter());
        return factory;
    }

    private ConnectionFactory getCustomConnectionFactory() {
        return ...
    }

}
Kotlin
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jms.config.DefaultJmsListenerContainerFactory
import javax.jms.ConnectionFactory

@Configuration(proxyBeanMethods = false)
class MyJmsConfiguration {

    @Bean
    fun myFactory(configurer: DefaultJmsListenerContainerFactoryConfigurer): DefaultJmsListenerContainerFactory {
        val factory = DefaultJmsListenerContainerFactory()
        val connectionFactory = getCustomConnectionFactory()
        configurer.configure(factory, connectionFactory)
        factory.setMessageConverter(MyMessageConverter())
        return factory
    }

    fun getCustomConnectionFactory() : ConnectionFactory? {
        return ...
    }

}

然后,您可以在任何带 -annotated 的方法中使用工厂,如下所示:@JmsListenerspring-doc.cn

Java
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @JmsListener(destination = "someQueue", containerFactory = "myFactory")
    public void processMessage(String content) {
        // ...
    }

}
Kotlin
import org.springframework.jms.annotation.JmsListener
import org.springframework.stereotype.Component

@Component
class MyBean {

    @JmsListener(destination = "someQueue", containerFactory = "myFactory")
    fun processMessage(content: String?) {
        // ...
    }

}

2. AMQP

高级消息队列协议 (AMQP) 是一种平台中立的有线级协议,适用于面向消息的中间件。 Spring AMQP 项目将核心 Spring 概念应用于基于 AMQP 的消息传递解决方案的开发。 Spring Boot 为通过 RabbitMQ 使用 AMQP 提供了多种便利,包括“Starter”。spring-boot-starter-amqpspring-doc.cn

2.1. RabbitMQ 支持

RabbitMQ 是基于 AMQP 协议的轻量级、可靠、可扩展且可移植的消息代理。 Spring 用于通过 AMQP 协议进行通信。RabbitMQspring-doc.cn

RabbitMQ 配置由 中的外部配置属性控制。 例如,您可以在 :spring.rabbitmq.*application.propertiesspring-doc.cn

性能
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
Yaml
spring:
  rabbitmq:
    host: "localhost"
    port: 5672
    username: "admin"
    password: "secret"

或者,您可以使用以下属性配置相同的连接:addressesspring-doc.cn

性能
spring.rabbitmq.addresses=amqp://admin:secret@localhost
Yaml
spring:
  rabbitmq:
    addresses: "amqp://admin:secret@localhost"
以这种方式指定地址时,将忽略 and 属性。 如果地址使用该协议,则会自动启用 SSL 支持。hostportamqps

有关更多受支持的基于属性的配置选项,请参见 RabbitProperties。 要配置 Spring AMQP 使用的 RabbitMQ 的较低级别详细信息,请定义一个 bean。ConnectionFactoryConnectionFactoryCustomizerspring-doc.cn

如果上下文中存在 bean,它将自动用于命名由自动配置的 .ConnectionNameStrategyCachingConnectionFactoryspring-doc.cn

有关更多详细信息,请参阅了解 RabbitMQ 使用的协议 AMQP

2.2. 发送消息

Spring 的 and 是自动配置的,你可以将它们直接自动连接到你自己的 bean 中,如以下示例所示:AmqpTemplateAmqpAdminspring-doc.cn

Java
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final AmqpAdmin amqpAdmin;

    private final AmqpTemplate amqpTemplate;

    public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
        this.amqpAdmin = amqpAdmin;
        this.amqpTemplate = amqpTemplate;
    }

    // ...

    public void someMethod() {
        this.amqpAdmin.getQueueInfo("someQueue");
    }

    public void someOtherMethod() {
        this.amqpTemplate.convertAndSend("hello");
    }

}
Kotlin
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {

    // ...

    fun someMethod() {
        amqpAdmin.getQueueInfo("someQueue")
    }

    fun someOtherMethod() {
        amqpTemplate.convertAndSend("hello")
    }

}
RabbitMessagingTemplate可以以类似的方式注入。 如果定义了 bean,则它会自动关联到 auto-configured .MessageConverterAmqpTemplate

如有必要,将自动使用定义为 Bean 的任何 Bean 在 RabbitMQ 实例上声明相应的队列。org.springframework.amqp.core.Queuespring-doc.cn

要重试操作,您可以在 上启用重试(例如,在代理连接丢失的情况下):AmqpTemplatespring-doc.cn

性能
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
Yaml
spring:
  rabbitmq:
    template:
      retry:
        enabled: true
        initial-interval: "2s"

默认情况下,重试处于禁用状态。 您还可以通过声明 bean 以编程方式自定义 bean。RetryTemplateRabbitRetryTemplateCustomizerspring-doc.cn

如果你需要创建更多实例,或者想要覆盖默认值,Spring Boot 提供了一个 bean,你可以使用它来初始化一个,其设置与自动配置使用的工厂相同。RabbitTemplateRabbitTemplateConfigurerRabbitTemplatespring-doc.cn

2.3. 向流发送消息

要向特定流发送消息,请指定流的名称,如以下示例所示:spring-doc.cn

性能
spring.rabbitmq.stream.name=my-stream
Yaml
spring:
  rabbitmq:
    stream:
      name: "my-stream"

如果定义了 , , 或 bean,则它会自动与自动配置的 .MessageConverterStreamMessageConverterProducerCustomizerRabbitStreamTemplatespring-doc.cn

如果你需要创建更多实例,或者想要覆盖默认值,Spring Boot 提供了一个 bean,你可以使用它来初始化一个,其设置与自动配置使用的工厂相同。RabbitStreamTemplateRabbitStreamTemplateConfigurerRabbitStreamTemplatespring-doc.cn

2.4. 接收消息

当 Rabbit 基础结构存在时,可以使用任何 bean 进行 Comments 以创建侦听器端点。 如果未定义 no,则会自动配置 default,您可以使用该属性切换到直接容器。 如果定义了 a 或 a bean,则它将自动与默认工厂相关联。@RabbitListenerRabbitListenerContainerFactorySimpleRabbitListenerContainerFactoryspring.rabbitmq.listener.typeMessageConverterMessageRecovererspring-doc.cn

以下示例组件在队列上创建一个侦听器终端节点:someQueuespring-doc.cn

Java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @RabbitListener(queues = "someQueue")
    public void processMessage(String content) {
        // ...
    }

}
Kotlin
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

    @RabbitListener(queues = ["someQueue"])
    fun processMessage(content: String?) {
        // ...
    }

}
有关更多详细信息,请参阅 @EnableRabbit 的 Javadoc

如果你需要创建更多实例,或者想要覆盖默认值,Spring Boot 提供了a和a,你可以使用它来初始化a和a,其设置与自动配置使用的工厂相同。RabbitListenerContainerFactorySimpleRabbitListenerContainerFactoryConfigurerDirectRabbitListenerContainerFactoryConfigurerSimpleRabbitListenerContainerFactoryDirectRabbitListenerContainerFactoryspring-doc.cn

选择哪种容器类型并不重要。 这两个 bean 由自动配置公开。

例如,下面的配置类公开了另一个使用特定 :MessageConverterspring-doc.cn

Java
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {

    @Bean
    public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        ConnectionFactory connectionFactory = getCustomConnectionFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(new MyMessageConverter());
        return factory;
    }

    private ConnectionFactory getCustomConnectionFactory() {
        return ...
    }

}
Kotlin
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {

    @Bean
    fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
        val factory = SimpleRabbitListenerContainerFactory()
        val connectionFactory = getCustomConnectionFactory()
        configurer.configure(factory, connectionFactory)
        factory.setMessageConverter(MyMessageConverter())
        return factory
    }

    fun getCustomConnectionFactory() : ConnectionFactory? {
        return ...
    }

}

然后,您可以在任何带 -annotated 的方法中使用工厂,如下所示:@RabbitListenerspring-doc.cn

Java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @RabbitListener(queues = "someQueue", containerFactory = "myFactory")
    public void processMessage(String content) {
        // ...
    }

}
Kotlin
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

    @RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
    fun processMessage(content: String?) {
        // ...
    }

}

您可以启用重试来处理侦听器引发异常的情况。 默认情况下,使用,但您可以定义自己的 a。 当重试次数用尽时,消息将被拒绝,并被丢弃或路由到死信交换(如果代理配置为这样做)。 默认情况下,重试处于禁用状态。 您还可以通过声明 bean 以编程方式自定义 bean。RejectAndDontRequeueRecovererMessageRecovererRetryTemplateRabbitRetryTemplateCustomizerspring-doc.cn

默认情况下,如果禁用重试并且侦听器引发异常,则会无限期地重试投放。 您可以通过两种方式修改此行为:将属性设置为 以便尝试零重新投放,或抛出 an 以指示应拒绝消息。 后者是启用重试并达到最大投放尝试次数时使用的机制。defaultRequeueRejectedfalseAmqpRejectAndDontRequeueException

3. Apache Kafka 支持

通过提供项目的自动配置来支持 Apache Kafkaspring-kafkaspring-doc.cn

Kafka 配置由 中的外部配置属性控制。 例如,您可以在 :spring.kafka.*application.propertiesspring-doc.cn

性能
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
Yaml
spring:
  kafka:
    bootstrap-servers: "localhost:9092"
    consumer:
      group-id: "myGroup"
要在启动时创建主题,请添加 类型的 bean 。 如果主题已存在,则忽略该 Bean。NewTopic

有关更多支持的选项,请参阅 KafkaPropertiesspring-doc.cn

3.1. 发送消息

Spring 的 bean 是自动配置的,你可以直接在自己的 bean 中自动装配它,如以下示例所示:KafkaTemplatespring-doc.cn

Java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // ...

    public void someMethod() {
        this.kafkaTemplate.send("someTopic", "Hello");
    }

}
Kotlin
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val kafkaTemplate: KafkaTemplate<String, String>) {

    // ...

    fun someMethod() {
        kafkaTemplate.send("someTopic", "Hello")
    }

}
如果定义了属性,则会自动配置 a。 此外,如果定义了 bean,则它会自动关联到 auto-configured 的 .spring.kafka.producer.transaction-id-prefixKafkaTransactionManagerRecordMessageConverterKafkaTemplate

3.2. 接收消息

当存在 Apache Kafka 基础结构时,可以使用任何 bean 进行注释以创建侦听器终端节点。 如果未定义 no,则会自动使用 中定义的键配置默认 。@KafkaListenerKafkaListenerContainerFactoryspring.kafka.listener.*spring-doc.cn

以下组件在主题上创建侦听器终端节点:someTopicspring-doc.cn

Java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    @KafkaListener(topics = "someTopic")
    public void processMessage(String content) {
        // ...
    }

}
Kotlin
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class MyBean {

    @KafkaListener(topics = ["someTopic"])
    fun processMessage(content: String?) {
        // ...
    }

}

如果定义了 Bean,则它会自动与容器工厂关联。 同样,如果定义了 , , 或 bean,则它会自动与默认工厂关联。KafkaTransactionManagerRecordFilterStrategyCommonErrorHandlerAfterRollbackProcessorConsumerAwareRebalanceListenerspring-doc.cn

根据侦听器类型,a 或 bean 与默认工厂相关联。 如果批处理侦听器仅存在 bean,则将其包装在 .RecordMessageConverterBatchMessageConverterRecordMessageConverterBatchMessageConverterspring-doc.cn

必须标记自定义,因为它通常引用自动配置的 bean。ChainedKafkaTransactionManager@PrimaryKafkaTransactionManager

3.3. Kafka 流

Spring for Apache Kafka 提供了一个工厂 Bean 来创建对象并管理其流的生命周期。 Spring Boot 只要在 Classpath 上,并且 Kafka Streams 由 Comments 启用,它就会自动配置所需的 bean。StreamsBuilderKafkaStreamsConfigurationkafka-streams@EnableKafkaStreamsspring-doc.cn

启用 Kafka Streams 意味着必须设置应用程序 ID 和引导服务器。 前者可以使用 ,如果未设置,则默认为 。 后者可以全局设置,也可以仅针对流专门覆盖。spring.kafka.streams.application-idspring.application.namespring-doc.cn

使用专用属性可以使用多个其他属性;可以使用命名空间设置其他任意 Kafka 属性。 有关更多信息,另请参阅其他 Kafka 属性spring.kafka.streams.propertiesspring-doc.cn

要使用工厂 Bean,请连接到 u,如以下示例所示:StreamsBuilder@Beanspring-doc.cn

Java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.support.serializer.JsonSerde;

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
        KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
        stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
        return stream;
    }

    private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
        return new KeyValue<>(key, value.toUpperCase());
    }

}
Kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Produced
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafkaStreams
import org.springframework.kafka.support.serializer.JsonSerde

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
class MyKafkaStreamsConfiguration {

    @Bean
    fun kStream(streamsBuilder: StreamsBuilder): KStream<Int, String> {
        val stream = streamsBuilder.stream<Int, String>("ks1In")
        stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), JsonSerde()))
        return stream
    }

    private fun uppercaseValue(key: Int, value: String): KeyValue<Int?, String?> {
        return KeyValue(key, value.uppercase())
    }

}

默认情况下,对象管理的流会自动启动。 您可以使用 属性 自定义此行为。StreamBuilderspring.kafka.streams.auto-startupspring-doc.cn

3.4. 其他 Kafka 属性

自动配置支持的属性显示在附录的 “集成属性” 部分中。 请注意,在大多数情况下,这些属性(带连字符或 camelCase)直接映射到 Apache Kafka 点分隔属性。 有关详细信息,请参阅 Apache Kafka 文档。spring-doc.cn

名称中不包含客户端类型 (、 、 或 ) 的属性被视为通用属性,并适用于所有客户端。 如果需要,可以覆盖一个或多个客户端类型的大多数公共属性。producerconsumeradminstreamsspring-doc.cn

Apache Kafka 指定重要性为 HIGH、MEDIUM 或 LOW 的属性。 Spring Boot 自动配置支持所有 HIGH 重要性属性,一些选定的 MEDIUM 和 LOW 属性,以及任何没有默认值的属性。spring-doc.cn

只有 Kafka 支持的属性子集可以直接通过该类使用。 如果要使用不直接支持的其他属性配置各个客户端类型,请使用以下属性:KafkaPropertiesspring-doc.cn

性能
spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
Yaml
spring:
  kafka:
    properties:
      "[prop.one]": "first"
    admin:
      properties:
        "[prop.two]": "second"
    consumer:
      properties:
        "[prop.three]": "third"
    producer:
      properties:
        "[prop.four]": "fourth"
    streams:
      properties:
        "[prop.five]": "fifth"

这会将公共 Kafka 属性设置为(适用于创建者、使用者、管理员和流),将 admin 属性设置为,将使用者属性设置为,将 producer 属性设置为,将 streams 属性设置为。prop.onefirstprop.twosecondprop.threethirdprop.fourfourthprop.fivefifthspring-doc.cn

您还可以按如下方式配置 Spring Kafka:JsonDeserializerspring-doc.cn

性能
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
Yaml
spring:
  kafka:
    consumer:
      value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
      properties:
        "[spring.json.value.default.type]": "com.example.Invoice"
        "[spring.json.trusted.packages]": "com.example.main,com.example.another"

同样,您可以禁用在 Headers 中发送类型信息的默认行为:JsonSerializerspring-doc.cn

性能
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
Yaml
spring:
  kafka:
    producer:
      value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
      properties:
        "[spring.json.add.type.headers]": false
以这种方式设置的属性将覆盖 Spring Boot 显式支持的任何配置项。

3.5. 使用嵌入式 Kafka 进行测试

Spring for Apache Kafka 提供了一种使用嵌入式 Apache Kafka 代理测试项目的便捷方法。 要使用此功能,请使用 from the module 注释测试类。 有关更多信息,请参阅 Spring for Apache Kafka 参考手册@EmbeddedKafkaspring-kafka-testspring-doc.cn

要使 Spring Boot 自动配置与上述嵌入式 Apache Kafka 代理一起使用,您需要将嵌入式代理地址的系统属性(由 填充 )重新映射到 Apache Kafka 的 Spring Boot 配置属性中。 有几种方法可以做到这一点:EmbeddedKafkaBrokerspring-doc.cn

  • 在测试类中提供一个 system 属性,用于将嵌入式代理地址映射到其中:spring.kafka.bootstrap-serversspring-doc.cn

Java
static {
    System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
Kotlin
init {
    System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers")
}
Java
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {

    // ...

}
Kotlin
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.test.context.EmbeddedKafka

@SpringBootTest
@EmbeddedKafka(topics = ["someTopic"], bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {

    // ...

}
性能
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
Yaml
spring:
  kafka:
    bootstrap-servers: "${spring.embedded.kafka.brokers}"

4. RS锁

RSocket 是一种用于字节流传输的二进制协议。 它通过单个连接上的异步消息传递来实现对称交互模型。spring-doc.cn

Spring Framework 的模块在客户端和服务器端为 RSocket 请求者和响应者提供支持。 有关更多详细信息,包括 RSocket 协议的概述,请参见 Spring Framework 参考的 RSocket 部分spring-messagingspring-doc.cn

4.1. RSocket 策略自动配置

Spring Boot 自动配置一个 bean,该 bean 提供编码和解码 RSocket 有效负载所需的所有基础结构。 默认情况下,自动配置将尝试配置以下内容(按顺序):RSocketStrategiesspring-doc.cn

  1. 使用 Jackson 的 CBOR 编解码器spring-doc.cn

  2. 使用 Jackson 的 JSON 编解码器spring-doc.cn

Starter 提供了这两个依赖项。 请参阅 Jackson 支持部分以了解有关自定义可能性的更多信息。spring-boot-starter-rsocketspring-doc.cn

开发人员可以通过创建实现该接口的 bean 来自定义组件。 请注意,这很重要,因为它决定了编解码器的顺序。RSocketStrategiesRSocketStrategiesCustomizer@Orderspring-doc.cn

4.2. RSocket 服务器自动配置

Spring Boot 提供 RSocket 服务器自动配置。 所需的依赖项由 .spring-boot-starter-rsocketspring-doc.cn

Spring Boot 允许从 WebFlux 服务器通过 WebSocket 公开 RSocket,或者建立独立的 RSocket 服务器。 这取决于应用程序的类型及其配置。spring-doc.cn

对于 WebFlux 应用程序(类型 ),仅当以下属性匹配时,RSocket 服务器才会插入 Web 服务器:WebApplicationType.REACTIVEspring-doc.cn

性能
spring.rsocket.server.mapping-path=/rsocket
spring.rsocket.server.transport=websocket
Yaml
spring:
  rsocket:
    server:
      mapping-path: "/rsocket"
      transport: "websocket"
只有 Reactor Netty 才支持将 RSocket 插入 Web 服务器,因为 RSocket 本身就是使用该库构建的。

或者,RSocket TCP 或 websocket 服务器作为独立的嵌入式服务器启动。 除了依赖项要求之外,唯一需要的配置是为该服务器定义一个端口:spring-doc.cn

性能
spring.rsocket.server.port=9898
Yaml
spring:
  rsocket:
    server:
      port: 9898

4.3. Spring Messaging RSocket 支持

Spring Boot 将为 RSocket 自动配置 Spring Messaging 基础设施。spring-doc.cn

这意味着 Spring Boot 将创建一个 bean,该 bean 将处理对应用程序的 RSocket 请求。RSocketMessageHandlerspring-doc.cn

4.4. 使用 RSocketRequester 调用 RSocket 服务

一旦在服务器和客户端之间建立了通道,任何一方都可以向另一方发送或接收请求。RSocketspring-doc.cn

作为服务器,您可以在 RSocket 的任何处理程序方法上注入实例。 作为客户端,您需要先配置并建立 RSocket 连接。 Spring Boot 使用预期的编解码器自动配置此类情况并应用任何 bean。RSocketRequester@ControllerRSocketRequester.BuilderRSocketConnectorConfigurerspring-doc.cn

该实例是一个原型 bean,这意味着每个注入点都会为您提供一个新的实例。 这是有意为之的,因为此构建器是有状态的,您不应使用同一实例创建具有不同设置的请求者。RSocketRequester.Builderspring-doc.cn

以下代码显示了一个典型的示例:spring-doc.cn

Java
import reactor.core.publisher.Mono;

import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.stereotype.Service;

@Service
public class MyService {

    private final RSocketRequester rsocketRequester;

    public MyService(RSocketRequester.Builder rsocketRequesterBuilder) {
        this.rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898);
    }

    public Mono<User> someRSocketCall(String name) {
        return this.rsocketRequester.route("user").data(name).retrieveMono(User.class);
    }

}
Kotlin
import org.springframework.messaging.rsocket.RSocketRequester
import org.springframework.stereotype.Service
import reactor.core.publisher.Mono

@Service
class MyService(rsocketRequesterBuilder: RSocketRequester.Builder) {

    private val rsocketRequester: RSocketRequester

    init {
        rsocketRequester = rsocketRequesterBuilder.tcp("example.org", 9898)
    }

    fun someRSocketCall(name: String): Mono<User> {
        return rsocketRequester.route("user").data(name).retrieveMono(
            User::class.java
        )
    }

}

5. Spring 集成

Spring Boot 为使用 Spring Integration 提供了多种便利,包括“Starter”。 Spring 集成通过消息传递以及其他传输(如 HTTP、TCP 等)提供抽象。 如果 Spring 集成在你的 Classpath 上可用,则通过 Comments 对其进行初始化。spring-boot-starter-integration@EnableIntegrationspring-doc.cn

Spring 集成轮询逻辑依赖于自动配置的TaskScheduler。 默认值(每秒轮询无限数量的消息)可以使用配置属性进行自定义。PollerMetadataspring.integration.poller.*spring-doc.cn

Spring Boot 还配置了一些由其他 Spring 集成模块的存在触发的功能。 如果 也在 Classpath 上,则通过 JMX 发布消息处理统计信息。 如果可用,则可以在启动时创建默认数据库架构,如以下行所示:spring-integration-jmxspring-integration-jdbcspring-doc.cn

性能
spring.integration.jdbc.initialize-schema=always
Yaml
spring:
  integration:
    jdbc:
      initialize-schema: "always"

如果可用,开发人员可以使用属性配置 RSocket 服务器,并让它使用 or 组件来处理传入的 RSocket 消息。 这个基础设施可以处理 Spring 集成 RSocket 通道适配器和处理程序(给定已配置)。spring-integration-rsocket"spring.rsocket.server.*"IntegrationRSocketEndpointRSocketOutboundGateway@MessageMapping"spring.integration.rsocket.server.message-mapping-enabled"spring-doc.cn

Spring Boot 还可以使用配置属性自动配置:ClientRSocketConnectorspring-doc.cn

性能
# Connecting to a RSocket server over TCP
spring.integration.rsocket.client.host=example.org
spring.integration.rsocket.client.port=9898
Yaml
# Connecting to a RSocket server over TCP
spring:
  integration:
    rsocket:
      client:
        host: "example.org"
        port: 9898
性能
# Connecting to a RSocket Server over WebSocket
spring.integration.rsocket.client.uri=ws://example.org
Yaml
# Connecting to a RSocket Server over WebSocket
spring:
  integration:
    rsocket:
      client:
        uri: "ws://example.org"

有关更多详细信息,请参阅 IntegrationAutoConfigurationIntegrationProperties 类。spring-doc.cn

6. 网络套接字

Spring Boot 为嵌入式 Tomcat、Jetty 和 Undertow 提供 WebSockets 自动配置。 如果将 war 文件部署到独立容器,则 Spring Boot 假定该容器负责其 WebSocket 支持的配置。spring-doc.cn

Spring Framework 为 MVC Web 应用程序提供了丰富的 WebSocket 支持,可以通过该模块轻松访问。spring-boot-starter-websocketspring-doc.cn

WebSocket 支持也可用于反应式 Web 应用程序,并且需要包括 WebSocket API 以及:spring-boot-starter-webfluxspring-doc.cn

<dependency>
    <groupId>javax.websocket</groupId>
    <artifactId>javax.websocket-api</artifactId>
</dependency>

7. 下一步要读什么

下一节介绍如何在应用程序中启用 IO 功能。 您可以在本节中阅读有关缓存邮件验证REST 客户端等的信息。spring-doc.cn