The Spring Rabbit framework supports automatic transaction management in both the synchronous and asynchronous use cases, with a number of different semantics that can be selected declaratively, as is familiar to existing users of Spring transactions. This makes many, if not most, common messaging patterns easy to implement.
There are two ways to signal the desired transaction semantics to the framework.
In both the RabbitTemplate and SimpleMessageListenerContainer, there is a flag channelTransacted which, if true, tells the framework to use a transactional channel and to end all operations (send or receive) with a commit or rollback (depending on the outcome), with an exception signaling a rollback.
Another signal is to provide an external transaction with one of Spring’s PlatformTransactionManager implementations as a context for the ongoing operation.
If there is already a transaction in progress when the framework is sending or receiving a message, and the channelTransacted flag is true, the commit or rollback of the messaging transaction is deferred until the end of the current transaction.
If the channelTransacted flag is false, no transaction semantics apply to the messaging operation (it is auto-acked).
The channelTransacted flag is a configuration time setting.
It is declared and processed once when the AMQP components are created, usually at application startup.
The external transaction is more dynamic in principle because the system responds to the current thread state at runtime.
However, in practice, it is often also a configuration setting, when the transactions are layered onto an application declaratively.
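The interaction between the two signals can be summarized as a small decision table. The following is a minimal sketch in plain Java, not Spring AMQP code: the class, enum, and method names are hypothetical and exist only to restate the rules described above.

```java
/**
 * Toy decision table for the two transaction signals described above.
 * This is NOT Spring AMQP code; all names here are hypothetical.
 */
final class TransactionSemanticsSketch {

    enum Semantics {
        /** channelTransacted=false: no transaction semantics, operations are auto-acked. */
        AUTO_ACK,
        /** channelTransacted=true, no external transaction: commit or roll back at the end of each operation. */
        LOCAL_TRANSACTION,
        /** channelTransacted=true inside an external transaction: commit or rollback is deferred to its end. */
        DEFERRED_TO_EXTERNAL_TRANSACTION
    }

    static Semantics resolve(boolean channelTransacted, boolean externalTransactionInProgress) {
        if (!channelTransacted) {
            // Without the flag, an external transaction does not change the messaging behavior.
            return Semantics.AUTO_ACK;
        }
        return externalTransactionInProgress
                ? Semantics.DEFERRED_TO_EXTERNAL_TRANSACTION
                : Semantics.LOCAL_TRANSACTION;
    }

    public static void main(String[] args) {
        boolean[] flags = {false, true};
        for (boolean channelTransacted : flags) {
            for (boolean external : flags) {
                System.out.printf("channelTransacted=%b, externalTx=%b -> %s%n",
                        channelTransacted, external, resolve(channelTransacted, external));
            }
        }
    }
}
```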
For synchronous use cases with RabbitTemplate, the external transaction is provided by the caller, either declaratively or imperatively according to taste (the usual Spring transaction model).
The following example shows a declarative approach (usually preferred because it is non-invasive), where the template has been configured with channelTransacted=true:
@Transactional
public void doSomething() {
    // receiveAndConvert() returns Object, so the payload is cast to String
    String incoming = (String) rabbitTemplate.receiveAndConvert();
    // do some more database processing...
    String outgoing = processInDatabaseAndExtractReply(incoming);
    rabbitTemplate.convertAndSend(outgoing);
}
In the preceding example, a String payload is received, converted, and sent as a message body inside a method marked as @Transactional.
If the database processing fails with an exception, the incoming message is returned to the broker, and the outgoing message is not sent.
This applies to any operations with the RabbitTemplate inside a chain of transactional methods (unless, for instance, the Channel is directly manipulated to commit the transaction early).
For asynchronous use cases with SimpleMessageListenerContainer, if an external transaction is needed, it has to be requested by the container when it sets up the listener.
To signal that an external transaction is required, the user provides an implementation of PlatformTransactionManager to the container when it is configured.
The following example shows how to do so:
@Configuration
public class ExampleExternalTransactionAmqpConfiguration {

    // rabbitConnectionFactory(), transactionManager(), and exampleListener() are bean methods not shown here

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setTransactionManager(transactionManager());
        container.setChannelTransacted(true);
        container.setQueueNames("some.queue");
        container.setMessageListener(exampleListener());
        return container;
    }

}
In the preceding example, the transaction manager is added as a dependency injected from another bean definition (not shown), and the channelTransacted flag is also set to true.
The effect is that if the listener fails with an exception, the transaction is rolled back, and the message is also returned to the broker.
Significantly, if the transaction fails to commit (for example, because of a database constraint error or connectivity problem), the AMQP transaction is also rolled back, and the message is returned to the broker.
This is sometimes known as a “Best Efforts 1 Phase Commit”, and is a very powerful pattern for reliable messaging.
If the channelTransacted flag were set to false (the default) in the preceding example, the external transaction would still be provided for the listener, but all messaging operations would be auto-acked, so the effect would be to commit the messaging operations even on a rollback of the business operation.
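The outcomes described above can be traced with a small model. The sketch below is plain Java and does not use Spring AMQP; the class, its `deliver` method, and the `Result` type are hypothetical, and the model assumes only what the text states: the listener runs inside the external transaction, and the messaging outcome follows the external transaction only when channelTransacted is true.

```java
/**
 * Toy model of the listener container outcomes described above.
 * This is NOT Spring AMQP code; all names here are hypothetical.
 */
final class BestEffortOnePhaseCommitSketch {

    enum MessageOutcome { ACKED, RETURNED_TO_BROKER }

    static final class Result {
        final boolean databaseCommitted;
        final MessageOutcome message;

        Result(boolean databaseCommitted, MessageOutcome message) {
            this.databaseCommitted = databaseCommitted;
            this.message = message;
        }
    }

    static Result deliver(boolean channelTransacted, boolean listenerThrows, boolean databaseCommitFails) {
        // The external (database) transaction commits only if the listener
        // succeeded and the commit itself did not fail.
        boolean databaseCommitted = !listenerThrows && !databaseCommitFails;

        if (!channelTransacted) {
            // Auto-ack: the messaging operation is committed even if the business operation rolled back.
            return new Result(databaseCommitted, MessageOutcome.ACKED);
        }
        // Transactional channel: the messaging outcome follows the external transaction.
        MessageOutcome outcome = databaseCommitted
                ? MessageOutcome.ACKED
                : MessageOutcome.RETURNED_TO_BROKER;
        return new Result(databaseCommitted, outcome);
    }

    public static void main(String[] args) {
        Result lost = deliver(false, true, false);
        System.out.println("auto-ack, listener fails: db committed=" + lost.databaseCommitted
                + ", message=" + lost.message);
        Result safe = deliver(true, false, true);
        System.out.println("transacted, db commit fails: db committed=" + safe.databaseCommitted
                + ", message=" + safe.message);
    }
}
```

The first case in `main` is the one to watch for: with channelTransacted=false, the message is consumed even though the business work was rolled back.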
Conditional Rollback
Prior to version 1.6.6, adding a rollback rule to a container’s transactionAttribute when using an external transaction manager (such as JDBC) had no effect.
Exceptions always rolled back the transaction.
Also, when using a transaction advice in the container’s advice chain, conditional rollback was not very useful, because all listener exceptions are wrapped in a ListenerExecutionFailedException.
The first problem has been corrected, and the rules are now applied properly.
Further, the ListenerFailedRuleBasedTransactionAttribute is now provided.
It is a subclass of RuleBasedTransactionAttribute, with the only difference being that it is aware of the ListenerExecutionFailedException and uses the cause of such exceptions for the rule.
This transaction attribute can be used directly in the container or through a transaction advice.
The following example uses this rule:
@Bean
public AbstractMessageListenerContainer container() {
    ...
    container.setTransactionManager(transactionManager);
    RuleBasedTransactionAttribute transactionAttribute =
        new ListenerFailedRuleBasedTransactionAttribute();
    transactionAttribute.setRollbackRules(Collections.singletonList(
        new NoRollbackRuleAttribute(DontRollBackException.class)));
    container.setTransactionAttribute(transactionAttribute);
    ...
}
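To see why using the cause matters, consider the following toy model in plain Java. It is not the Spring implementation: `WrappedListenerException` is a hypothetical stand-in for ListenerExecutionFailedException, `DontRollBackException` is a hypothetical business exception, and the rule matching is deliberately simplified to "roll back unless the exception is an instance of a listed type".

```java
import java.util.Set;

/**
 * Toy model of a rollback rule that looks at the cause of a wrapper exception.
 * This is NOT Spring code; all names here are hypothetical.
 */
final class UnwrappingRollbackRuleSketch {

    /** Hypothetical stand-in for ListenerExecutionFailedException. */
    static final class WrappedListenerException extends RuntimeException {
        WrappedListenerException(Throwable cause) {
            super(cause);
        }
    }

    /** Hypothetical business exception that should not trigger a rollback. */
    static final class DontRollBackException extends RuntimeException {
    }

    static boolean rollbackOn(Throwable thrown,
                              Set<? extends Class<? extends Throwable>> noRollbackFor,
                              boolean unwrapListenerFailure) {
        Throwable candidate = thrown;
        if (unwrapListenerFailure
                && thrown instanceof WrappedListenerException
                && thrown.getCause() != null) {
            // Evaluate the rule against what the listener actually threw.
            candidate = thrown.getCause();
        }
        for (Class<? extends Throwable> type : noRollbackFor) {
            if (type.isInstance(candidate)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Throwable wrapped = new WrappedListenerException(new DontRollBackException());
        Set<Class<DontRollBackException>> rules = Set.of(DontRollBackException.class);
        System.out.println("plain rule rolls back: " + rollbackOn(wrapped, rules, false));
        System.out.println("cause-aware rule rolls back: " + rollbackOn(wrapped, rules, true));
    }
}
```

Without unwrapping, the rule only ever sees the wrapper type, so a no-rollback rule for the business exception never matches.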
A note on Rollback of Received Messages
AMQP transactions apply only to messages and acks sent to the broker.
Consequently, when there is a rollback of a Spring transaction and a message has been received, Spring AMQP has to not only roll back the transaction but also manually reject the message (sort of a nack, but that is not what the specification calls it).
The action taken on message rejection is independent of transactions and depends on the defaultRequeueRejected property (default: true).
For more information about rejecting failed messages, see Message Listeners and the Asynchronous Case.
For more information about RabbitMQ transactions and their limitations, see RabbitMQ Broker Semantics.
Prior to RabbitMQ 2.7.0, such messages (and any that are unacked when a channel is closed or aborts) went to the back of the queue on a Rabbit broker. Since 2.7.0, rejected messages go to the front of the queue, in a similar manner to JMS rolled back messages.
Previously, message requeue on transaction rollback was inconsistent between local transactions and when a TransactionManager was provided.
In the former case, the normal requeue logic (AmqpRejectAndDontRequeueException or defaultRequeueRejected=false) applied (see Message Listeners and the Asynchronous Case).
With a transaction manager, the message was unconditionally requeued on rollback.
Starting with version 2.0, the behavior is consistent and the normal requeue logic is applied in both cases.
To revert to the previous behavior, you can set the container’s alwaysRequeueWithTxManagerRollback property to true.
See Message Listener Container Configuration.
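The requeue rules for version 2.0 and later can be condensed into a single predicate. The following is a simplified sketch in plain Java, not the framework's code: the class and method names are hypothetical, and the check for AmqpRejectAndDontRequeueException is reduced to a boolean parameter.

```java
/**
 * Toy predicate for the requeue-on-rollback rules described above (version 2.0 and later).
 * This is NOT Spring AMQP code; all names here are hypothetical.
 */
final class RequeueOnRollbackSketch {

    static boolean shouldRequeue(boolean defaultRequeueRejected,
                                 boolean rejectAndDontRequeueThrown,
                                 boolean transactionManagerProvided,
                                 boolean alwaysRequeueWithTxManagerRollback) {
        if (transactionManagerProvided && alwaysRequeueWithTxManagerRollback) {
            // Opt-in to the pre-2.0 behavior: unconditional requeue with a transaction manager.
            return true;
        }
        // Normal requeue logic, applied to local and external transactions alike.
        return defaultRequeueRejected && !rejectAndDontRequeueThrown;
    }

    public static void main(String[] args) {
        System.out.println("defaults, plain exception: "
                + shouldRequeue(true, false, true, false));
        System.out.println("listener asked not to requeue: "
                + shouldRequeue(true, true, true, false));
        System.out.println("same, with alwaysRequeueWithTxManagerRollback: "
                + shouldRequeue(true, true, true, true));
    }
}
```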
Using RabbitTransactionManager
The RabbitTransactionManager is an alternative to executing Rabbit operations within, and synchronized with, external transactions.
This transaction manager is an implementation of the PlatformTransactionManager interface and should be used with a single Rabbit ConnectionFactory.
This strategy is not able to provide XA transactions (for example, in order to share transactions between messaging and database access).
Application code is required to retrieve the transactional Rabbit resources through ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean) instead of a standard Connection.createChannel() call with subsequent channel creation.
When using Spring AMQP’s RabbitTemplate, it will autodetect a thread-bound Channel and automatically participate in its transaction.
With Java Configuration, you can set up a new RabbitTransactionManager by using the following bean:
@Bean
public RabbitTransactionManager rabbitTransactionManager() {
    return new RabbitTransactionManager(connectionFactory);
}
If you prefer XML configuration, you can declare the following bean in your XML Application Context file:
<bean id="rabbitTxManager"
      class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>
Transaction Synchronization
Synchronizing a RabbitMQ transaction with some other (e.g. DBMS) transaction provides "Best Effort One Phase Commit" semantics.
It is possible that the RabbitMQ transaction fails to commit during the after completion phase of transaction synchronization.
This is logged by the spring-tx infrastructure as an error, but no exception is thrown to the calling code.
Starting with version 2.3.10, you can call ConnectionFactoryUtils.checkAfterCompletion() after the transaction has committed, on the same thread that processed the transaction.
It simply returns if no exception occurred; otherwise, it throws an AfterCompletionFailedException, which has a property representing the synchronization status of the completion.
Enable this feature by calling ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true); this is a global flag and applies to all threads.
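The same-thread requirement is easier to picture with a small model. The sketch below is plain Java and does not call Spring AMQP: `CompletionFailure` is a hypothetical stand-in for AfterCompletionFailedException, the method names are illustrative, and the thread-bound storage is an assumption suggested by the requirement to check on the thread that processed the transaction. The status value 0 is arbitrary.

```java
/**
 * Toy model of capturing an after-completion failure and checking for it later.
 * This is NOT Spring AMQP code; all names here are hypothetical.
 */
final class AfterCompletionCaptureSketch {

    /** Hypothetical stand-in for AfterCompletionFailedException. */
    static final class CompletionFailure extends RuntimeException {
        final int syncStatus;

        CompletionFailure(int syncStatus, Throwable cause) {
            super(cause);
            this.syncStatus = syncStatus;
        }
    }

    // Global switch: applies to all threads.
    private static volatile boolean captureEnabled;

    // Assumption: the failure is bound to the thread that ran the transaction.
    private static final ThreadLocal<CompletionFailure> FAILURE = new ThreadLocal<>();

    static void enableCapture(boolean enable) {
        captureEnabled = enable;
    }

    /** Called by the simulated synchronization when the messaging commit fails. */
    static void recordFailure(int syncStatus, Throwable cause) {
        if (captureEnabled) {
            FAILURE.set(new CompletionFailure(syncStatus, cause));
        }
    }

    /** Returns normally if nothing was captured on this thread; otherwise throws once and clears. */
    static void checkAfterCompletion() {
        CompletionFailure failure = FAILURE.get();
        if (failure != null) {
            FAILURE.remove();
            throw failure;
        }
    }

    public static void main(String[] args) {
        enableCapture(true);
        recordFailure(0, new IllegalStateException("messaging commit failed"));
        try {
            checkAfterCompletion();
        }
        catch (CompletionFailure e) {
            System.out.println("captured failure, status=" + e.syncStatus + ", cause=" + e.getCause());
        }
    }
}
```

In this model, a check made from a different thread would find nothing, which is why the call has to happen on the thread that ran the transaction.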