This chapter covers Spring Integration’s support for transactions. It covers the following topics:
Understanding Transactions in Message flows
Spring Integration exposes several hooks to address the transactional needs of your message flows. To better understand these hooks and how you can benefit from them, we must first revisit the six mechanisms that you can use to initiate message flows and see how you can address the transactional needs of these flows within each of these mechanisms.
The following six mechanisms initiate a message flow (details for each are provided throughout this manual):
- Gateway proxy: A basic messaging gateway.
- Message channel: Direct interactions with MessageChannel methods (for example, channel.send(message)).
- Message publisher: The way to initiate a message flow as the by-product of method invocations on Spring beans.
- Inbound channel adapters and gateways: The way to initiate a message flow based on connecting a third-party system with the Spring Integration messaging system (for example, [JmsMessage] → Jms Inbound Adapter[SI Message] → SI Channel).
- Scheduler: The way to initiate a message flow based on scheduling events distributed by a pre-configured scheduler.
- Poller: Similar to the scheduler, this is the way to initiate a message flow based on scheduling or interval-based events distributed by a pre-configured poller.
We can split these six mechanisms into two general categories:
- Message flows initiated by a user process: Example scenarios in this category would be invoking a gateway method or explicitly sending a Message to a MessageChannel. In other words, these message flows depend on a third party process (such as some code that you wrote) to be initiated.
- Message flows initiated by a daemon process: Example scenarios in this category include a Poller polling a message queue to initiate a new message flow with the polled message or a scheduler scheduling the process by creating a new message and initiating a message flow at a predefined time.
Clearly, the gateway proxy, MessageChannel.send(…), and MessagePublisher all belong to the first category, and inbound adapters and gateways, the scheduler, and the poller belong to the second category.
So, how can you address transactional needs in various scenarios within each category, and is there a need for Spring Integration to provide something explicit with regard to transactions for a particular scenario? Or can you use Spring’s transaction support instead?
Spring itself provides first-class support for transaction management. So our goal here is not to provide something new but rather to use Spring and benefit from its existing support for transactions. In other words, as a framework, we must expose hooks to Spring’s transaction management functionality. However, since Spring Integration configuration is based on Spring configuration, we need not always expose these hooks, because Spring already exposes them. After all, every Spring Integration component is a Spring bean.
With this goal in mind, we can again consider the two scenarios: message flows initiated by a user process and message flows initiated by a daemon.
Message flows that are initiated by a user process and configured in a Spring application context are subject to the usual transactional configuration of such processes.
Therefore, they need not be explicitly configured by Spring Integration to support transactions.
The transaction could and should be initiated through Spring’s standard transaction support.
The Spring Integration message flow naturally honors the transactional semantics of the components, because it is itself configured by Spring.
For example, a gateway or service activator method could be annotated with @Transactional, or a TransactionInterceptor could be defined in an XML configuration with a pointcut expression that points to specific methods that should be transactional.
The bottom line is that you have full control over transaction configuration and boundaries in these scenarios.
However, things are a bit different when it comes to message flows initiated by a daemon process. Although configured by the developer, these flows do not directly involve a human or some other process to be initiated. These are trigger-based flows that are initiated by a trigger process (a daemon process) based on the configuration of the process. For example, we could have a scheduler initiate a message flow every Friday night. We can also configure a trigger that initiates a message flow every second and so on. As a result, we need a way to let these trigger-based processes know of our intention to make the resulting message flows be transactional, so that a Transaction context can be created whenever a new message flow is initiated. In other words, we need to expose some transaction configuration, but only enough to delegate to the transaction support already provided by Spring (as we do in other scenarios).
Poller Transaction Support
Spring Integration provides transactional support for pollers.
Pollers are a special type of component because, within a poller task, we can call receive() against a resource that is itself transactional, thus including the receive() call in the boundaries of the transaction, which lets it be rolled back in case of a task failure.
If we were to add the same support for channels, the added transactions would affect all downstream components starting with the send() call.
That provides a rather wide scope for transaction demarcation without any strong reason, especially when Spring already provides several ways to address the transactional needs of any component downstream.
However, the receive() method being included in a transaction boundary is the “strong reason” for pollers.
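The effect of including receive() in the transaction boundary can be illustrated with a plain Java model. This is only a sketch under stated assumptions: TxQueueSketch and PollerTaskSketch are hypothetical names, not Spring Integration classes, and the in-memory queue stands in for a real transactional resource such as a JMS queue or a database table.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

/** Hypothetical in-memory resource whose receive() takes part in a transaction. */
class TxQueueSketch {
    private final Deque<String> messages = new ArrayDeque<>();
    private String inFlight; // message received inside the currently open transaction

    void add(String message) {
        messages.addLast(message);
    }

    /** Removes the next message tentatively; the removal becomes final only on commit(). */
    String receive() {
        inFlight = messages.pollFirst();
        return inFlight;
    }

    void commit() {
        inFlight = null;
    }

    /** Undoes the receive() by putting the in-flight message back at the head. */
    void rollback() {
        if (inFlight != null) {
            messages.addFirst(inFlight);
            inFlight = null;
        }
    }

    int size() {
        return messages.size();
    }
}

class PollerTaskSketch {
    /** One poll cycle: receive, run the flow, then commit or roll back. Returns true on commit. */
    static boolean pollOnce(TxQueueSketch queue, Consumer<String> flow) {
        try {
            String message = queue.receive();
            if (message != null) {
                flow.accept(message);
            }
            queue.commit();
            return true;
        } catch (RuntimeException e) {
            queue.rollback(); // the failed message stays available for the next poll
            return false;
        }
    }

    public static void main(String[] args) {
        TxQueueSketch queue = new TxQueueSketch();
        queue.add("order-1");
        boolean committed = pollOnce(queue, m -> {
            throw new IllegalStateException("downstream failure");
        });
        System.out.println("committed=" + committed + ", remaining=" + queue.size());
        committed = pollOnce(queue, m -> System.out.println("processed " + m));
        System.out.println("committed=" + committed + ", remaining=" + queue.size());
    }
}
```

In this model, a failure in the flow leaves the message in the queue, which is the behavior a transactional receive() gives a poller.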
Any time you configure a Poller, you can provide transactional configuration by using the transactional child element and its attributes, as the following example shows:
<int:poller max-messages-per-poll="1" fixed-rate="1000">
    <int:transactional transaction-manager="txManager"
                       isolation="DEFAULT"
                       propagation="REQUIRED"
                       read-only="true"
                       timeout="1000"/>
</int:poller>
The preceding configuration looks similar to a native Spring transaction configuration.
You must still provide a reference to a transaction manager and either specify transaction attributes or rely on defaults (for example, if the 'transaction-manager' attribute is not specified, it defaults to the bean named 'transactionManager').
Internally, the process is wrapped in Spring’s native transaction, where TransactionInterceptor is responsible for handling transactions.
For more information on how to configure a transaction manager, the types of transaction managers (such as JTA, Datasource, and others), and other details related to transaction configuration, see the Spring Framework Reference Guide.
With the preceding configuration, all message flows initiated by this poller are transactional. For more information and details on a poller’s transactional configuration, see Polling and Transactions.
Along with transactions, you might need to address several more cross-cutting concerns when you run a poller.
To help with that, the poller element accepts an <advice-chain> child element, which lets you define a custom chain of advice instances to be applied on the Poller.
(See Pollable Message Source for more details.)
In Spring Integration 2.0, the Poller went through a refactoring effort and now uses a proxy mechanism to address transactional concerns as well as other cross-cutting concerns.
One of the significant changes resulting from this effort is that we made the <transactional> and <advice-chain> elements mutually exclusive.
The rationale behind this is that, if you need more than one advice and one of them is transaction advice, you can include it in the <advice-chain> with the same convenience as before but with much more control, since you can now position the advice in the desired order.
The following example shows how to do so:
<int:poller max-messages-per-poll="1" fixed-rate="10000">
    <int:advice-chain>
        <beans:ref bean="txAdvice"/>
        <beans:ref bean="someOtherAdviceBean"/>
        <beans:bean class="foo.bar.SampleAdvice"/>
    </int:advice-chain>
</int:poller>

<tx:advice id="txAdvice" transaction-manager="txManager">
    <tx:attributes>
        <tx:method name="get*" read-only="true"/>
        <tx:method name="*"/>
    </tx:attributes>
</tx:advice>
The preceding example shows a basic XML-based configuration of Spring transaction advice (txAdvice) and includes it within the <advice-chain> defined by the Poller.
If you need to address only the transactional concerns of the poller, you can still use the <transactional> element as a convenience.
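Why the position of the transaction advice in the chain matters can be shown with a small plain Java model. This is a sketch, not the framework's proxy mechanism: the Advice interface and the AdviceChainSketch class are hypothetical, and the model assumes that the first advice in the chain becomes the outermost wrapper around the poll.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class AdviceChainSketch {
    /** Hypothetical stand-in for an around advice: wraps a task and returns the wrapped task. */
    interface Advice {
        Supplier<String> wrap(Supplier<String> next);
    }

    /** Builds an advice that records when it is entered and when it is left. */
    static Advice recording(String name, List<String> log) {
        return next -> () -> {
            log.add(name + ":before");
            try {
                return next.get();
            } finally {
                log.add(name + ":after");
            }
        };
    }

    /** Applies the chain so that the first advice in the list is the outermost wrapper. */
    static Supplier<String> apply(List<Advice> chain, Supplier<String> task) {
        Supplier<String> current = task;
        for (int i = chain.size() - 1; i >= 0; i--) {
            current = chain.get(i).wrap(current);
        }
        return current;
    }

    /** Runs one simulated poll through a two-element chain and returns the call order. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        List<Advice> chain = List.of(recording("tx", log), recording("other", log));
        Supplier<String> poll = () -> {
            log.add("poll");
            return "message";
        };
        apply(chain, poll).get();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With "tx" listed first, the other advice runs entirely inside the simulated transaction. Swapping the two entries would move it outside, which is the kind of control the ordering gives you.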
Transaction Boundaries
Another important factor is the boundaries of transactions within a message flow. When a transaction is started, the transaction context is bound to the current thread. So, regardless of how many endpoints and channels you have in your message flow, your transaction context is preserved as long as the flow continues on the same thread. As soon as you break it by introducing a pollable channel or an executor channel, or by starting a new thread manually in some service, the transactional boundary is broken as well. Essentially, the transaction ends right there, and, if a successful handoff has transpired between the threads, the flow is considered a success and a COMMIT signal is sent, even though the flow continues and might still result in an exception somewhere downstream. If such a flow were synchronous, that exception could be thrown back to the initiator of the message flow, who is also the initiator of the transactional context, and the transaction would result in a ROLLBACK. The middle ground is to use transactional channels at any point where a thread boundary is being broken. For example, you can use a queue-backed channel that delegates to a transactional MessageStore strategy, or you could use a JMS-backed channel.
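The thread-bound nature of the transaction context can be illustrated with nothing more than a ThreadLocal. The following is a minimal sketch: ThreadBoundTxSketch and its CURRENT_TX holder are hypothetical stand-ins for a thread-bound transaction context, not Spring classes.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadBoundTxSketch {
    // Hypothetical stand-in for a transaction context bound to the current thread.
    private static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    static String currentTx() {
        return CURRENT_TX.get();
    }

    /** Returns the context seen on the calling thread and the one seen after a hand-off. */
    static String[] run() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CURRENT_TX.set("tx-1"); // "begin" a transaction on the calling thread
        try {
            String sameThread = currentTx();
            Callable<String> downstream = ThreadBoundTxSketch::currentTx;
            // Handing work to another thread is what an executor or pollable channel does.
            String otherThread = executor.submit(downstream).get();
            return new String[] { sameThread, otherThread };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            CURRENT_TX.remove();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = run();
        System.out.println("same thread:  " + seen[0]);
        System.out.println("other thread: " + seen[1]);
    }
}
```

The downstream task sees no context at all, which is why work that continues on another thread is no longer covered by the transaction started by the initiator.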
Transaction Synchronization
In some environments, it helps to synchronize operations with a transaction that encompasses the entire flow.
For example, consider a <file:inbound-channel-adapter/> at the start of a flow that performs a number of database updates.
If the transaction commits, we might want to move the file to a success directory, while we might want to move it to a failure directory if the transaction rolls back.
Spring Integration 2.2 introduced the capability of synchronizing these operations with a transaction.
In addition, you can configure a PseudoTransactionManager if you do not have a 'real' transaction but still want to perform different actions on success or failure.
For more information, see Pseudo Transactions.
The following listing shows the key strategy interfaces for this feature:
public interface TransactionSynchronizationFactory {

    TransactionSynchronization create(Object key);

}

public interface TransactionSynchronizationProcessor {

    void processBeforeCommit(IntegrationResourceHolder holder);

    void processAfterCommit(IntegrationResourceHolder holder);

    void processAfterRollback(IntegrationResourceHolder holder);

}
The factory is responsible for creating a TransactionSynchronization object.
You can implement your own or use the one provided by the framework: DefaultTransactionSynchronizationFactory.
This implementation returns a TransactionSynchronization that delegates to a default implementation of TransactionSynchronizationProcessor: ExpressionEvaluatingTransactionSynchronizationProcessor.
This processor supports three SpEL expressions: beforeCommitExpression, afterCommitExpression, and afterRollbackExpression.
These actions should be self-explanatory to those familiar with transactions.
In each case, the #root variable is the original Message.
In some cases, other SpEL variables are made available, depending on the MessageSource being polled by the poller.
For example, the MongoDbMessageSource provides the #mongoTemplate variable, which references the message source’s MongoTemplate.
Similarly, the RedisStoreMessageSource provides the #store variable, which references the RedisStore created by the poll.
To enable the feature for a particular poller, you can provide a reference to the TransactionSynchronizationFactory on the poller’s <transactional/> element by using the synchronization-factory attribute.
Starting with version 5.0, Spring Integration provides PassThroughTransactionSynchronizationFactory, which is applied by default to polling endpoints when no TransactionSynchronizationFactory is configured but an advice of type TransactionInterceptor exists in the advice chain.
When using any out-of-the-box TransactionSynchronizationFactory implementation, polling endpoints bind a polled message to the current transactional context and provide it as a failedMessage in a MessagingException if an exception is thrown after the transaction advice.
When using a custom transaction advice that does not implement TransactionInterceptor, you can explicitly configure a PassThroughTransactionSynchronizationFactory to achieve this behavior.
In either case, the MessagingException becomes the payload of the ErrorMessage that is sent to the errorChannel, and the cause is the raw exception thrown by the advice.
Previously, the ErrorMessage had a payload that was the raw exception thrown by the advice and did not provide a reference to the failedMessage information, making it difficult to determine the reasons for the transaction commit problem.
To simplify configuration of these components, Spring Integration provides namespace support for the default factory. The following example shows how to use the namespace to configure a file inbound channel adapter:
<int-file:inbound-channel-adapter id="inputDirPoller"
        channel="someChannel"
        directory="/foo/bar"
        filter="filter"
        comparator="testComparator">
    <int:poller fixed-rate="5000">
        <int:transactional transaction-manager="transactionManager" synchronization-factory="syncFactory" />
    </int:poller>
</int-file:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="payload.renameTo(new java.io.File('/success/' + payload.name))"
            channel="committedChannel" />
    <int:after-rollback expression="payload.renameTo(new java.io.File('/failed/' + payload.name))"
            channel="rolledBackChannel" />
</int:transaction-synchronization-factory>
The result of the SpEL evaluation is sent as the payload to either committedChannel or rolledBackChannel (in this case, this would be Boolean.TRUE or Boolean.FALSE, the result of the java.io.File.renameTo() method call).
If you wish to send the entire payload for further Spring Integration processing, use the 'payload' expression.
It is important to understand that this synchronizes the actions with a transaction. It does not make a resource that is not inherently transactional actually be transactional. Instead, the transaction (be it JDBC or otherwise) is started before the poll and either committed or rolled back when the flow completes, followed by the synchronized action. If you provide a custom …
In addition to the after-commit and after-rollback expressions, before-commit is also supported.
In that case, if the evaluation (or downstream processing) throws an exception, the transaction is rolled back instead of being committed.
Pseudo Transactions
After reading the Transaction Synchronization section, you might think it would be useful to take these 'success' or 'failure' actions when a flow completes, even if there are no “real” transactional resources (such as JDBC) downstream of the poller. For example, consider a <file:inbound-channel-adapter/> followed by an <ftp:outbound-channel-adapter/>. Neither of these components is transactional, but we might want to move the input file to different directories, based on the success or failure of the FTP transfer.
To provide this functionality, the framework provides a PseudoTransactionManager, enabling the above configuration even when there is no real transactional resource involved.
If the flow completes normally, the beforeCommit and afterCommit synchronizations are called.
On failure, the afterRollback synchronization is called.
Because it is not a real transaction, no actual commit or rollback occurs.
The pseudo transaction is a vehicle used to enable the synchronization features.
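The callback order described above can be modeled in plain Java. This is only a sketch of the idea, assuming a hypothetical Synchronization interface and PseudoTxSketch class. It does not use or reproduce PseudoTransactionManager itself, and the simulated FTP failure is purely illustrative.

```java
import java.util.ArrayList;
import java.util.List;

class PseudoTxSketch {
    /** Hypothetical callbacks mirroring the three synchronization points named above. */
    interface Synchronization {
        void beforeCommit(String message);
        void afterCommit(String message);
        void afterRollback(String message);
    }

    /**
     * Runs the flow. Nothing is actually committed or rolled back; only the callbacks fire.
     * A real poller would also propagate or route the exception; it is swallowed here
     * to keep the sketch short.
     */
    static void execute(String message, Runnable flow, Synchronization sync) {
        try {
            flow.run();
            sync.beforeCommit(message); // a failure here also ends in afterRollback
        } catch (RuntimeException e) {
            sync.afterRollback(message);
            return;
        }
        sync.afterCommit(message);
    }

    /** Runs one simulated flow and returns the callbacks that fired, in order. */
    static List<String> run(boolean failTransfer) {
        List<String> log = new ArrayList<>();
        Synchronization sync = new Synchronization() {
            @Override public void beforeCommit(String m) { log.add("beforeCommit " + m); }
            @Override public void afterCommit(String m) { log.add("afterCommit " + m); }
            @Override public void afterRollback(String m) { log.add("afterRollback " + m); }
        };
        Runnable flow = () -> {
            if (failTransfer) {
                throw new IllegalStateException("simulated FTP failure");
            }
        };
        execute("in.txt", flow, sync);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

In a file-to-FTP flow, the afterCommit and afterRollback callbacks are where the input file would be moved to a success or failure directory.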
To use a PseudoTransactionManager, you can define it as a <bean/>, in the same way you would configure a real transaction manager.
The following example shows how to do so:
<bean id="transactionManager" class="org.springframework.integration.transaction.PseudoTransactionManager" />
Reactive Transactions
Starting with version 5.3, a ReactiveTransactionManager can also be used together with a TransactionInterceptor advice for endpoints returning a reactive type.
This includes MessageSource and ReactiveMessageHandler implementations (for example, ReactiveMongoDbMessageSource) that produce a message with a Flux or Mono payload.
All other reply-producing message handler implementations can rely on a ReactiveTransactionManager when their reply payload is also some reactive type.