对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0! |
监测
监视侦听器性能
从版本 2.3 开始,侦听器容器将自动创建和更新 MicrometerTimer
s 表示侦听器,如果Micrometer
,并且单个MeterRegistry
存在于应用程序上下文中。
可以通过设置ContainerProperty
的micrometerEnabled
自false
.
维护两个计时器 - 一个用于成功调用侦听器,另一个用于失败。
计时器被命名为spring.kafka.listener
并具有以下标签:
-
name
:(容器 Bean 名称) -
result
:success
或failure
-
exception
:none
或ListenerExecutionFailedException
您可以使用ContainerProperties
的micrometerTags
财产。
从版本 2.9.8、3.0.6 开始,您可以在ContainerProperties
的micrometerTagsProvider
;该函数接收ConsumerRecord<?, ?>
并返回可以基于该记录的标签,并与micrometerTags
.
使用并发容器,将为每个线程创建计时器,并且name 标签的后缀为-n 其中 n 是0 自concurrency-1 . |
监控 KafkaTemplate 性能
从版本 2.5 开始,模板将自动创建并更新 MicrometerTimer
+++s 用于发送作,如果Micrometer
,并且单个MeterRegistry
存在于应用程序上下文中。
可以通过设置模板的micrometerEnabled
property 设置为false
.
维护两个计时器 - 一个用于成功调用侦听器,另一个用于失败。
计时器被命名为spring.kafka.template
并具有以下标签:
-
name
:(模板 Bean 名称) -
result
:success
或failure
-
exception
:none
或失败的异常类名称
您可以使用模板的micrometerTags
财产。
从版本 2.9.8、3.0.6 开始,您可以提供KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>)
财产;该函数接收ProducerRecord<?, ?>
并返回可以基于该记录的标签,并与micrometerTags
.
Micrometer 原生指标
从版本 2.5 开始,框架提供了 Factory Listeners 来管理 MicrometerKafkaClientMetrics
实例。
要启用此功能,只需将侦听器添加到您的 producer 和 consumer 工厂:
@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
Map<String, Object> configs = consumerConfigs();
...
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
...
cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return cf;
}
@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return pf;
}
消费者/生产者id
传递给侦听器的 Tag 将添加到带有 Tag Name 的计量器标签中spring.id
.
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
.tag("customTag", "customTagValue")
.tag("spring.id", "myProducerFactory.myClientId-1")
.functionCounter()
.count();
为StreamsBuilderFactoryBean
- 请参阅 KafkaStreams Micrometer 支持。
千分尺观察
从 3.0 版本开始,现在支持使用 Micrometer 进行观察,因为KafkaTemplate
和侦听器容器。
设置observationEnabled
自true
在KafkaTemplate
和ContainerProperties
使观察成为可能;这将禁用 Micrometer Timers,因为计时器现在将随每个观测一起管理。
千分尺观测不支持批量侦听器;这将启用 Micrometer Timers |
有关更多信息,请参阅 Micrometer Tracing 。
要向计时器/跟踪添加标签,请配置自定义KafkaTemplateObservationConvention
或KafkaListenerObservationConvention
分别添加到模板或侦听器容器中。
默认实现会添加bean.name
标签,以及listener.id
标记。
你可以子类化DefaultKafkaTemplateObservationConvention
或DefaultKafkaListenerObservationConvention
或提供全新的实现。
请参阅千分尺观测文档,了解记录的默认观测值的详细信息。
从版本 3.0.6 开始,您可以根据使用者或创建者记录中的信息向计时器和跟踪添加动态标签。
为此,请添加自定义KafkaListenerObservationConvention
和/或KafkaTemplateObservationConvention
添加到侦听器容器属性中,或者KafkaTemplate
分别。
这record
属性都包含ConsumerRecord
或ProducerRecord
分别。
发送方和接收方上下文的remoteServiceName
属性设置为 KafkaclusterId
财产;这由KafkaAdmin
.
如果由于某种原因 - 可能缺少管理员权限,您无法检索集群 ID,则从版本 3.1 开始,您可以设置手动clusterId
在KafkaAdmin
并将其注入KafkaTemplate
s 和侦听器容器。
当它是null
(默认),管理员将调用describeCluster
admin作从 broker 中检索它。