This version is still in development and is not considered stable yet. For the latest stable version, please use Spring for Apache Kafka 3.2.4!

This version is still in development and is not considered stable yet. For the latest stable version, please use Spring for Apache Kafka 3.2.4!

The feature is designed to be used with @KafkaListener; however, several users have requested information on how to configure non-blocking retries programmatically. The following Spring Boot application provides an example of how to do

public class Application extends RetryTopicConfigurationSupport {

    public static void main(String[] args) {, args);

    RetryTopicConfiguration retryConfig(KafkaTemplate<String, String> template) {
        return RetryTopicConfigurationBuilder.newInstance()
                .autoCreateTopicsWith(2, (short) 1)

    TaskScheduler scheduler() {
        return new ThreadPoolTaskScheduler();

    SmartInitializingSingleton dynamicRetry(RetryTopicConfigurer configurer, RetryTopicConfiguration config,
            KafkaListenerAnnotationBeanPostProcessor<?, ?> bpp, KafkaListenerContainerFactory<?> factory,
            Listener listener, KafkaListenerEndpointRegistry registry) {

        return () -> {
            KafkaListenerEndpointRegistrar registrar = bpp.getEndpointRegistrar();
            MethodKafkaListenerEndpoint<String, String> mainEndpoint = new MethodKafkaListenerEndpoint<>();
            EndpointProcessor endpointProcessor = endpoint -> {
                // customize as needed (e.g. apply attributes to retry endpoints).
                if (!endpoint.equals(mainEndpoint)) {
                // these are required
            try {
                mainEndpoint.setMethod(Listener.class.getDeclaredMethod("onMessage", ConsumerRecord.class));
            catch (NoSuchMethodException | SecurityException ex) {
                throw new IllegalStateException(ex);
            configurer.processMainAndRetryListeners(endpointProcessor, mainEndpoint, config, registrar, factory,

    ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("topic", "test");


class Listener implements MessageListener<String, String> {

    public void onMessage(ConsumerRecord<String, String> record) {
        throw new RuntimeException("test");

Auto creation of topics will only occur if the configuration is processed before the application context is refreshed, as in the above example. To configure containers at runtime, the topics will need to be created using some other technique.
Auto creation of topics will only occur if the configuration is processed before the application context is refreshed, as in the above example. To configure containers at runtime, the topics will need to be created using some other technique.