连接到 Kafka
从版本 2.5 开始,它们中的每一个都扩展了 .
这允许在运行时通过向其配置中添加 a 来更改引导服务器: .
将对所有新连接调用此函数以获取服务器列表。
Consumer 和 Producer 通常存在很长时间。
要关闭现有 Producer,请调用 .
要关闭现有的 Consumers,请在 and/or 和任何其他侦听器容器 bean 上调用 (然后 )。KafkaResourceFactory
Supplier<String>
setBootstrapServersSupplier(() -> …)
reset()
DefaultKafkaProducerFactory
stop()
start()
KafkaListenerEndpointRegistry
stop()
start()
为方便起见,该框架还提供了一个支持两组引导服务器的 Bootstrap Server;其中 1 个随时处于活动状态。
配置 并将其添加到生产者和使用者工厂,以及 ,通过调用 .
当你想切换时,打电话 or 和 call 生产者工厂建立新的连接;对于使用者和所有侦听器容器。
当使用 s 和 bean.ABSwitchCluster
ABSwitchCluster
KafkaAdmin
setBootstrapServersSupplier()
primary()
secondary()
reset()
stop()
start()
@KafkaListener
stop()
start()
KafkaListenerEndpointRegistry
有关更多信息,请参阅 Javadocs。
Factory 侦听器
从版本 2.5 开始,可以将 和 配置为在创建或关闭 producer 或 consumer 时接收通知。DefaultKafkaProducerFactory
DefaultKafkaConsumerFactory
Listener
interface Listener<K, V> {
default void producerAdded(String id, Producer<K, V> producer) {
}
default void producerRemoved(String id, Producer<K, V> producer) {
}
}
interface Listener<K, V> {
default void consumerAdded(String id, Consumer<K, V> consumer) {
}
default void consumerRemoved(String id, Consumer<K, V> consumer) {
}
}
在每种情况下,都是通过将属性(从创建后获得)附加到 factory 属性来创建的,以 .id
client-id
metrics()
beanName
.
例如,这些侦听器可用于在创建新 Client 端时创建和绑定 Micrometer 实例(并在 Client 端关闭时关闭它)。KafkaClientMetrics
该框架提供的侦听器正是这样做的;请参阅 Micrometer 本机度量。
默认客户端 ID 前缀
从版本 3.2 开始,对于使用属性定义应用程序名称的 Spring Boot 应用程序,现在使用此名称
作为这些客户端类型的自动生成的客户端 ID 的默认前缀:spring.application.name
-
不使用 Consumer Group 的 Consumer 客户端
-
生产者客户端
-
管理员客户端
这样可以更轻松地在服务器端识别这些客户端,以便进行故障排除或应用配额。
客户端类型 | 无应用程序名称 | 使用应用程序名称 |
---|---|---|
没有 Consumer Group 的 Consumer |
使用者 null-1 |
myapp-consumer-1 |
具有使用者组 “mygroup” 的使用者 |
消费者 mygroup-1 |
消费者 mygroup-1 |
制作人 |
生产者-1 |
myapp-producer-1 |
管理 |
adminclient-1 |
myapp-admin-1 |