Resilience: Recovering from Errors and Broker Failures
Some of the key (and most popular) high-level features that Spring AMQP provides are to do with recovery and automatic re-connection in the event of a protocol error or broker failure. We have seen all the relevant components already in this guide, but it should help to bring them all together here and call out the features and recovery scenarios individually.
The primary reconnection features are enabled by the CachingConnectionFactory itself. It is also often beneficial to use the RabbitAdmin auto-declaration features. In addition, if you care about guaranteed delivery, you probably also need to use the channelTransacted flag in RabbitTemplate and SimpleMessageListenerContainer and the AcknowledgeMode.AUTO (or manual if you do the acks yourself) in the SimpleMessageListenerContainer.
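The following sketch shows how these pieces might be wired together; the bean definitions and the queue name are illustrative, not a prescribed configuration:

@Configuration
public class ResilienceConfig {

    @Bean
    public CachingConnectionFactory connectionFactory() {
        // re-establishes connections automatically after a broker failure
        return new CachingConnectionFactory("localhost");
    }

    @Bean
    public RabbitAdmin admin(ConnectionFactory connectionFactory) {
        // re-declares exchanges, queues, and bindings on every (re)connection
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    public RabbitTemplate template(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setChannelTransacted(true); // guaranteed delivery on the sending side
        return template;
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("my.queue"); // illustrative queue name
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setMessageListener(message -> {
            // handle the message
        });
        return container;
    }

}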
Automatic Declaration of Exchanges, Queues, and Bindings
The RabbitAdmin component can declare exchanges, queues, and bindings on startup. It does this lazily, through a ConnectionListener. Consequently, if the broker is not present on startup, it does not matter. The first time a Connection is used (for example, by sending a message), the listener fires and the admin features are applied. A further benefit of doing the auto declarations in a listener is that, if the connection is dropped for any reason (for example, broker death, network glitch, and others), they are applied again when the connection is re-established.
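For example, with a RabbitAdmin in the application context, declaring Declarable beans such as the following (the names are only illustrative) is enough for them to be declared on the first connection and re-declared after every reconnect:

@Bean
public Queue myQueue() {
    return new Queue("my.queue", true); // durable, so it also survives a broker restart
}

@Bean
public DirectExchange myExchange() {
    return new DirectExchange("my.exchange");
}

@Bean
public Binding myBinding(Queue myQueue, DirectExchange myExchange) {
    return BindingBuilder.bind(myQueue).to(myExchange).with("my.routing.key");
}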
Queues declared this way must have fixed names — either explicitly declared or generated by the framework for AnonymousQueue instances.
Anonymous queues are non-durable, exclusive, and auto-deleting.
Automatic declaration is performed only when the CachingConnectionFactory cache mode is CHANNEL (the default).
This limitation exists because exclusive and auto-delete queues are bound to the connection.
Starting with version 2.2.2, the RabbitAdmin will detect beans of type DeclarableCustomizer and apply the function before actually processing the declaration. This is useful, for example, to set a new argument (property) before it has first class support within the framework.
@Bean
public DeclarableCustomizer customizer() {
    return dec -> {
        if (dec instanceof Queue && ((Queue) dec).getName().equals("my.queue")) {
            dec.addArgument("some.new.queue.argument", true);
        }
        return dec;
    };
}
It is also useful in projects that don’t provide direct access to the Declarable bean definitions.
Failures in Synchronous Operations and Options for Retry
If you lose your connection to the broker in a synchronous sequence when using RabbitTemplate (for instance), Spring AMQP throws an AmqpException (usually, but not always, AmqpIOException). We do not try to hide the fact that there was a problem, so you have to be able to catch and respond to the exception. The easiest thing to do if you suspect that the connection was lost (and it was not your fault) is to try the operation again. You can do this manually, or you could look at using Spring Retry to handle the retry (imperatively or declaratively). Spring Retry provides a couple of AOP interceptors and a great deal of flexibility to specify the parameters of the retry (number of attempts, exception types, backoff algorithm, and others). Spring AMQP also provides some convenience factory beans for creating Spring Retry interceptors in a convenient form for AMQP use cases, with strongly typed callback interfaces that you can use to implement custom recovery logic. See the Javadoc and properties of StatefulRetryOperationsInterceptor and StatelessRetryOperationsInterceptor for more detail.
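As a rough sketch of the imperative style (the exchange, routing key, and policy values are placeholders), you can wrap a RabbitTemplate operation in a Spring Retry RetryTemplate:

RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3)); // up to 3 attempts
retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy());

retryTemplate.execute(context -> {
    // retried if the send fails, for example because the connection was lost
    rabbitTemplate.convertAndSend("my.exchange", "my.routing.key", "payload");
    return null;
});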
Stateless retry is appropriate if there is no transaction or if a transaction is started inside the retry callback.
Note that stateless retry is simpler to configure and analyze than stateful retry, but it is not usually appropriate if there is an ongoing transaction that must be rolled back or definitely is going to roll back.
A dropped connection in the middle of a transaction should have the same effect as a rollback.
Consequently, for reconnections where the transaction is started higher up the stack, stateful retry is usually the best choice.
Stateful retry needs a mechanism to uniquely identify a message. The simplest approach is to have the sender put a unique value in the MessageId message property. The provided message converters provide an option to do this: you can set createMessageIds to true. Otherwise, you can inject a MessageKeyGenerator implementation into the interceptor. The key generator must return a unique key for each message.
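The following sketch assumes that each message carries a unique correlation ID that can serve as the key; adapt the generator to whatever uniquely identifies your messages:

@Bean
public StatefulRetryOperationsInterceptor interceptor() {
    return RetryInterceptorBuilder.stateful()
            .messageKeyGenerator(message -> message.getMessageProperties().getCorrelationId())
            .build();
}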
In versions prior to version 2.0, a MissingMessageIdAdvice was provided. It enabled messages without a messageId property to be retried exactly once (ignoring the retry settings). This advice is no longer provided, since, along with spring-retry version 1.2, its functionality is built into the interceptor and message listener containers. For backwards compatibility, a message with a null message ID is considered fatal for the consumer (consumer is stopped) by default (after one retry). To replicate the functionality provided by the MissingMessageIdAdvice, you can set the statefulRetryFatalWithNullMessageId property to false on the listener container. With that setting, the consumer continues to run and the message is rejected (after one retry). It is discarded or routed to the dead letter queue (if one is configured).
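A minimal sketch, assuming you configure the container directly and that the property follows the usual JavaBean setter convention:

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setStatefulRetryFatalWithNullMessageId(false); // reject (rather than stop) when the message ID is null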
Starting with version 1.3, a builder API is provided to aid in assembling these interceptors by using Java (in @Configuration classes). The following example shows how to do so:
@Bean
public StatefulRetryOperationsInterceptor interceptor() {
    return RetryInterceptorBuilder.stateful()
            .maxAttempts(5)
            .backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
            .build();
}
Only a subset of retry capabilities can be configured this way. More advanced features would need the configuration of a RetryTemplate as a Spring bean.
See the Spring Retry Javadoc for complete information about available policies and their configuration.
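A sketch of that approach; the policy values mirror the builder example above and are only illustrative:

@Bean
public RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(5));
    ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
    backOff.setInitialInterval(1000);
    backOff.setMultiplier(2.0);
    backOff.setMaxInterval(10000);
    retryTemplate.setBackOffPolicy(backOff);
    return retryTemplate;
}

@Bean
public StatefulRetryOperationsInterceptor interceptor(RetryTemplate retryTemplate) {
    return RetryInterceptorBuilder.stateful()
            .retryOperations(retryTemplate)
            .build();
}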
Retry with Batch Listeners
It is not recommended to configure retry with a batch listener, unless the batch was created by the producer, in a single record. See Batched Messages for information about consumer and producer-created batches. With a consumer-created batch, the framework has no knowledge of which message in the batch caused the failure, so recovery after the retries are exhausted is not possible. With producer-created batches, since there is only one message that actually failed, the whole message can be recovered. Applications may want to inform a custom recoverer where in the batch the failure occurred, perhaps by setting an index property of the thrown exception.
A retry recoverer for a batch listener must implement MessageBatchRecoverer.
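A minimal sketch of such a recoverer; the logging behavior is only an example of what a custom implementation might do:

public class LoggingBatchRecoverer implements MessageBatchRecoverer {

    @Override
    public void recover(List<Message> messages, Throwable cause) {
        // called with the whole consumer-created batch when retries are exhausted
        messages.forEach(message -> recover(message, cause));
    }

    @Override
    public void recover(Message message, Throwable cause) {
        System.out.println("Discarding " + message + " after retries: " + cause.getMessage());
    }

}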
Message Listeners and the Asynchronous Case
If a MessageListener fails because of a business exception, the exception is handled by the message listener container, which then goes back to listening for another message. If the failure is caused by a dropped connection (not a business exception), the consumer that is collecting messages for the listener has to be cancelled and restarted. The SimpleMessageListenerContainer handles this seamlessly, and it leaves a log to say that the listener is being restarted. In fact, it loops endlessly, trying to restart the consumer. Only if the consumer is very badly behaved indeed will it give up. One side effect is that, if the broker is down when the container starts, it keeps trying until a connection can be established. Business exception handling, as opposed to protocol errors and dropped connections, might need more thought and some custom configuration, especially if transactions or container acks are in use.
Prior to 2.8.x, RabbitMQ had no definition of dead letter behavior. Consequently, by default, a message that is rejected or rolled back because of a business exception can be redelivered endlessly. To put a limit on the number of re-deliveries on the client side, one choice is a StatefulRetryOperationsInterceptor in the advice chain of the listener. The interceptor can have a recovery callback that implements a custom dead letter action: whatever is appropriate for your particular environment. Another alternative is to set the container’s defaultRequeueRejected property to false. This causes all failed messages to be discarded. When using RabbitMQ 2.8.x or higher, this also facilitates delivering the message to a dead letter exchange.
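For example, when configuring the container directly:

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setDefaultRequeueRejected(false); // rejected messages are not requeued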
Alternatively, you can throw an AmqpRejectAndDontRequeueException. Doing so prevents message requeuing, regardless of the setting of the defaultRequeueRejected property. Starting with version 2.1, an ImmediateRequeueAmqpException is introduced to perform exactly the opposite logic: the message will be requeued, regardless of the setting of the defaultRequeueRejected property.
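The following listener sketch shows both exceptions; the isUnrecoverable() and shouldRetryImmediately() checks are hypothetical application logic:

@RabbitListener(queues = "some.queue")
public void handle(String payload) {
    if (isUnrecoverable(payload)) {
        // never requeued, regardless of defaultRequeueRejected
        throw new AmqpRejectAndDontRequeueException("unrecoverable payload");
    }
    if (shouldRetryImmediately(payload)) {
        // always requeued, regardless of defaultRequeueRejected
        throw new ImmediateRequeueAmqpException("transient condition; requeue");
    }
    // normal processing
}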
Often, a combination of both techniques is used. You can use a StatefulRetryOperationsInterceptor in the advice chain with a MessageRecoverer that throws an AmqpRejectAndDontRequeueException. The MessageRecoverer is called when all retries have been exhausted. The RejectAndDontRequeueRecoverer does exactly that.
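A sketch of that combination (the number of attempts is illustrative):

@Bean
public StatefulRetryOperationsInterceptor retryInterceptor() {
    return RetryInterceptorBuilder.stateful()
            .maxAttempts(3)
            .recoverer(new RejectAndDontRequeueRecoverer())
            .build();
}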
The default MessageRecoverer consumes the errant message and emits a WARN message.
Starting with version 1.3, a new RepublishMessageRecoverer is provided, to allow publishing of failed messages after retries are exhausted. When a recoverer consumes the final exception, the message is ack’d and is not sent to the dead letter exchange by the broker, if one is configured.

When RepublishMessageRecoverer is used on the consumer side, the received message has the deliveryMode in the receivedDeliveryMode message property. In this case, the deliveryMode is null, which means a NON_PERSISTENT delivery mode on the broker. Starting with version 2.0, you can configure the RepublishMessageRecoverer with the deliveryMode to set on the republished message when it is null. By default, it uses the MessageProperties default value, MessageDeliveryMode.PERSISTENT.
The following example shows how to set a RepublishMessageRecoverer as the recoverer:
@Bean
RetryOperationsInterceptor interceptor() {
    return RetryInterceptorBuilder.stateless()
            .maxAttempts(5)
            .recoverer(new RepublishMessageRecoverer(amqpTemplate(), "something", "somethingelse"))
            .build();
}
The RepublishMessageRecoverer publishes the message with additional information in message headers, such as the exception message, stack trace, original exchange, and routing key. Additional headers can be added by creating a subclass and overriding additionalHeaders(). The deliveryMode (or any other properties) can also be changed in the additionalHeaders(), as the following example shows:
RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(amqpTemplate, "error") {

    protected Map<? extends String, ? extends Object> additionalHeaders(Message message, Throwable cause) {
        message.getMessageProperties()
                .setDeliveryMode(message.getMessageProperties().getReceivedDeliveryMode());
        return null;
    }

};
Starting with version 2.0.5, the stack trace may be truncated if it is too large; this is because all headers have to fit in a single frame. By default, if the stack trace would cause less than 20,000 bytes ('headroom') to be available for other headers, it will be truncated. This can be adjusted by setting the recoverer’s frameMaxHeadroom property, if you need more or less space for other headers.
Starting with versions 2.1.13 and 2.2.3, the exception message is included in this calculation, and the amount of stack trace is maximized by using the following algorithm:

- If the stack trace alone would exceed the limit, the exception message header is truncated to 97 bytes plus … and the stack trace is truncated too.
- If the stack trace is small, the message is truncated (plus …) to fit in the available bytes (but the message within the stack trace itself is truncated to 97 bytes plus …).
Whenever a truncation of any kind occurs, the original exception will be logged to retain the complete information. The evaluation is performed after the headers are enhanced, so information such as the exception type can be used in the expressions.
Starting with version 2.4.8, the error exchange and routing key can be provided as SpEL expressions, with the Message being the root object for the evaluation.
Starting with version 2.3.3, a new subclass RepublishMessageRecovererWithConfirms is provided; this supports both styles of publisher confirms and will wait for the confirmation before returning (or throw an exception if not confirmed or if the message is returned). If the confirm type is CORRELATED, the subclass will also detect if a message is returned and throw an AmqpMessageReturnedException; if the publication is negatively acknowledged, it will throw an AmqpNackReceivedException. If the confirm type is SIMPLE, the subclass will invoke the waitForConfirmsOrDie method on the channel. See Publisher Confirms and Returns for more information about confirms and returns.
Starting with version 2.1, an ImmediateRequeueMessageRecoverer is added to throw an ImmediateRequeueAmqpException, which notifies a listener container to requeue the current failed message.
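A sketch of configuring it as the recoverer (again, the number of attempts is illustrative):

@Bean
public StatefulRetryOperationsInterceptor requeueInterceptor() {
    return RetryInterceptorBuilder.stateful()
            .maxAttempts(3)
            .recoverer(new ImmediateRequeueMessageRecoverer())
            .build();
}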
Exception Classification for Spring Retry
Spring Retry has a great deal of flexibility for determining which exceptions can invoke retry.
The default configuration retries for all exceptions.
Given that user exceptions are wrapped in a ListenerExecutionFailedException, we need to ensure that the classification examines the exception causes. The default classifier looks only at the top level exception. Since Spring Retry 1.0.3, the BinaryExceptionClassifier has a property called traverseCauses (default: false). When true, it traverses exception causes until it finds a match or there is no cause. To use this classifier for retry, you can use a SimpleRetryPolicy created with the constructor that takes the max attempts, the Map of Exception instances, and the boolean (traverseCauses) and inject this policy into the RetryTemplate.
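A sketch of such a policy; MyRetryableException and MyFatalException are hypothetical application exceptions:

Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();
retryableExceptions.put(MyRetryableException.class, true);  // retry these
retryableExceptions.put(MyFatalException.class, false);     // fail fast on these

SimpleRetryPolicy policy = new SimpleRetryPolicy(5, retryableExceptions, true); // traverseCauses = true

RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(policy);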
Retry Over Broker
The message dead-lettered from the queue can be republished back to this queue after re-routing from a DLX. This retry behavior is controlled on the broker side via an x-death header. More information about this approach can be found in the official RabbitMQ documentation. The other approach is to re-publish the failed message back to the original exchange manually from the application.

Starting with version 4.0, the RabbitMQ broker does not consider the x-death header sent from the client. Essentially, any x-* headers sent from the client are ignored. To mitigate this new behavior of the RabbitMQ broker, Spring AMQP has introduced a retry_count header starting with version 3.2. When this header is absent and a server-side DLX is in action, the x-death.count property is mapped to this header. When the failed message is re-published manually for retries, the retry_count header value has to be incremented manually. See the MessageProperties.incrementRetryCount() JavaDocs for more information.
The following example summarizes an algorithm for manual retry over the broker:
@RabbitListener(queues = "some_queue")
public void rePublish(Message message) {
    try {
        // Process message
    }
    catch (Exception ex) {
        Long retryCount = message.getMessageProperties().getRetryCount();
        if (retryCount < 3) {
            message.getMessageProperties().incrementRetryCount();
            this.rabbitTemplate.send("", "some_queue", message);
        }
        else {
            throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
        }
    }
}