Apache Kafka 通过提供项目的自动配置来支持。spring-kafkaSpring中文文档

Kafka 配置由 中的外部配置属性控制。 例如,您可以在以下位置声明以下部分:spring.kafka.*application.propertiesSpring中文文档

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring:
  kafka:
    bootstrap-servers: "localhost:9092"
    consumer:
      group-id: "myGroup"
要在启动时创建主题,请添加类型为 的 Bean 。 如果该主题已存在,则忽略该 bean。NewTopic

有关更多支持的选项,请参阅 KafkaPropertiesSpring中文文档

要在启动时创建主题,请添加类型为 的 Bean 。 如果该主题已存在,则忽略该 bean。NewTopic

发送消息

Spring 是自动配置的,您可以直接在自己的 bean 中自动连接它,如以下示例所示:KafkaTemplateSpring中文文档

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final KafkaTemplate<String, String> kafkaTemplate;

	public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
		this.kafkaTemplate = kafkaTemplate;
	}

	// ...

	public void someMethod() {
		this.kafkaTemplate.send("someTopic", "Hello");
	}

}
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val kafkaTemplate: KafkaTemplate<String, String>) {

	// ...

	fun someMethod() {
		kafkaTemplate.send("someTopic", "Hello")
	}

}
如果定义了该属性,则会自动配置 a。 此外,如果定义了 Bean,则会自动将其关联到自动配置的 .spring.kafka.producer.transaction-id-prefixKafkaTransactionManagerRecordMessageConverterKafkaTemplate
如果定义了该属性,则会自动配置 a。 此外,如果定义了 Bean,则会自动将其关联到自动配置的 .spring.kafka.producer.transaction-id-prefixKafkaTransactionManagerRecordMessageConverterKafkaTemplate

接收消息

当 Apache Kafka 基础结构存在时,可以对任何 Bean 进行注释以创建侦听器端点。 如果未定义,则会自动使用中定义的键配置默认键。@KafkaListenerKafkaListenerContainerFactoryspring.kafka.listener.*Spring中文文档

以下组件在主题上创建侦听器终结点:someTopicSpring中文文档

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@KafkaListener(topics = "someTopic")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@KafkaListener(topics = ["someTopic"])
	fun processMessage(content: String?) {
		// ...
	}

}

如果定义了 Bean,那么它会自动关联到容器工厂。 同样,如果定义了 、 或 bean,则会自动将其关联到缺省工厂。KafkaTransactionManagerRecordFilterStrategyCommonErrorHandlerAfterRollbackProcessorConsumerAwareRebalanceListenerSpring中文文档

根据侦听器类型,a 或 bean 与默认工厂相关联。 如果批处理侦听器只存在一个 bean,则该 bean 将包装在 .RecordMessageConverterBatchMessageConverterRecordMessageConverterBatchMessageConverterSpring中文文档

必须标记自定义,因为它通常引用自动配置的 Bean。ChainedKafkaTransactionManager@PrimaryKafkaTransactionManager
必须标记自定义,因为它通常引用自动配置的 Bean。ChainedKafkaTransactionManager@PrimaryKafkaTransactionManager

Kafka 流

Spring for Apache Kafka 提供了一个工厂 Bean 来创建对象并管理其流的生命周期。 Spring Boot 会自动配置所需的 bean,只要在类路径上,并且 Kafka Streams 由注解启用。StreamsBuilderKafkaStreamsConfigurationkafka-streams@EnableKafkaStreamsSpring中文文档

启用 Kafka Streams 意味着必须设置应用程序 ID 和引导服务器。 前者可以使用 配置,如果未设置,则默认为 。 后者可以全局设置,也可以仅针对流专门覆盖。spring.kafka.streams.application-idspring.application.nameSpring中文文档

使用专用属性可以使用其他几个属性;可以使用命名空间设置其他任意 Kafka 属性。 有关更多信息,另请参阅其他 Kafka 属性spring.kafka.streams.propertiesSpring中文文档

要使用工厂 bean,请连接到 u,如以下示例所示:StreamsBuilder@BeanSpring中文文档

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.support.serializer.JsonSerde;

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {

	@Bean
	public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
		KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
		stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
		return stream;
	}

	private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
		return new KeyValue<>(key, value.toUpperCase());
	}

}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Produced
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafkaStreams
import org.springframework.kafka.support.serializer.JsonSerde

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
class MyKafkaStreamsConfiguration {

	@Bean
	fun kStream(streamsBuilder: StreamsBuilder): KStream<Int, String> {
		val stream = streamsBuilder.stream<Int, String>("ks1In")
		stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), JsonSerde()))
		return stream
	}

	private fun uppercaseValue(key: Int, value: String): KeyValue<Int?, String?> {
		return KeyValue(key, value.uppercase())
	}

}

默认情况下,对象管理的流会自动启动。 可以使用该属性自定义此行为。StreamBuilderspring.kafka.streams.auto-startupSpring中文文档

其他 Kafka 属性

自动配置支持的属性显示在附录的“集成属性”部分。 请注意,在大多数情况下,这些属性(连字符或 camelCase)直接映射到 Apache Kafka 点缀属性。 有关详细信息,请参阅 Apache Kafka 文档。Spring中文文档

名称中不包含客户端类型(、、 或 )的属性被视为通用属性,并适用于所有客户端。 如果需要,可以为一个或多个客户端类型重写这些公共属性中的大多数。producerconsumeradminstreamsSpring中文文档

Apache Kafka 指定重要性为 HIGH、MEDIUM 或 LOW 的属性。 Spring Boot 自动配置支持所有 HIGH 重要性属性、一些选定的 MEDIUM 和 LOW 属性,以及任何没有默认值的属性。Spring中文文档

只有 Kafka 支持的属性子集可直接通过该类获得。 如果要使用不直接支持的其他属性配置单个客户端类型,请使用以下属性:KafkaPropertiesSpring中文文档

spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
spring:
  kafka:
    properties:
      "[prop.one]": "first"
    admin:
      properties:
        "[prop.two]": "second"
    consumer:
      properties:
        "[prop.three]": "third"
    producer:
      properties:
        "[prop.four]": "fourth"
    streams:
      properties:
        "[prop.five]": "fifth"

这会将公共 Kafka 属性设置为(适用于生产者、使用者、管理员和流),将 admin 属性设置为 ,将 consumer 属性设置为 ,将 producer 属性设置为 ,将 streams 属性设置为 。prop.onefirstprop.twosecondprop.threethirdprop.fourfourthprop.fivefifthSpring中文文档

您还可以按如下方式配置 Spring Kafka:JsonDeserializerSpring中文文档

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
spring:
  kafka:
    consumer:
      value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
      properties:
        "[spring.json.value.default.type]": "com.example.Invoice"
        "[spring.json.trusted.packages]": "com.example.main,com.example.another"

同样,您可以禁用在标头中发送类型信息的默认行为:JsonSerializerSpring中文文档

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
spring:
  kafka:
    producer:
      value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
      properties:
        "[spring.json.add.type.headers]": false
以这种方式设置的属性将覆盖 Spring Boot 显式支持的任何配置项。
以这种方式设置的属性将覆盖 Spring Boot 显式支持的任何配置项。

使用嵌入式 Kafka 进行测试

Spring for Apache Kafka 提供了一种使用嵌入式 Apache Kafka 代理测试项目的便捷方法。 要使用此功能,请使用模块中的测试类进行批注。 有关更多信息,请参阅 Spring for Apache Kafka 参考手册@EmbeddedKafkaspring-kafka-testSpring中文文档

要使 Spring Boot 自动配置与上述嵌入式 Apache Kafka 代理配合使用,您需要将嵌入式代理地址的系统属性(由 填充)重新映射到 Apache Kafka 的 Spring Boot 配置属性中。 有几种方法可以做到这一点:EmbeddedKafkaBrokerSpring中文文档

  • 提供一个系统属性,用于在测试类中将嵌入式代理地址映射到:spring.kafka.bootstrap-serversSpring中文文档

	static {
		System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
	}
	init {
		System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers")
	}
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {

	// ...

}
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.test.context.EmbeddedKafka

@SpringBootTest
@EmbeddedKafka(topics = ["someTopic"], bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {

	// ...

}
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
spring:
  kafka:
    bootstrap-servers: "${spring.embedded.kafka.brokers}"