此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Boot 3.3.4spring-doc.cn

此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Boot 3.3.4spring-doc.cn

通过提供项目的自动配置来支持 Apache Kafkaspring-kafkaspring-doc.cn

Kafka 配置由 中的外部配置属性控制。 例如,您可以在 :spring.kafka.*application.propertiesspring-doc.cn

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring:
  kafka:
    bootstrap-servers: "localhost:9092"
    consumer:
      group-id: "myGroup"
要在启动时创建主题,请添加 类型的 bean 。 如果主题已存在,则忽略该 Bean。NewTopic

有关更多支持的选项,请参阅。KafkaPropertiesspring-doc.cn

要在启动时创建主题,请添加 类型的 bean 。 如果主题已存在,则忽略该 Bean。NewTopic

发送消息

Spring 的 bean 是自动配置的,你可以直接在自己的 bean 中自动装配它,如以下示例所示:KafkaTemplatespring-doc.cn

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final KafkaTemplate<String, String> kafkaTemplate;

	public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
		this.kafkaTemplate = kafkaTemplate;
	}

	// ...

	public void someMethod() {
		this.kafkaTemplate.send("someTopic", "Hello");
	}

}
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val kafkaTemplate: KafkaTemplate<String, String>) {

	// ...

	fun someMethod() {
		kafkaTemplate.send("someTopic", "Hello")
	}

}
如果定义了属性,则会自动配置 a。 此外,如果定义了 bean,则它会自动关联到 auto-configured 的 .spring.kafka.producer.transaction-id-prefixKafkaTransactionManagerRecordMessageConverterKafkaTemplate
如果定义了属性,则会自动配置 a。 此外,如果定义了 bean,则它会自动关联到 auto-configured 的 .spring.kafka.producer.transaction-id-prefixKafkaTransactionManagerRecordMessageConverterKafkaTemplate

接收消息

当存在 Apache Kafka 基础结构时,可以使用任何 bean 进行注释以创建侦听器终端节点。 如果未定义 no,则会自动使用 中定义的键配置默认 。@KafkaListenerKafkaListenerContainerFactoryspring.kafka.listener.*spring-doc.cn

以下组件在主题上创建侦听器终端节点:someTopicspring-doc.cn

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@KafkaListener(topics = "someTopic")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@KafkaListener(topics = ["someTopic"])
	fun processMessage(content: String?) {
		// ...
	}

}

如果定义了 Bean,则它会自动与容器工厂关联。 同样,如果定义了 , , 或 bean,则它会自动与默认工厂关联。KafkaTransactionManagerRecordFilterStrategyCommonErrorHandlerAfterRollbackProcessorConsumerAwareRebalanceListenerspring-doc.cn

根据侦听器类型,a 或 bean 与默认工厂相关联。 如果批处理侦听器仅存在 bean,则将其包装在 .RecordMessageConverterBatchMessageConverterRecordMessageConverterBatchMessageConverterspring-doc.cn

必须标记自定义,因为它通常引用自动配置的 bean。ChainedKafkaTransactionManager@PrimaryKafkaTransactionManager
必须标记自定义,因为它通常引用自动配置的 bean。ChainedKafkaTransactionManager@PrimaryKafkaTransactionManager

Kafka 流

Spring for Apache Kafka 提供了一个工厂 Bean 来创建对象并管理其流的生命周期。 Spring Boot 只要在 Classpath 上,并且 Kafka Streams 由 Comments 启用,它就会自动配置所需的 bean。StreamsBuilderKafkaStreamsConfigurationkafka-streams@EnableKafkaStreamsspring-doc.cn

启用 Kafka Streams 意味着必须设置应用程序 ID 和引导服务器。 前者可以使用 ,如果未设置,则默认为 。 后者可以全局设置,也可以仅针对流专门覆盖。spring.kafka.streams.application-idspring.application.namespring-doc.cn

使用专用属性可以使用多个其他属性;可以使用命名空间设置其他任意 Kafka 属性。 有关更多信息,另请参阅其他 Kafka 属性spring.kafka.streams.propertiesspring-doc.cn

要使用工厂 Bean,请连接到 u,如以下示例所示:StreamsBuilder@Beanspring-doc.cn

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.support.serializer.JsonSerde;

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {

	@Bean
	public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
		KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
		stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
		return stream;
	}

	private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
		return new KeyValue<>(key, value.toUpperCase());
	}

}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Produced
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafkaStreams
import org.springframework.kafka.support.serializer.JsonSerde

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
class MyKafkaStreamsConfiguration {

	@Bean
	fun kStream(streamsBuilder: StreamsBuilder): KStream<Int, String> {
		val stream = streamsBuilder.stream<Int, String>("ks1In")
		stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), JsonSerde()))
		return stream
	}

	private fun uppercaseValue(key: Int, value: String): KeyValue<Int?, String?> {
		return KeyValue(key, value.uppercase())
	}

}

默认情况下,对象管理的流会自动启动。 您可以使用 属性 自定义此行为。StreamBuilderspring.kafka.streams.auto-startupspring-doc.cn

其他 Kafka 属性

自动配置支持的属性显示在附录的 集成属性 部分中。 请注意,在大多数情况下,这些属性(带连字符或 camelCase)直接映射到 Apache Kafka 点分隔属性。 有关详细信息,请参阅 Apache Kafka 文档。spring-doc.cn

名称中不包含客户端类型 (、 、 或 ) 的属性被视为通用属性,并适用于所有客户端。 如果需要,可以覆盖一个或多个客户端类型的大多数公共属性。producerconsumeradminstreamsspring-doc.cn

Apache Kafka 指定重要性为 HIGH、MEDIUM 或 LOW 的属性。 Spring Boot 自动配置支持所有 HIGH 重要性属性,一些选定的 MEDIUM 和 LOW 属性,以及任何没有默认值的属性。spring-doc.cn

只有 Kafka 支持的属性子集可以直接通过该类使用。 如果要使用不直接支持的其他属性配置各个客户端类型,请使用以下属性:KafkaPropertiesspring-doc.cn

spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
spring:
  kafka:
    properties:
      "[prop.one]": "first"
    admin:
      properties:
        "[prop.two]": "second"
    consumer:
      properties:
        "[prop.three]": "third"
    producer:
      properties:
        "[prop.four]": "fourth"
    streams:
      properties:
        "[prop.five]": "fifth"

这会将公共 Kafka 属性设置为(适用于创建者、使用者、管理员和流),将 admin 属性设置为,将使用者属性设置为,将 producer 属性设置为,将 streams 属性设置为。prop.onefirstprop.twosecondprop.threethirdprop.fourfourthprop.fivefifthspring-doc.cn

您还可以按如下方式配置 Spring Kafka:JsonDeserializerspring-doc.cn

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
spring:
  kafka:
    consumer:
      value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
      properties:
        "[spring.json.value.default.type]": "com.example.Invoice"
        "[spring.json.trusted.packages]": "com.example.main,com.example.another"

同样,您可以禁用在 Headers 中发送类型信息的默认行为:JsonSerializerspring-doc.cn

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
spring:
  kafka:
    producer:
      value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
      properties:
        "[spring.json.add.type.headers]": false
以这种方式设置的属性将覆盖 Spring Boot 显式支持的任何配置项。
以这种方式设置的属性将覆盖 Spring Boot 显式支持的任何配置项。

使用嵌入式 Kafka 进行测试

Spring for Apache Kafka 提供了一种使用嵌入式 Apache Kafka 代理测试项目的便捷方法。 要使用此功能,请使用 from the module 注释测试类。 有关更多信息,请参阅 Spring for Apache Kafka 参考手册@EmbeddedKafkaspring-kafka-testspring-doc.cn

要使 Spring Boot 自动配置与上述嵌入式 Apache Kafka 代理一起使用,您需要将嵌入式代理地址的系统属性(由 填充 )重新映射到 Apache Kafka 的 Spring Boot 配置属性中。 有几种方法可以做到这一点:EmbeddedKafkaBrokerspring-doc.cn

  • 在测试类中提供一个 system 属性,用于将嵌入式代理地址映射到其中:spring.kafka.bootstrap-serversspring-doc.cn

	static {
		System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
	}
	init {
		System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers")
	}
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {

	// ...

}
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.test.context.EmbeddedKafka

@SpringBootTest
@EmbeddedKafka(topics = ["someTopic"], bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {

	// ...

}
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
spring:
  kafka:
    bootstrap-servers: "${spring.embedded.kafka.brokers}"