For the latest stable version, please use Spring for Apache Kafka 3.2.4! |
For the latest stable version, please use Spring for Apache Kafka 3.2.4! |
If message processing fails, the message is forwarded to a retry topic with a back off timestamp. The retry topic consumer then checks the timestamp and if it’s not due it pauses the consumption for that topic’s partition. When it is due the partition consumption is resumed, and the message is consumed again. If the message processing fails again the message will be forwarded to the next retry topic, and the pattern is repeated until a successful processing occurs, or the attempts are exhausted, and the message is sent to the Dead Letter Topic (if configured).
To illustrate, if you have a "main-topic" topic, and want to set up non-blocking retry with an exponential backoff of 1000ms with a multiplier of 2 and 4 max attempts, it will create the main-topic-retry-1000, main-topic-retry-2000, main-topic-retry-4000 and main-topic-dlt topics and configure the respective consumers. The framework also takes care of creating the topics and setting up and configuring the listeners.
By using this strategy you lose Kafka’s ordering guarantees for that topic. |
By using this strategy you lose Kafka’s ordering guarantees for that topic. |
You can set the AckMode mode you prefer, but RECORD is suggested.
|
You can set the AckMode mode you prefer, but RECORD is suggested.
|
At this time this functionality doesn’t support class level @KafkaListener annotations.
|
At this time this functionality doesn’t support class level @KafkaListener annotations.
|
When using a manual AckMode
with asyncAcks
set to true, the DefaultErrorHandler
must be configured with seekAfterError
set to false
.
Starting with versions 2.9.10, 3.0.8, this will be set to true unconditionally for such configurations.
With earlier versions, it was necessary to override the RetryConfigurationSupport.configureCustomizers()
method to set the property to true
.
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
customizersConfigurer.customizeErrorHandler(eh -> eh.setSeekAfterError(false));
}
In addition, before those versions, using the default (logging) DLT handler was not compatible with any kind of manual AckMode
, regardless of the asyncAcks
property.