对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0spring-doc.cn

监测

监视侦听器性能

从版本 2.3 开始,如果在 Classpath 上检测到 micrometer s,并且应用程序上下文中存在 single,则侦听器容器将自动为侦听器创建和更新 Micrometer s。 可以通过将 设置为 来禁用计时器。TimerMicrometerMeterRegistryContainerPropertymicrometerEnabledfalsespring-doc.cn

维护两个计时器 - 一个用于成功调用侦听器,另一个用于失败。spring-doc.cn

计时器已命名并具有以下标签:spring.kafka.listenerspring-doc.cn

您可以使用 的属性添加其他标记。ContainerPropertiesmicrometerTagsspring-doc.cn

从版本 2.9.8、3.0.6 开始,您可以在 的 ;该函数接收 and 返回标签,这些标签可以基于该记录,并与 中的任何静态标签合并。ContainerPropertiesmicrometerTagsProviderConsumerRecord<?, ?>micrometerTagsspring-doc.cn

使用并发容器,将为每个线程创建计时器,并且标记后缀为 where n is to 。name-n0concurrency-1

监控 KafkaTemplate 性能

从版本 2.5 开始,如果在 Classpath 上检测到 micrometer s for send 操作,并且应用程序上下文中存在 single ,则模板将自动创建和更新 Micrometer s s。 可以通过将模板的属性设置为 来禁用计时器。TimerMicrometerMeterRegistrymicrometerEnabledfalsespring-doc.cn

维护两个计时器 - 一个用于成功调用侦听器,另一个用于失败。spring-doc.cn

计时器已命名并具有以下标签:spring.kafka.templatespring-doc.cn

您可以使用模板的属性添加其他标记。micrometerTagsspring-doc.cn

从版本 2.9.8、3.0.6 开始,您可以提供属性;该函数接收 and 返回标签,这些标签可以基于该记录,并与 中的任何静态标签合并。KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>)ProducerRecord<?, ?>micrometerTagsspring-doc.cn

Micrometer 原生指标

从版本 2.5 开始,该框架提供了 Factory Listeners 来在创建和关闭 producer 和 consumer 时管理 Micrometer 实例。KafkaClientMetricsspring-doc.cn

要启用此功能,只需将侦听器添加到您的 producer 和 consumer 工厂:spring-doc.cn

@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
    Map<String, Object> configs = consumerConfigs();
    ...
    DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
    ...
    cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return cf;
}

@Bean
public ProducerFactory<String, String> myProducerFactory() {
    Map<String, Object> configs = producerConfigs();
    configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
    ...
    DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
    ...
    pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return pf;
}

传递给侦听器的使用者/生产者将添加到带有 tag name 的计量器标签中。idspring.idspring-doc.cn

获取其中一个 Kafka 指标的示例
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
                .tag("customTag", "customTagValue")
                .tag("spring.id", "myProducerFactory.myClientId-1")
                .functionCounter()
                .count();

为 - 请参阅 KafkaStreams Micrometer Support 提供了类似的侦听器。StreamsBuilderFactoryBeanspring-doc.cn

千分尺观察

从版本 3.0 开始,现在支持对 和 listener 容器使用 Micrometer 进行观察。KafkaTemplatespring-doc.cn

设置为 on 和 以启用观察;这将禁用 Micrometer Timers,因为计时器现在将随每个观测一起管理。observationEnabledtrueKafkaTemplateContainerPropertiesspring-doc.cn

千分尺观测不支持批量侦听器;这将启用 Micrometer Timers

有关更多信息,请参阅 Micrometer Tracingspring-doc.cn

要向计时器/跟踪添加标签,请分别配置自定义或模板或侦听器容器。KafkaTemplateObservationConventionKafkaListenerObservationConventionspring-doc.cn

默认实施会添加 template observations 的标签和 containers 的标签。bean.namelistener.idspring-doc.cn

您可以子类化 or or 提供全新的实现。DefaultKafkaTemplateObservationConventionDefaultKafkaListenerObservationConventionspring-doc.cn

请参阅千分尺观测文档,了解记录的默认观测值的详细信息。spring-doc.cn

从版本 3.0.6 开始,您可以根据使用者或创建者记录中的信息向计时器和跟踪添加动态标签。 为此,请分别向侦听器容器属性添加自定义和/或。 两个观察上下文中的属性分别包含 or。KafkaListenerObservationConventionKafkaTemplateObservationConventionKafkaTemplaterecordConsumerRecordProducerRecordspring-doc.cn

发送方和接收方上下文属性设置为 Kafka 属性;这由 . 如果由于某种原因 - 可能缺少管理员权限,您无法检索集群 ID,从版本 3.1 开始,您可以在 上设置手册并将其注入 s 和侦听器容器中。 如果为 (default),则 admin 将调用 admin 操作以从 broker 中检索它。remoteServiceNameclusterIdKafkaAdminclusterIdKafkaAdminKafkaTemplatenulldescribeClusterspring-doc.cn