Apache Kafka 支持
通过提供项目的自动配置来支持 Apache Kafka。spring-kafka
Kafka 配置由 中的外部配置属性控制。
例如,您可以在 :spring.kafka.*
application.properties
-
Properties
-
YAML
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring:
kafka:
bootstrap-servers: "localhost:9092"
consumer:
group-id: "myGroup"
要在启动时创建主题,请添加 NewTopic 类型的 Bean。
如果主题已存在,则忽略该 Bean。 |
有关更多支持的选项,请参阅 KafkaProperties
。
发送消息
Spring 的KafkaTemplate
是自动配置的,你可以直接在自己的 bean 中自动连接它,如以下示例所示:
-
Java
-
Kotlin
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final KafkaTemplate<String, String> kafkaTemplate;
public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// ...
public void someMethod() {
this.kafkaTemplate.send("someTopic", "Hello");
}
}
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val kafkaTemplate: KafkaTemplate<String, String>) {
// ...
fun someMethod() {
kafkaTemplate.send("someTopic", "Hello")
}
}
如果定义了该属性,则会自动配置 KafkaTransactionManager 。
此外,如果定义了RecordMessageConverter bean,则它将自动关联到自动配置的KafkaTemplate 。spring.kafka.producer.transaction-id-prefix |
接收消息
当存在 Apache Kafka 基础设施时,任何 bean 都可以使用 @KafkaListener
进行注释以创建侦听器终端节点。
如果未定义 KafkaListenerContainerFactory,则会自动使用 中定义的键配置默认 KafkaListenerContainerFactory
。spring.kafka.listener.*
以下组件在主题上创建侦听器终端节点:someTopic
-
Java
-
Kotlin
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@KafkaListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@KafkaListener(topics = ["someTopic"])
fun processMessage(content: String?) {
// ...
}
}
如果定义了KafkaTransactionManager
Bean,则它将自动与容器工厂关联。
同样,如果定义了RecordFilterStrategy
、CommonErrorHandler
、AfterRollbackProcessor
或ConsumerAwareRebalanceListener
Bean,则它会自动与默认工厂相关联。
根据侦听器类型,RecordMessageConverter
或BatchMessageConverter
bean 与默认工厂相关联。
如果批处理侦听器仅存在RecordMessageConverter
Bean,则将其包装在BatchMessageConverter
中。
自定义ChainedKafkaTransactionManager 必须标记为@Primary 因为它通常引用自动配置的KafkaTransactionManager Bean。 |
Kafka 流
Spring for Apache Kafka 提供了一个工厂 Bean 来创建 StreamsBuilder
对象并管理其流的生命周期。
Spring Boot 会自动配置所需的KafkaStreamsConfiguration
Bean,只要它在 Classpath 上,并且 Kafka Streams 是由 @EnableKafkaStreams
注释启用的。kafka-streams
启用 Kafka Streams 意味着必须设置应用程序 ID 和引导服务器。
前者可以使用 ,如果未设置,则默认为 。
后者可以全局设置,也可以仅针对流专门覆盖。spring.kafka.streams.application-id
spring.application.name
使用专用属性可以使用多个其他属性;可以使用命名空间设置其他任意 Kafka 属性。
有关更多信息,另请参阅其他 Kafka 属性。spring.kafka.streams.properties
要使用工厂 Bean,请将 StreamsBuilder
连接到您的 @Bean
,如以下示例所示:
-
Java
-
Kotlin
import java.util.Locale;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.support.serializer.JsonSerde;
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {
@Bean
public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
return stream;
}
private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
return new KeyValue<>(key, value.toUpperCase(Locale.getDefault()));
}
}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Produced
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafkaStreams
import org.springframework.kafka.support.serializer.JsonSerde
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
class MyKafkaStreamsConfiguration {
@Bean
fun kStream(streamsBuilder: StreamsBuilder): KStream<Int, String> {
val stream = streamsBuilder.stream<Int, String>("ks1In")
stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), JsonSerde()))
return stream
}
private fun uppercaseValue(key: Int, value: String): KeyValue<Int?, String?> {
return KeyValue(key, value.uppercase())
}
}
默认情况下,由 StreamsBuilder
对象管理的流会自动启动。
您可以使用 属性 自定义此行为。spring.kafka.streams.auto-startup
其他 Kafka 属性
自动配置支持的属性显示在附录的 集成属性 部分中。 请注意,在大多数情况下,这些属性(带连字符或 camelCase)直接映射到 Apache Kafka 点分隔属性。 有关详细信息,请参阅 Apache Kafka 文档。
名称中不包含客户端类型 (、 、 或 ) 的属性被视为通用属性,并适用于所有客户端。
如果需要,可以覆盖一个或多个客户端类型的大多数公共属性。producer
consumer
admin
streams
Apache Kafka 指定重要性为 HIGH、MEDIUM 或 LOW 的属性。 Spring Boot 自动配置支持所有 HIGH 重要性属性,一些选定的 MEDIUM 和 LOW 属性,以及任何没有默认值的属性。
只有 Kafka 支持的属性子集可以直接通过 KafkaProperties
类获得。
如果要使用不直接支持的其他属性配置各个客户端类型,请使用以下属性:
-
Properties
-
YAML
spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
spring:
kafka:
properties:
"[prop.one]": "first"
admin:
properties:
"[prop.two]": "second"
consumer:
properties:
"[prop.three]": "third"
producer:
properties:
"[prop.four]": "fourth"
streams:
properties:
"[prop.five]": "fifth"
这会将公共 Kafka 属性设置为(适用于创建者、使用者、管理员和流),将 admin 属性设置为,将使用者属性设置为,将 producer 属性设置为,将 streams 属性设置为。prop.one
first
prop.two
second
prop.three
third
prop.four
fourth
prop.five
fifth
你还可以按如下方式配置 Spring Kafka JsonDeserializer
:
-
Properties
-
YAML
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
spring:
kafka:
consumer:
value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
properties:
"[spring.json.value.default.type]": "com.example.Invoice"
"[spring.json.trusted.packages]": "com.example.main,com.example.another"
同样,你可以禁用 JsonSerializer 在 Headers
中发送类型信息的默认行为:
-
Properties
-
YAML
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
spring:
kafka:
producer:
value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
properties:
"[spring.json.add.type.headers]": false
以这种方式设置的属性将覆盖 Spring Boot 显式支持的任何配置项。 |
使用嵌入式 Kafka 进行测试
Spring for Apache Kafka 提供了一种使用嵌入式 Apache Kafka 代理测试项目的便捷方法。
要使用此功能,请使用模块中的@EmbeddedKafka
对测试类进行注释。
有关更多信息,请参阅 Spring for Apache Kafka 参考手册。spring-kafka-test
要使 Spring Boot 自动配置与上述嵌入式 Apache Kafka 代理一起工作,你需要将嵌入式代理地址的系统属性(由EmbeddedKafkaBroker
填充)重新映射到 Apache Kafka 的 Spring Boot 配置属性中。
有几种方法可以做到这一点:
-
在测试类中提供一个 system 属性,用于将嵌入式代理地址映射到其中:
spring.kafka.bootstrap-servers
-
Java
-
Kotlin
static {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
init {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers")
}
-
在
@EmbeddedKafka
注解上配置属性名称:
-
Java
-
Kotlin
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;
@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {
// ...
}
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.test.context.EmbeddedKafka
@SpringBootTest
@EmbeddedKafka(topics = ["someTopic"], bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {
// ...
}
-
在配置属性中使用占位符:
-
Properties
-
YAML
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
spring:
kafka:
bootstrap-servers: "${spring.embedded.kafka.brokers}"