This section describes how Spring for Apache Kafka supports transactions.
Overview
The 0.11.0.0 client library added support for transactions. Spring for Apache Kafka adds support in the following ways:
-
KafkaTransactionManager
: Used with normal Spring transaction support (@Transactional
,TransactionTemplate
, etc) -
Transactional
KafkaMessageListenerContainer
-
Local transactions with
KafkaTemplate
-
Transaction synchronization with other transaction managers
Transactions are enabled by providing the DefaultKafkaProducerFactory
with a transactionIdPrefix
.
In that case, instead of managing a single shared Producer
, the factory maintains a cache of transactional producers.
When the user calls close()
on a producer, it is returned to the cache for reuse instead of actually being closed.
The transactional.id
property of each producer is transactionIdPrefix
+ n
, where n
starts with 0
and is incremented for each new producer.
In previous versions of Spring for Apache Kafka, the transactional.id
was generated differently for transactions started by a listener container with a record-based listener, to support fencing zombies, which is not necessary any more, with EOSMode.V2
being the only option starting with 3.0.
For applications running with multiple instances, the transactionIdPrefix
must be unique per instance.
Also see Exactly Once Semantics.
Also see transactionIdPrefix
.
With Spring Boot, it is only necessary to set the spring.kafka.producer.transaction-id-prefix
property - Spring Boot will automatically configure a KafkaTransactionManager
bean and wire it into the listener container.
Starting with version 2.5.8, you can now configure the maxAge property on the producer factory.
This is useful when using transactional producers that might lay idle for the broker’s transactional.id.expiration.ms .
With current kafka-clients , this can cause a ProducerFencedException without a rebalance.
By setting the maxAge to less than transactional.id.expiration.ms , the factory will refresh the producer if it is past its max age.
|
Starting with version 2.5.8, you can now configure the maxAge property on the producer factory.
This is useful when using transactional producers that might lay idle for the broker’s transactional.id.expiration.ms .
With current kafka-clients , this can cause a ProducerFencedException without a rebalance.
By setting the maxAge to less than transactional.id.expiration.ms , the factory will refresh the producer if it is past its max age.
|
Using KafkaTransactionManager
The KafkaTransactionManager
is an implementation of Spring Framework’s PlatformTransactionManager
.
It is provided with a reference to the producer factory in its constructor.
If you provide a custom producer factory, it must support transactions.
See ProducerFactory.transactionCapable()
.
You can use the KafkaTransactionManager
with normal Spring transaction support (@Transactional
, TransactionTemplate
, and others).
If a transaction is active, any KafkaTemplate
operations performed within the scope of the transaction use the transaction’s Producer
.
The manager commits or rolls back the transaction, depending on success or failure.
You must configure the KafkaTemplate
to use the same ProducerFactory
as the transaction manager.
Transaction Synchronization
This section refers to producer-only transactions (transactions not started by a listener container); see Using Consumer-Initiated Transactions for information about chaining transactions when the container starts the transaction.
If you want to send records to kafka and perform some database updates, you can use normal Spring transaction management with, say, a DataSourceTransactionManager
.
@Transactional
public void process(List<Thing> things) {
things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
updateDb(things);
}
The interceptor for the @Transactional
annotation starts the transaction and the KafkaTemplate
will synchronize a transaction with that transaction manager; each send will participate in that transaction.
When the method exits, the database transaction will commit followed by the Kafka transaction.
If you wish the commits to be performed in the reverse order (Kafka first), use nested @Transactional
methods, with the outer method configured to use the DataSourceTransactionManager
, and the inner method configured to use the KafkaTransactionManager
.
See Examples of Kafka Transactions with Other Transaction Managers for examples of an application that synchronizes JDBC and Kafka transactions in Kafka-first or DB-first configurations.
Starting with versions 2.5.17, 2.6.12, 2.7.9 and 2.8.0, if the commit fails on the synchronized transaction (after the primary transaction has committed), the exception will be thrown to the caller. Previously, this was silently ignored (logged at debug level). Applications should take remedial action, if necessary, to compensate for the committed primary transaction. |
Starting with versions 2.5.17, 2.6.12, 2.7.9 and 2.8.0, if the commit fails on the synchronized transaction (after the primary transaction has committed), the exception will be thrown to the caller. Previously, this was silently ignored (logged at debug level). Applications should take remedial action, if necessary, to compensate for the committed primary transaction. |
Using Consumer-Initiated Transactions
The ChainedKafkaTransactionManager
is now deprecated, since version 2.7; see the JavaDocs for its super class ChainedTransactionManager
for more information.
Instead, use a KafkaTransactionManager
in the container to start the Kafka transaction and annotate the listener method with @Transactional
to start the other transaction.
See Examples of Kafka Transactions with Other Transaction Managers for an example application that chains JDBC and Kafka transactions.
Non-Blocking Retries cannot combine with Container Transactions. When the listener code throws an exception, container transaction commit succeeds, and the record is sent to the retryable topic. |
Non-Blocking Retries cannot combine with Container Transactions. When the listener code throws an exception, container transaction commit succeeds, and the record is sent to the retryable topic. |
KafkaTemplate
Local Transactions
You can use the KafkaTemplate
to execute a series of operations within a local transaction.
The following example shows how to do so:
boolean result = template.executeInTransaction(t -> {
t.sendDefault("thing1", "thing2");
t.sendDefault("cat", "hat");
return true;
});
The argument in the callback is the template itself (this
).
If the callback exits normally, the transaction is committed.
If an exception is thrown, the transaction is rolled back.
If there is a KafkaTransactionManager (or synchronized) transaction in process, it is not used.
Instead, a new "nested" transaction is used.
|
If there is a KafkaTransactionManager (or synchronized) transaction in process, it is not used.
Instead, a new "nested" transaction is used.
|
TransactionIdPrefix
With EOSMode.V2
(aka BETA
), the only supported mode, it is no longer necessary to use the same transactional.id
, even for consumer-initiated transactions; in fact, it must be unique on each instance the same as for producer-initiated transactions.
This property must have a different value on each application instance.
TransactionIdSuffix Fixed
Since 3.2, a new TransactionIdSuffixStrategy
interface was introduced to manage transactional.id
suffix.
The default implementation is DefaultTransactionIdSuffixStrategy
when setting maxCache
greater than zero can reuse transactional.id
within a specific range, otherwise suffixes will be generated on the fly by incrementing a counter.
When a transaction producer is requested and transactional.id
all in use, throw a NoProducerAvailableException
.
User can then use a RetryTemplate
configured to retry that exception, with a suitably configured back off.
public static class Config {
@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
TransactionIdSuffixStrategy ss = new DefaultTransactionIdSuffixStrategy(5);
pf.setTransactionIdSuffixStrategy(ss);
return pf;
}
}
When setting maxCache
to 5, transactional.id
is my.txid.
+`{0-4}`.
When using KafkaTransactionManager with the ConcurrentMessageListenerContainer and enabling maxCache , it is necessary to set maxCache to a value greater than or equal to concurrency .
If a MessageListenerContainer is unable to acquire a transactional.id suffix, it will throw a NoProducerAvailableException .
When using nested transactions in the ConcurrentMessageListenerContainer , it is necessary to adjust the maxCache setting to handle the increased number of nested transactions.
|
When using KafkaTransactionManager with the ConcurrentMessageListenerContainer and enabling maxCache , it is necessary to set maxCache to a value greater than or equal to concurrency .
If a MessageListenerContainer is unable to acquire a transactional.id suffix, it will throw a NoProducerAvailableException .
When using nested transactions in the ConcurrentMessageListenerContainer , it is necessary to adjust the maxCache setting to handle the increased number of nested transactions.
|
KafkaTemplate
Transactional and non-Transactional Publishing
Normally, when a KafkaTemplate
is transactional (configured with a transaction-capable producer factory), transactions are required.
The transaction can be started by a TransactionTemplate
, a @Transactional
method, calling executeInTransaction
, or by a listener container, when configured with a KafkaTransactionManager
.
Any attempt to use the template outside the scope of a transaction results in the template throwing an IllegalStateException
.
Starting with version 2.4.3, you can set the template’s allowNonTransactional
property to true
.
In that case, the template will allow the operation to run without a transaction, by calling the ProducerFactory
's createNonTransactionalProducer()
method; the producer will be cached, or thread-bound, as normal for reuse.
See Using DefaultKafkaProducerFactory
.
Transactions with Batch Listeners
When a listener fails while transactions are being used, the AfterRollbackProcessor
is invoked to take some action after the rollback occurs.
When using the default AfterRollbackProcessor
with a record listener, seeks are performed so that the failed record will be redelivered.
With a batch listener, however, the whole batch will be redelivered because the framework doesn’t know which record in the batch failed.
See After-rollback Processor for more information.
When using a batch listener, version 2.4.2 introduced an alternative mechanism to deal with failures while processing a batch: BatchToRecordAdapter
.
When a container factory with batchListener
set to true is configured with a BatchToRecordAdapter
, the listener is invoked with one record at a time.
This enables error handling within the batch, while still making it possible to stop processing the entire batch, depending on the exception type.
A default BatchToRecordAdapter
is provided, that can be configured with a standard ConsumerRecordRecoverer
such as the DeadLetterPublishingRecoverer
.
The following test case configuration snippet illustrates how to use this feature:
public static class TestListener {
final List<String> values = new ArrayList<>();
@KafkaListener(id = "batchRecordAdapter", topics = "test")
public void listen(String data) {
values.add(data);
if ("bar".equals(data)) {
throw new RuntimeException("reject partial");
}
}
}
@Configuration
@EnableKafka
public static class Config {
ConsumerRecord<?, ?> failed;
@Bean
public TestListener test() {
return new TestListener();
}
@Bean
public ConsumerFactory<?, ?> consumerFactory() {
return mock(ConsumerFactory.class);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) -> {
this.failed = record;
}));
return factory;
}
}