Transactions

This section describes how Spring for Apache Pulsar supports transactions.

Overview

Spring for Apache Pulsar transaction support is built upon the transaction support provided by Spring Framework. At a high level, transactional resources are registered with a transaction manager, which in turn handles the transactional state (commit, rollback, and so on) of the registered resources.

Spring for Apache Pulsar provides the following:

  • PulsarTransactionManager - used with normal Spring transaction support (@Transactional, TransactionTemplate, etc.)

  • Transactional PulsarTemplate

  • Transactional @PulsarListener

  • Transaction synchronization with other transaction managers

Transaction support has not yet been added to the Reactive components.

Transaction support is disabled by default. To enable it when using Spring Boot, set the spring.pulsar.transaction.enabled property to true. Further configuration options are outlined in each component section below.
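
As a minimal sketch, the property can be set in a Spring Boot application.properties file. The file location and the value true are assumptions based on the usual Spring Boot conventions for boolean properties:

```properties
# Enables transaction support for the auto-configured Pulsar components
spring.pulsar.transaction.enabled=true
```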

Transactional Publishing with PulsarTemplate

All send operations on a transactional PulsarTemplate look for an active transaction and enlist each send operation in the transaction (if one is found).

Non-transactional use

By default, a transactional PulsarTemplate can also be used for non-transactional operations. If no existing transaction is found, the template performs the send operation non-transactionally. However, if the template is configured to require transactions, any attempt to use the template outside the scope of a transaction results in an exception.

A transaction can be started by a TransactionTemplate, a @Transactional method, calling executeInTransaction, or by a transactional listener container.

Local Transactions

We use the term "local" transaction to denote a Pulsar native transaction that is not managed by or associated with Spring’s transaction management facility (i.e. PulsarTransactionManager). Conversely, a "synchronized" transaction is one that is managed by or associated with the PulsarTransactionManager.

You can use the PulsarTemplate to execute a series of operations within a local transaction. The following example shows how to do so:

var results = pulsarTemplate.executeInTransaction((template) -> {
    var rv = new HashMap<String, MessageId>();
    rv.put("msg1", template.send(topic, "msg1"));
    rv.put("msg2", template.send(topic, "msg2"));
    return rv;
});

The argument in the callback is the template instance that the executeInTransaction method was invoked on. All operations on the template are enlisted in the current transaction. If the callback exits normally, the transaction is committed. If an exception is thrown, the transaction is rolled back.

If there is a synchronized transaction in progress, it is ignored and a new "nested" transaction is used.
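
The commit-on-normal-exit and rollback-on-exception behavior described above can be illustrated with a toy in-memory model. This is only a sketch: LocalTransactionSketch and its methods are hypothetical names, it does not use the Pulsar or Spring APIs, and buffering sends until commit is a simplification of how a broker hides uncommitted messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Toy model only; not the PulsarTemplate API. */
final class LocalTransactionSketch {

    /** Stand-in for the messages visible to consumers of a topic. */
    private final List<String> topic = new ArrayList<>();

    /** Sends buffered by the active transaction; null when no transaction is active. */
    private List<String> pending;

    void send(String msg) {
        if (pending != null) {
            pending.add(msg); // enlisted in the active transaction
        } else {
            topic.add(msg); // non-transactional send
        }
    }

    <T> T executeInTransaction(Function<LocalTransactionSketch, T> callback) {
        pending = new ArrayList<>();
        try {
            T result = callback.apply(this);
            topic.addAll(pending); // callback exited normally: commit
            return result;
        } finally {
            pending = null; // after an exception the buffered sends are discarded: rollback
        }
    }

    List<String> published() {
        return new ArrayList<>(topic);
    }

    public static void main(String[] args) {
        var sketch = new LocalTransactionSketch();
        sketch.executeInTransaction(tx -> { tx.send("msg1"); tx.send("msg2"); return null; });
        try {
            sketch.executeInTransaction(tx -> { tx.send("msg3"); throw new IllegalStateException("boom"); });
        } catch (IllegalStateException ex) {
            // rolled back: msg3 was never published
        }
        System.out.println(sketch.published()); // prints [msg1, msg2]
    }
}
```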

Configuration

The following transaction settings are available directly on the PulsarTemplate (via the transactions field):

  • enabled - whether the template supports transactions (default false)

  • required - whether the template requires transactions (default false)

  • timeout - duration of the transaction timeout (default null)

When not using Spring Boot, you can adjust these settings on the template that you provide. However, when using Spring Boot, the template is auto-configured and there is no configuration property for these settings. In this case, you can register a PulsarTemplateCustomizer bean to adjust them. The following example shows how to set the timeout on the auto-configured template:

@Bean
PulsarTemplateCustomizer<?> templateCustomizer() {
    return (template) -> template.transactions().setTimeout(Duration.ofSeconds(45));
}

Transactional Receiving with @PulsarListener

When listener transactions are enabled, the @PulsarListener annotated listener method is invoked in the scope of a synchronized transaction.

The DefaultPulsarMessageListenerContainer uses a Spring TransactionTemplate configured with a PulsarTransactionManager to initiate the transaction prior to method invocation.

The acknowledgment of each received message is enlisted in the scoped transaction.

Consume-Process-Produce Scenario

A common transactional pattern is where a consumer reads messages from a Pulsar topic, transforms the messages, and finally a producer writes the resulting messages to another Pulsar topic. The framework supports this use case when transactions are enabled and your listener method uses a transactional PulsarTemplate to produce the transformed message.

Given the following listener method:

@PulsarListener(topics = "my-input-topic") // (1)
void listen(String msg) { // (2)
    var transformedMsg = msg.toUpperCase(Locale.ROOT); // (3)
    this.transactionalTemplate.send("my-output-topic", transformedMsg); // (4)
} // (5) (6)

The following interactions occur when listener transactions are enabled:

1 Listener container initiates new transaction and invokes listener method in scope of transaction
2 Listener method receives message
3 Listener method transforms message
4 Listener method sends transformed message with transactional template which enlists send operation in active transaction
5 Listener container auto-acks message and enlists ack operation in active transaction
6 Listener container (via TransactionTemplate) commits transaction

If you are not using @PulsarListener and are instead using listener containers directly, the same transaction support is provided as described above. Remember, the @PulsarListener is just a convenience to register a Java method as the listener container message listener.

Transactions with Record Listeners

The above example uses a record listener. When using a record listener, a new transaction is created on every listener method invocation, which equates to a transaction per message.

Because the transaction boundary is per message and each message acknowledgement is enlisted in each transaction, batch ack mode cannot be used with transactional record listeners.

Transactions with Batch Listeners

When using a batch listener, a new transaction is created on every listener method invocation, which equates to a transaction per batch of messages.

Transactional batch listeners do not currently support custom error handlers.

Configuration

Listener container factory

The following transaction settings are available directly on the PulsarContainerProperties used by the ConcurrentPulsarListenerContainerFactory when creating listener containers. These settings affect all listener containers, including the ones used by @PulsarListener.

  • enabled - whether the container supports transactions (default false)

  • required - whether the container requires transactions (default false)

  • timeout - duration of the transaction timeout (default null)

  • transactionDefinition - a blueprint transaction definition with properties that will be copied to the container’s transaction template (default null)

  • transactionManager - the transaction manager used to start transactions

When not using Spring Boot, you can adjust these settings on the container factory that you provide. However, when using Spring Boot, the container factory is auto-configured. In this case, you can register an org.springframework.boot.autoconfigure.pulsar.PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> bean to access and customize the container properties. The following example shows how to set the timeout on the container factory:

@Bean
PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> containerCustomizer() {
    return (containerFactory) -> containerFactory.getContainerProperties().transactions().setTimeout(Duration.ofSeconds(45));
}

@PulsarListener

By default, each listener respects the transactional settings of its corresponding listener container factory. However, you can set the transactional attribute on each @PulsarListener to override the container factory setting as follows:

  • If the container factory has transactions enabled, then transactional = false disables transactions for the individual listener.

  • If the container factory has transactions enabled and required, then an attempt to set transactional = false results in an exception stating that transactions are required.

  • If the container factory has transactions disabled, then an attempt to set transactional = true is ignored and a warning is logged.
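
The three rules above can be summarized as a small decision function. The following is an illustrative model only, not framework code: the class name, method name, exception type, and messages are hypothetical, and a null listener attribute stands for "not set on the listener".

```java
/** Illustrative model of the override rules; not part of Spring for Apache Pulsar. */
final class ListenerTransactionRules {

    /**
     * @param factoryEnabled whether the container factory has transactions enabled
     * @param factoryRequired whether the container factory requires transactions
     * @param listenerTransactional the listener's transactional attribute, or null if unset
     * @return whether the listener ends up running in a transaction
     */
    static boolean effectiveTransactional(boolean factoryEnabled, boolean factoryRequired,
            Boolean listenerTransactional) {
        if (listenerTransactional == null) {
            return factoryEnabled; // listener respects the factory setting
        }
        if (!listenerTransactional) {
            if (factoryEnabled && factoryRequired) {
                // hypothetical exception type and message
                throw new IllegalStateException("Transactions are required");
            }
            return false; // listener opts out of transactions
        }
        if (!factoryEnabled) {
            // the request is ignored and a warning is logged
            System.err.println("WARN: transactional = true ignored, factory has transactions disabled");
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(effectiveTransactional(true, false, false)); // prints false
        System.out.println(effectiveTransactional(false, false, true)); // prints false (after a warning)
        System.out.println(effectiveTransactional(true, true, null));   // prints true
    }
}
```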

Using PulsarTransactionManager

The PulsarTransactionManager is an implementation of Spring Framework’s PlatformTransactionManager. You can use the PulsarTransactionManager with normal Spring transaction support (@Transactional, TransactionTemplate, and others).

If a transaction is active, any PulsarTemplate operations performed within the scope of the transaction enlist and participate in the ongoing transaction. The manager commits or rolls back the transaction, depending on success or failure.

You probably will not need to use PulsarTransactionManager directly since the majority of transactional use cases are covered by PulsarTemplate and @PulsarListener.

Pulsar Transactions with Other Transaction Managers

Producer-only transaction

If you want to send records to Pulsar and perform some database updates in a single transaction, you can use normal Spring transaction management with a DataSourceTransactionManager.

The following examples assume there is a DataSourceTransactionManager bean registered under the name "dataSourceTransactionManager".

@Transactional("dataSourceTransactionManager")
public void myServiceMethod() {
    var msg = calculateMessage();
    this.pulsarTemplate.send("my-topic", msg);
    // Bind the value as a parameter rather than formatting it into the SQL string
    this.jdbcTemplate.update("insert into my_table (data) values (?)", msg);
}

The interceptor for the @Transactional annotation starts the database transaction, and the PulsarTemplate synchronizes a Pulsar transaction with the DB transaction manager; each send participates in that transaction. When the method exits, the database transaction commits, followed by the Pulsar transaction.

If you wish to commit the Pulsar transaction first, and only commit the DB transaction if the Pulsar transaction is successful, use nested @Transactional methods, with the outer method configured to use the DataSourceTransactionManager, and the inner method configured to use the PulsarTransactionManager.

@Transactional("dataSourceTransactionManager")
public void myServiceMethod() {
    var msg = calculateMessage();
    // Bind the value as a parameter rather than formatting it into the SQL string
    this.jdbcTemplate.update("insert into my_table (data) values (?)", msg);
    this.sendToPulsar(msg);
}

@Transactional("pulsarTransactionManager")
public void sendToPulsar(String msg) {
    this.pulsarTemplate.send("my-topic", msg);
}

Consumer + Producer transaction

If you want to consume records from Pulsar, send records to Pulsar, and perform some database updates in a transaction, you can combine normal Spring transaction management (using a DataSourceTransactionManager) with container-initiated transactions.

In the following example, the listener container starts the Pulsar transaction and the @Transactional annotation starts the DB transaction. The DB transaction is committed first; if the Pulsar transaction fails to commit, the record will be redelivered, so the DB update should be idempotent.

@PulsarListener(topics = "my-input-topic")
@Transactional("dataSourceTransactionManager")
void listen(String msg) {
    var transformedMsg = msg.toUpperCase(Locale.ROOT);
    this.pulsarTemplate.send("my-output-topic", transformedMsg);
    // Bind the value as a parameter rather than formatting it into the SQL string
    this.jdbcTemplate.update("insert into my_table (data) values (?)", transformedMsg);
}
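
Because the DB transaction commits before the Pulsar transaction, a redelivered record can reach the DB update a second time. One way to make the update idempotent is to key each row by a unique message identifier and skip rows that already exist. The following is a minimal in-memory sketch of that idea, assuming such an identifier is available to the listener. The class, the method names, and the map standing in for a table with a unique key are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory stand-in for a table with a unique key on the message id; illustration only. */
final class IdempotentUpdateSketch {

    private final Map<String, String> table = new HashMap<>();

    /**
     * Writes the row only if no row exists for the given message id.
     *
     * @return true if a row was written, false if the message was already processed
     */
    boolean recordOnce(String messageId, String data) {
        return table.putIfAbsent(messageId, data) == null;
    }

    int rowCount() {
        return table.size();
    }

    public static void main(String[] args) {
        var sketch = new IdempotentUpdateSketch();
        System.out.println(sketch.recordOnce("id-1", "HELLO")); // prints true
        System.out.println(sketch.recordOnce("id-1", "HELLO")); // prints false (redelivery)
        System.out.println(sketch.rowCount());                  // prints 1
    }
}
```

In a real database, the same effect is typically achieved with a unique constraint on the identifier column, so that a repeated insert is rejected or ignored.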