When using a concurrent message listener container, a single listener instance is invoked on all consumer threads. Listeners, therefore, need to be thread-safe, and it is preferable to use stateless listeners. If it is not possible to make your listener thread-safe or adding synchronization would significantly reduce the benefit of adding concurrency, you can use one of a few techniques:
- Use n containers with concurrency=1 with a prototype-scoped MessageListener bean so that each container gets its own instance (this is not possible when using @KafkaListener).
- Keep the state in ThreadLocal<?> instances.
- Have the singleton listener delegate to a bean that is declared in SimpleThreadScope (or a similar scope).
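As a minimal sketch of the second technique, the following plain-JDK example keeps a counter in a ThreadLocal so that a single shared listener instance needs no synchronization for that state. The CountingListener class, its onMessage(String) method, and the consumer-N thread names are hypothetical stand-ins for a real MessageListener and the container's consumer threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical listener: one shared instance, per-thread state in a ThreadLocal.
class CountingListener {

    // Each thread that calls onMessage() lazily gets its own counter.
    private final ThreadLocal<int[]> perThreadCount =
            ThreadLocal.withInitial(() -> new int[1]);

    // Latest count observed on each thread, kept only so the demo can inspect it.
    private final Map<String, Integer> seen = new ConcurrentHashMap<>();

    // Stand-in for a listener callback; deliberately not synchronized.
    void onMessage(String record) {
        int[] count = perThreadCount.get();
        count[0]++;
        seen.put(Thread.currentThread().getName(), count[0]);
    }

    Map<String, Integer> seen() {
        return seen;
    }

    // Simulates `threads` consumer threads, each delivering `perThread` records
    // to the same listener instance, and returns the per-thread counts.
    static Map<String, Integer> run(int threads, int perThread) {
        CountingListener listener = new CountingListener();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    listener.onMessage("record-" + i);
                }
            }, "consumer-" + t);
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return listener.seen();
    }
}
```

Calling CountingListener.run(3, 5) yields one entry per simulated consumer thread, each with a count of 5, because no thread ever sees another thread's counter.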
To facilitate cleaning up thread state (for the second and third items in the preceding list), starting with version 2.2, the listener container publishes a ConsumerStoppedEvent when each thread exits.
You can consume these events with an ApplicationListener or @EventListener method to remove ThreadLocal<?> instances or remove() thread-scoped beans from the scope.
Note that SimpleThreadScope does not destroy beans that have a destruction interface (such as DisposableBean), so you should destroy() the instance yourself.
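By default, the application context’s event multicaster invokes event listeners on the calling thread. If you change the multicaster to use an async executor, thread cleanup is not effective, because the event listener then runs on a different thread from the consumer thread whose state needs to be removed.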
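The reason the calling thread matters can be sketched with plain JDK code: ThreadLocal.remove() clears only the value that belongs to the thread calling it. In the example below, the ThreadLocalCleanupDemo class and the consumer-0 thread are hypothetical; the sameThread flag stands in for a cleanup handler that runs either on the exiting consumer thread or on a separate executor thread.

```java
// Hypothetical demo of why cleanup must run on the thread that owns the state.
class ThreadLocalCleanupDemo {

    // Per-thread state, as a listener might keep it.
    static final ThreadLocal<String> STATE = new ThreadLocal<>();

    // Returns the value the owning thread still holds after cleanup has run
    // on the owning thread (sameThread = true) or on another thread.
    static String stateAfterCleanup(boolean sameThread) {
        String[] result = new String[1];
        Thread owner = new Thread(() -> {
            STATE.set("cached-by-" + Thread.currentThread().getName());
            if (sameThread) {
                // Comparable to an event listener invoked on the calling thread.
                STATE.remove();
            } else {
                // Comparable to an event listener invoked by an async executor.
                Thread other = new Thread(STATE::remove, "async-cleanup");
                other.start();
                join(other);
            }
            result[0] = STATE.get();
        }, "consumer-0");
        owner.start();
        join(owner);
        return result[0];
    }

    private static void join(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

With sameThread set to true the owning thread ends up with no value; with false, the value set by consumer-0 is still present, since the other thread only removed its own (empty) entry.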
Special Note on Virtual Threads and Concurrent Message Listener Containers
Because some of the underlying library classes still use synchronized blocks for thread coordination, applications need to be cautious when using virtual threads with concurrent message listener containers.
When virtual threads are enabled and the concurrency exceeds the number of available platform threads, the virtual threads are very likely to be pinned to the platform threads, and race conditions are possible.
Therefore, until the third-party libraries that Spring for Apache Kafka uses evolve to fully support virtual threads, it is recommended to keep the concurrency of the message listener container equal to or less than the number of platform threads.
This way, applications avoid both race conditions between the threads and virtual threads being pinned to platform threads.
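One way to follow this recommendation is to cap the configured concurrency before passing it to the container. The sketch below is illustrative only: the ListenerConcurrency class and its methods are hypothetical, and it assumes that the number of available processors is a reasonable stand-in for the number of platform threads (this is the JDK's default parallelism for the virtual thread scheduler, but it can be changed with system properties).

```java
// Hypothetical helper for choosing a listener container concurrency.
class ListenerConcurrency {

    // Caps the requested concurrency at the given number of platform threads.
    static int cap(int requested, int platformThreads) {
        if (requested < 1 || platformThreads < 1) {
            throw new IllegalArgumentException("both values must be at least 1");
        }
        return Math.min(requested, platformThreads);
    }

    // Assumption: available processors approximate the platform thread count.
    static int capToAvailableProcessors(int requested) {
        return cap(requested, Runtime.getRuntime().availableProcessors());
    }
}
```

The result could then be supplied as the container's concurrency, for example through ConcurrentMessageListenerContainer.setConcurrency(int).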