This section describes how Spring for Apache Pulsar supports transactions.
Overview
Spring for Apache Pulsar transaction support is built upon the transaction support provided by Spring Framework. At a high level, transactional resources are registered with a transaction manager, which in turn handles the transactional state (commit, rollback, etc.) of the registered resources.
Spring for Apache Pulsar provides the following:
- PulsarTransactionManager - used with normal Spring transaction support (@Transactional, TransactionTemplate, etc.)
- Transactional PulsarTemplate
- Transactional @PulsarListener
- Transaction synchronization with other transaction managers
Transaction support has not been added to the Reactive components yet.
Transaction support is disabled by default.
To enable support when using Spring Boot, set the spring.pulsar.transaction.enabled property to true.
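For example, the property could be set in the application's configuration file. This is a minimal fragment, assuming the conventional Spring Boot application.properties file is in use:

```properties
# Turn on Pulsar transaction support (disabled by default)
spring.pulsar.transaction.enabled=true
```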
Further configuration options are outlined in each component section below.
Transactional Publishing with PulsarTemplate
All send operations on a transactional PulsarTemplate look for an active transaction and enlist each send operation in the transaction (if one is found).
Non-transactional use
By default, a transactional PulsarTemplate can also be used for non-transactional operations.
When an existing transaction is not found, the template continues the send operation in a non-transactional fashion.
However, if the template is configured to require transactions, then any attempt to use the template outside the scope of a transaction results in an exception.
A transaction can be started by a TransactionTemplate, a @Transactional method, a call to executeInTransaction, or a transactional listener container.
Local Transactions
We use the term "local" transaction to denote a Pulsar native transaction that is not managed by or associated with Spring’s transaction management facility (i.e. PulsarTransactionManager).
Conversely, a "synchronized" transaction is one that is managed by or associated with the PulsarTransactionManager.
You can use the PulsarTemplate to execute a series of operations within a local transaction.
The following example shows how to do so:
var results = pulsarTemplate.executeInTransaction((template) -> {
    var rv = new HashMap<String, MessageId>();
    rv.put("msg1", template.send(topic, "msg1"));
    rv.put("msg2", template.send(topic, "msg2"));
    return rv;
});
The argument in the callback is the template instance that the executeInTransaction method was invoked on.
All operations on the template are enlisted in the current transaction.
If the callback exits normally, the transaction is committed.
If an exception is thrown, the transaction is rolled back.
If there is a synchronized transaction in process, it is ignored and a new "nested" transaction is used.
Configuration
The following transaction settings are available directly on the PulsarTemplate (via the transactions field):
- enabled - whether the template supports transactions (default false)
- required - whether the template requires transactions (default false)
- timeout - duration of the transaction timeout (default null)
When not using Spring Boot, you can adjust these settings on the template that you provide.
However, when using Spring Boot, the template is auto-configured, so you cannot set these properties on it directly.
In this case, you can register a PulsarTemplateCustomizer bean to adjust the settings.
The following example shows how to set the timeout on the auto-configured template:
@Bean
PulsarTemplateCustomizer<?> templateCustomizer() {
    return (template) -> template.transactions().setTimeout(Duration.ofSeconds(45));
}
Transactional Receiving with @PulsarListener
When listener transactions are enabled, the @PulsarListener annotated listener method is invoked in the scope of a synchronized transaction.
The DefaultPulsarMessageListenerContainer uses a Spring TransactionTemplate configured with a PulsarTransactionManager to initiate the transaction prior to method invocation.
The acknowledgment of each received message is enlisted in the scoped transaction.
Consume-Process-Produce Scenario
In a common transactional pattern, a consumer reads messages from a Pulsar topic, transforms them, and a producer then writes the resulting messages to another Pulsar topic.
The framework supports this use case when transactions are enabled and your listener method uses a transactional PulsarTemplate to produce the transformed message.
Given the following listener method:
@PulsarListener(topics = "my-input-topic") (1)
void listen(String msg) { (2)
    var transformedMsg = msg.toUpperCase(); (3)
    this.transactionalTemplate.send("my-output-topic", transformedMsg); (4)
} (5) (6)
The following interactions occur when listener transactions are enabled:
(1) Listener container initiates new transaction and invokes listener method in scope of transaction
(2) Listener method receives message
(3) Listener method transforms message
(4) Listener method sends transformed message with transactional template, which enlists send operation in active transaction
(5) Listener container auto-acks message and enlists ack operation in active transaction
(6) Listener container (via TransactionTemplate) commits transaction
If you are not using @PulsarListener and are instead using listener containers directly, the same transaction support is provided as described above.
Remember, the @PulsarListener is just a convenience to register a Java method as the listener container message listener.
Transactions with Record Listeners
The above example uses a record listener. When using a record listener, a new transaction is created on every listener method invocation, which equates to a transaction per message.
Because the transaction boundary is per message and each message acknowledgement is enlisted in its own transaction, batch ack mode cannot be used with transactional record listeners.
Transactions with Batch Listeners
When using a batch listener, a new transaction is created on every listener method invocation, which equates to a transaction per batch of messages.
Transactional batch listeners do not currently support custom error handlers.
Configuration
Listener container factory
The following transaction settings are available directly on the PulsarContainerProperties used by the ConcurrentPulsarListenerContainerFactory when creating listener containers.
These settings affect all listener containers, including the ones used by @PulsarListener.
- enabled - whether the container supports transactions (default false)
- required - whether the container requires transactions (default false)
- timeout - duration of the transaction timeout (default null)
- transactionDefinition - a blueprint transaction definition with properties that will be copied to the container’s transaction template (default null)
- transactionManager - the transaction manager used to start transactions
When not using Spring Boot, you can adjust these settings on the container factory that you provide.
However, when using Spring Boot, the container factory is auto-configured.
In this case, you can register a ConcurrentPulsarListenerContainerFactoryCustomizer bean to access and customize the container properties.
The following example shows how to set the timeout on the container factory:
@Bean
ConcurrentPulsarListenerContainerFactoryCustomizer<?> containerCustomizer() {
    return (containerFactory) -> containerFactory.getContainerProperties().transactions().setTimeout(Duration.ofSeconds(45));
}
@PulsarListener
By default, each listener respects the transactional settings of its corresponding listener container factory.
However, you can set the transactional attribute on each @PulsarListener to override the container factory setting as follows:
- If the container factory has transactions enabled, then transactional = false will disable transactions for the individual listener.
- If the container factory has transactions enabled and required, then an attempt to set transactional = false will result in an exception being thrown stating that transactions are required.
- If the container factory has transactions disabled, then an attempt to set transactional = true will be ignored and a warning will be logged.
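The three rules above can be restated as a small decision function. The following is a plain-Java sketch for illustration only: the class, enum, and method names are hypothetical, the exception type is an assumption, and none of it is Spring for Apache Pulsar API. The two combinations the rules do not mention (asking for transactions on an enabled factory, and opting out on a disabled one) are filled in with their natural reading.

```java
/** Illustrative model of the listener override rules; not framework code. */
final class ListenerTransactionRules {

    /** Effective behavior for a single listener. */
    enum Outcome { TRANSACTIONAL, NON_TRANSACTIONAL, NON_TRANSACTIONAL_WITH_WARNING }

    private ListenerTransactionRules() {
    }

    /**
     * @param factoryEnabled whether the container factory has transactions enabled
     * @param factoryRequired whether the container factory requires transactions
     * @param listenerTransactional the listener-level override, or null if not set
     */
    static Outcome resolve(boolean factoryEnabled, boolean factoryRequired, Boolean listenerTransactional) {
        // No override on the listener: the factory setting applies as-is.
        if (listenerTransactional == null) {
            return factoryEnabled ? Outcome.TRANSACTIONAL : Outcome.NON_TRANSACTIONAL;
        }
        if (listenerTransactional) {
            // Requesting transactions on a disabled factory is ignored, with a warning.
            return factoryEnabled ? Outcome.TRANSACTIONAL : Outcome.NON_TRANSACTIONAL_WITH_WARNING;
        }
        // Opting out is rejected when the factory both enables and requires transactions.
        if (factoryEnabled && factoryRequired) {
            // Exception type chosen for this sketch only.
            throw new IllegalStateException("Transactions are required by the container factory");
        }
        return Outcome.NON_TRANSACTIONAL;
    }
}
```

For instance, resolve(true, false, false) yields NON_TRANSACTIONAL, while resolve(true, true, false) throws.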
Using PulsarTransactionManager
The PulsarTransactionManager is an implementation of Spring Framework’s PlatformTransactionManager.
You can use the PulsarTransactionManager with normal Spring transaction support (@Transactional, TransactionTemplate, and others).
If a transaction is active, any PulsarTemplate operations performed within the scope of the transaction enlist and participate in the ongoing transaction.
The manager commits or rolls back the transaction, depending on success or failure.
You probably will not need to use PulsarTransactionManager directly since the majority of transactional use cases are covered by PulsarTemplate and @PulsarListener.
Pulsar Transactions with Other Transaction Managers
Producer-only transaction
If you want to send records to Pulsar and perform some database updates in a single transaction, you can use normal Spring transaction management with a DataSourceTransactionManager.
The following examples assume there is a DataSourceTransactionManager bean registered under the name "dataSourceTransactionManager".
@Transactional("dataSourceTransactionManager")
public void myServiceMethod() {
    var msg = calculateMessage();
    this.pulsarTemplate.send("my-topic", msg);
    // Bind the value as a parameter rather than formatting it into the SQL string
    this.jdbcTemplate.update("insert into my_table (data) values (?)", msg);
}
The interceptor for the @Transactional annotation starts the database transaction, and the PulsarTemplate synchronizes a Pulsar transaction with the DB transaction manager; each send participates in that transaction.
When the method exits, the database transaction commits, followed by the Pulsar transaction.
If you wish to commit the Pulsar transaction first, and only commit the DB transaction if the Pulsar transaction is successful, use nested @Transactional methods, with the outer method configured to use the DataSourceTransactionManager and the inner method configured to use the PulsarTransactionManager.
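@Transactional("dataSourceTransactionManager")
public void myServiceMethod() {
    var msg = calculateMessage();
    // Bind the value as a parameter rather than formatting it into the SQL string
    this.jdbcTemplate.update("insert into my_table (data) values (?)", msg);
    this.sendToPulsar(msg);
}

// Note: with Spring's default proxy-based AOP, @Transactional only takes effect on calls
// that go through the proxy, so in practice this method would typically live in a separate bean.
@Transactional("pulsarTransactionManager")
public void sendToPulsar(String msg) {
    this.pulsarTemplate.send("my-topic", msg);
}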
Consumer + Producer transaction
If you want to consume records from Pulsar, send records to Pulsar, and perform some database updates in a transaction, you can combine normal Spring transaction management (using a DataSourceTransactionManager) with container-initiated transactions.
In the following example, the listener container starts the Pulsar transaction and the @Transactional annotation starts the DB transaction.
The DB transaction is committed first; if the Pulsar transaction fails to commit, the record will be redelivered, so the DB update should be idempotent.
@PulsarListener(topics = "my-input-topic")
@Transactional("dataSourceTransactionManager")
void listen(String msg) {
    var transformedMsg = msg.toUpperCase();
    this.pulsarTemplate.send("my-output-topic", transformedMsg);
    // Bind the value as a parameter rather than formatting it into the SQL string
    this.jdbcTemplate.update("insert into my_table (data) values (?)", transformedMsg);
}