This version is still in development and is not considered stable yet. For the latest stable version, please use Spring for Apache Kafka 3.3.0!

Monitoring

Monitoring Listener Performance

Starting with version 2.3, the listener container automatically creates and updates Micrometer Timers for the listener if Micrometer is detected on the classpath and a single MeterRegistry is present in the application context. The timers can be disabled by setting the ContainerProperties's micrometerEnabled property to false.

Two timers are maintained: one for successful calls to the listener and one for failures.

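The timers are named spring.kafka.listener and have the following tags: name (the container bean name), result (success or failure), and exception (none or ListenerExecutionFailedException).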

You can add additional tags using the ContainerProperties's micrometerTags property.

Starting with versions 2.9.8 and 3.0.6, you can provide a function in ContainerProperties's micrometerTagsProvider property; the function receives the ConsumerRecord<?, ?> and returns tags, which can be based on that record and are merged with any static tags in micrometerTags.
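As a rough illustration of the two tag properties described above, the following sketch configures a listener container factory with one static tag and one record-derived tag. The class name, the bean layout, and the tag names and values (application, orders, topic) are assumptions made for this example only.

```java
import java.util.Map;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
public class ListenerTimerTagsConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);

        ContainerProperties props = factory.getContainerProperties();
        // Static tag added to every spring.kafka.listener timer (hypothetical name and value).
        props.setMicrometerTags(Map.of("application", "orders"));
        // Dynamic tag computed from each ConsumerRecord; merged with the static tags.
        props.setMicrometerTagsProvider(record -> Map.of("topic", record.topic()));
        return factory;
    }
}
```

Because every distinct tag value produces a separate timer, record-derived tags are best limited to values with few distinct outcomes, such as the topic name.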

With the concurrent container, timers are created for each thread and the name tag is suffixed with -n where n is 0 to concurrency-1.
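The suffix rule can be made concrete with a small, framework-independent sketch that lists the name tag values to expect for a given container. The class and method names here are hypothetical helpers; only the -n suffix rule comes from the text above.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ListenerTimerNames {

    // Returns the expected values of the name tag for a concurrent container:
    // the container bean name suffixed with -n, for n from 0 to concurrency-1.
    public static List<String> nameTags(String containerBeanName, int concurrency) {
        return IntStream.range(0, concurrency)
                .mapToObj(n -> containerBeanName + "-" + n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [myListener-0, myListener-1, myListener-2]
        System.out.println(nameTags("myListener", 3));
    }
}
```

A list like this can be handy when writing dashboard queries or test assertions that need to address the timer of one specific consumer thread.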

Monitoring KafkaTemplate Performance

Starting with version 2.5, the template automatically creates and updates Micrometer Timers for send operations if Micrometer is detected on the classpath and a single MeterRegistry is present in the application context. The timers can be disabled by setting the template’s micrometerEnabled property to false.

Two timers are maintained: one for successful send operations and one for failures.

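The timers are named spring.kafka.template and have the following tags: name (the template bean name), result (success or failure), and exception (none or the exception class name for failures).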

You can add additional tags using the template’s micrometerTags property.

Starting with versions 2.9.8 and 3.0.6, you can provide a function via KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>); the function receives the ProducerRecord<?, ?> and returns tags, which can be based on that record and are merged with any static tags in micrometerTags.
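The template-side tag properties might be configured along the following lines. This is a minimal sketch: the class name, the bean layout, and the tag names and values are assumptions chosen for illustration.

```java
import java.util.Map;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class TemplateTimerTagsConfig {

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory);
        // Static tag added to every spring.kafka.template timer (hypothetical name and value).
        template.setMicrometerTags(Map.of("application", "orders"));
        // Dynamic tag computed from each ProducerRecord; merged with the static tags.
        template.setMicrometerTagsProvider(record -> Map.of("topic", record.topic()));
        return template;
    }
}
```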

Micrometer Native Metrics

Starting with version 2.5, the framework provides Factory Listeners to manage a Micrometer KafkaClientMetrics instance whenever producers and consumers are created and closed.

To enable this feature, simply add the listeners to your producer and consumer factories:

@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
    Map<String, Object> configs = consumerConfigs();
    ...
    DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
    ...
    cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return cf;
}

@Bean
public ProducerFactory<String, String> myProducerFactory() {
    Map<String, Object> configs = producerConfigs();
    configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
    ...
    DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
    ...
    pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return pf;
}

The consumer/producer id passed to the listener is added to the meter’s tags with tag name spring.id.

An example of obtaining one of the Kafka metrics
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
                .tag("customTag", "customTagValue")
                .tag("spring.id", "myProducerFactory.myClientId-1")
                .functionCounter()
                .count();

A similar listener is provided for the StreamsBuilderFactoryBean; see KafkaStreams Micrometer Support.

Starting with version 3.3, a KafkaMetricsSupport abstract class is provided to manage the binding of io.micrometer.core.instrument.binder.kafka.KafkaMetrics into a MeterRegistry for a given Kafka client. This class is the superclass of the MicrometerConsumerListener, MicrometerProducerListener and KafkaStreamsMicrometerListener mentioned above, but it can be used for any Kafka client use case. To use it, extend the class and call its bindClient() and unbindClient() methods to connect the Kafka client's metrics to a Micrometer collector.
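A subclass for a producer that is created outside of a ProducerFactory might look roughly like the sketch below. Several details are assumptions rather than confirmed API: that the class lives in org.springframework.kafka.core, that it has a constructor taking only a MeterRegistry, and that bindClient() and unbindClient() take an id and the client. The class name StandaloneProducerMetrics and its two methods are hypothetical; check the Javadoc of your version before relying on this.

```java
import java.util.Map;

import io.micrometer.core.instrument.MeterRegistry;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.springframework.kafka.core.KafkaMetricsSupport;

// Hypothetical helper: binds Kafka client metrics for a manually created producer.
public class StandaloneProducerMetrics extends KafkaMetricsSupport<Producer<String, String>> {

    public StandaloneProducerMetrics(MeterRegistry meterRegistry) {
        super(meterRegistry); // assumed constructor signature
    }

    // Creates the producer and registers its metrics under the given id.
    // The configs must include bootstrap servers and key/value serializers.
    public Producer<String, String> createAndBind(String id, Map<String, Object> configs) {
        Producer<String, String> producer = new KafkaProducer<>(configs);
        bindClient(id, producer); // assumed signature: (String id, client)
        return producer;
    }

    // Removes the metrics binding before closing the producer.
    public void unbindAndClose(String id, Producer<String, String> producer) {
        unbindClient(id, producer); // assumed signature: (String id, client)
        producer.close();
    }
}
```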

Micrometer Observation

Since version 3.0, Micrometer observation is supported for the KafkaTemplate and listener containers.

Set observationEnabled to true on the KafkaTemplate and ContainerProperties to enable observation; this will disable Micrometer Timers because the timers will now be managed with each observation.
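Enabling observation on both sides could be sketched as follows. The class name and bean layout are illustrative assumptions, and the sketch further assumes that an ObservationRegistry is available in the application context (for example, one auto-configured by Spring Boot).

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaObservationConfig {

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory);
        // Record an observation for each send instead of using the template timers.
        template.setObservationEnabled(true);
        return template;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Record an observation for each listener invocation instead of using the listener timers.
        factory.getContainerProperties().setObservationEnabled(true);
        return factory;
    }
}
```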

Micrometer Observation does not support batch listeners; with a batch listener, the Micrometer Timers are used instead.

Refer to Micrometer Tracing for more information.

To add tags to timers/traces, configure a custom KafkaTemplateObservationConvention or KafkaListenerObservationConvention on the template or listener container, respectively.

The default implementations add the bean.name tag for template observations and listener.id tag for containers.

You can either subclass DefaultKafkaTemplateObservationConvention or DefaultKafkaListenerObservationConvention or provide completely new implementations.

See Micrometer Observation Documentation for details of the default observations that are recorded.

Starting with version 3.0.6, you can add dynamic tags to the timers and traces, based on information in the consumer or producer records. To do so, add a custom KafkaListenerObservationConvention and/or KafkaTemplateObservationConvention to the listener container properties or KafkaTemplate respectively. The record property in both observation contexts contains the ConsumerRecord or ProducerRecord respectively.
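One way to add a record-based tag on the listener side is to subclass the default convention and extend its key values, as in this minimal sketch. The class name and the tag name record.topic are hypothetical; the sketch assumes the receiver context exposes the record through getRecord(), as described above.

```java
import io.micrometer.common.KeyValues;
import org.springframework.kafka.support.micrometer.DefaultKafkaListenerObservationConvention;
import org.springframework.kafka.support.micrometer.KafkaRecordReceiverContext;

// Hypothetical convention: keeps the default tags and adds one derived from the consumed record.
public class TopicTaggingListenerConvention extends DefaultKafkaListenerObservationConvention {

    @Override
    public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) {
        // Start from the defaults (including listener.id) and append the record's topic.
        return super.getLowCardinalityKeyValues(context)
                .and("record.topic", context.getRecord().topic());
    }
}
```

An instance of such a class would then be set as the observation convention on the listener container properties; a template-side convention can be built the same way from DefaultKafkaTemplateObservationConvention and the ProducerRecord in the sender context.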

The remoteServiceName properties of the sender and receiver contexts are set to the Kafka clusterId property; this is retrieved by a KafkaAdmin. If, for some reason (perhaps lack of admin permissions), you cannot retrieve the cluster id, starting with version 3.1 you can set a clusterId manually on the KafkaAdmin and inject it into KafkaTemplates and listener containers. When it is null (the default), the admin invokes the describeCluster admin operation to retrieve it from the broker.
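Setting the cluster id manually might be wired up as in the sketch below. The bootstrap address, the cluster id value my-cluster, and the class and bean names are placeholders. The use of setKafkaAdmin() on the template and on the container (through a container customizer) is an assumption about how the admin is injected; verify it against the Javadoc of your version.

```java
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class ManualClusterIdConfig {

    @Bean
    public KafkaAdmin kafkaAdmin() {
        KafkaAdmin admin = new KafkaAdmin(
                Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")); // placeholder address
        // With a manual cluster id, the admin does not need to call describeCluster.
        admin.setClusterId("my-cluster"); // placeholder value
        return admin;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory,
            KafkaAdmin kafkaAdmin) {
        KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory);
        template.setObservationEnabled(true);
        template.setKafkaAdmin(kafkaAdmin); // assumed injection point
        return template;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory, KafkaAdmin kafkaAdmin) {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setObservationEnabled(true);
        // Assumed injection point: hand the admin to each container the factory creates.
        factory.setContainerCustomizer(container -> container.setKafkaAdmin(kafkaAdmin));
        return factory;
    }
}
```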