对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0spring-doc.cn

连接到 Kafka

从版本 2.5 开始,它们中的每一个都扩展了 . 这允许在运行时通过向其配置中添加 a 来更改引导服务器: . 将对所有新连接调用此函数以获取服务器列表。 Consumer 和 Producer 通常存在很长时间。 要关闭现有 Producer,请调用 . 要关闭现有的 Consumers,请在 and/or 和任何其他侦听器容器 bean 上调用 (然后 )。KafkaResourceFactorySupplier<String>setBootstrapServersSupplier(() -> …​)reset()DefaultKafkaProducerFactorystop()start()KafkaListenerEndpointRegistrystop()start()spring-doc.cn

为方便起见,该框架还提供了一个支持两组引导服务器的 Bootstrap Server;其中 1 个随时处于活动状态。 配置 并将其添加到生产者和使用者工厂,以及 ,通过调用 . 当你想切换时,打电话 or 和 call 生产者工厂建立新的连接;对于使用者和所有侦听器容器。 当使用 s 和 bean.ABSwitchClusterABSwitchClusterKafkaAdminsetBootstrapServersSupplier()primary()secondary()reset()stop()start()@KafkaListenerstop()start()KafkaListenerEndpointRegistryspring-doc.cn

有关更多信息,请参阅 Javadocs。spring-doc.cn

Factory 侦听器

从版本 2.5 开始,可以将 和 配置为在创建或关闭 producer 或 consumer 时接收通知。DefaultKafkaProducerFactoryDefaultKafkaConsumerFactoryListenerspring-doc.cn

Producer Factory 侦听器
interface Listener<K, V> {

    default void producerAdded(String id, Producer<K, V> producer) {
    }

    default void producerRemoved(String id, Producer<K, V> producer) {
    }

}
Consumer Factory 侦听器
interface Listener<K, V> {

    default void consumerAdded(String id, Consumer<K, V> consumer) {
    }

    default void consumerRemoved(String id, Consumer<K, V> consumer) {
    }

}

在每种情况下,都是通过将属性(从创建后获得)附加到 factory 属性来创建的,以 .idclient-idmetrics()beanName.spring-doc.cn

例如,这些侦听器可用于在创建新 Client 端时创建和绑定 Micrometer 实例(并在 Client 端关闭时关闭它)。KafkaClientMetricsspring-doc.cn

该框架提供的侦听器正是这样做的;请参阅 Micrometer 本机度量。spring-doc.cn