This version is still in development and is not considered stable yet. For the latest stable version, please use Spring for Apache Kafka 3.2.4!

The feature is designed to be used with @KafkaListener; however, several users have requested information on how to configure non-blocking retries programmatically. The following Spring Boot application provides an example of how to do so.

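import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.annotation.Order;
import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder;
import org.springframework.kafka.retrytopic.RetryTopicConfigurationSupport;
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
import org.springframework.kafka.retrytopic.RetryTopicConfigurer.EndpointProcessor;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;

@SpringBootApplication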
public class Application extends RetryTopicConfigurationSupport {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    RetryTopicConfiguration retryConfig(KafkaTemplate<String, String> template) {
        return RetryTopicConfigurationBuilder.newInstance()
                .maxAttempts(4)
                .autoCreateTopicsWith(2, (short) 1)
                .create(template);
    }

    @Bean
    TaskScheduler scheduler() {
        return new ThreadPoolTaskScheduler();
    }

    @Bean
    @Order(0)
    SmartInitializingSingleton dynamicRetry(RetryTopicConfigurer configurer, RetryTopicConfiguration config,
            KafkaListenerAnnotationBeanPostProcessor<?, ?> bpp, KafkaListenerContainerFactory<?> factory,
            Listener listener, KafkaListenerEndpointRegistry registry) {

        return () -> {
            KafkaListenerEndpointRegistrar registrar = bpp.getEndpointRegistrar();
            MethodKafkaListenerEndpoint<String, String> mainEndpoint = new MethodKafkaListenerEndpoint<>();
            EndpointProcessor endpointProcessor = endpoint -> {
                // customize as needed (e.g. apply attributes to retry endpoints).
                if (!endpoint.equals(mainEndpoint)) {
                    endpoint.setConcurrency(1);
                }
                // these are required
                endpoint.setMessageHandlerMethodFactory(bpp.getMessageHandlerMethodFactory());
                endpoint.setTopics("topic");
                endpoint.setId("id");
                endpoint.setGroupId("group");
            };
            mainEndpoint.setBean(listener);
            try {
                mainEndpoint.setMethod(Listener.class.getDeclaredMethod("onMessage", ConsumerRecord.class));
            }
            catch (NoSuchMethodException | SecurityException ex) {
                throw new IllegalStateException(ex);
            }
            mainEndpoint.setConcurrency(2);
            mainEndpoint.setTopics("topic");
            mainEndpoint.setId("id");
            mainEndpoint.setGroupId("group");
            configurer.processMainAndRetryListeners(endpointProcessor, mainEndpoint, config, registrar, factory,
                    "kafkaListenerContainerFactory");
        };
    }


    @Bean
    ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("topic", "test");
        };
    }

}

@Component
class Listener implements MessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println(KafkaUtils.format(record));
        throw new RuntimeException("test");
    }

}
Topics are created automatically only if the configuration is processed before the application context is refreshed, as in the example above. If you configure containers at runtime, you must create the topics by some other means.
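
When auto-creation is not available, one option is to create the topics yourself with Kafka's AdminClient before registering the containers. The sketch below is a minimal illustration under stated assumptions, not the framework's own mechanism: the class RetryTopicProvisioner and its method names are hypothetical, and the retry and dead-letter topic names you pass in must match whatever your retry topic configuration actually resolves to.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

// Hypothetical helper; not part of Spring for Apache Kafka.
final class RetryTopicProvisioner {

    private RetryTopicProvisioner() {
    }

    // Builds one topic description per name, all sharing the same
    // partition count and replication factor.
    static List<NewTopic> describeTopics(List<String> names, int partitions, short replicationFactor) {
        return names.stream()
                .map(name -> new NewTopic(name, partitions, replicationFactor))
                .collect(Collectors.toList());
    }

    // Creates the topics on the broker one at a time; a topic that already
    // exists is skipped, any other failure is rethrown.
    static void createTopics(String bootstrapServers, List<NewTopic> topics)
            throws InterruptedException, ExecutionException {

        Map<String, Object> props = Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (AdminClient admin = AdminClient.create(props)) {
            for (NewTopic topic : topics) {
                try {
                    admin.createTopics(List.of(topic)).all().get();
                }
                catch (ExecutionException ex) {
                    if (!(ex.getCause() instanceof TopicExistsException)) {
                        throw ex;
                    }
                }
            }
        }
    }

}
```

For instance, calling createTopics("localhost:9092", describeTopics(List.of("topic-retry", "topic-dlt"), 2, (short) 1)) would mirror the autoCreateTopicsWith(2, (short) 1) setting from the example above. The broker address and the two topic names here are assumptions chosen for illustration only.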