此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1Spring中文文档

此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.1Spring中文文档

本节介绍 Spring for Apache Kafka 如何支持事务。Spring中文文档

概述

0.11.0.0 客户端库增加了对事务的支持。 Spring for Apache Kafka 通过以下方式添加支持:Spring中文文档

通过提供 . 在这种情况下,工厂不是管理单个共享,而是维护事务生产者的缓存。 当用户调用生产者时,它会返回到缓存中以供重用,而不是实际关闭。 每个生产者的属性是 + ,其中从开始,并针对每个新生产者递增。 在 Spring for Apache Kafka 的早期版本中,对于由具有基于记录的侦听器的侦听器的侦听器容器启动的事务,生成的事务的生成方式不同,以支持围栏僵尸,这不再是必需的,这是从 3.0 开始的唯一选项。 对于使用多个实例运行的应用程序,每个实例必须是唯一的。DefaultKafkaProducerFactorytransactionIdPrefixProducerclose()transactional.idtransactionIdPrefixnn0transactional.idEOSMode.V2transactionIdPrefixSpring中文文档

使用 Spring Boot,只需要设置属性 - Spring Boot 将自动配置一个 Bean 并将其连接到侦听器容器中。spring.kafka.producer.transaction-id-prefixKafkaTransactionManagerSpring中文文档

从版本 2.5.8 开始,您现在可以在生产者工厂上配置该属性。 当使用可能闲置的事务生产者时,这很有用。 对于电流,这可能会导致没有重新平衡。 通过将 设置为小于 ,如果生产者已超过其最大年龄,工厂将刷新生产者。maxAgetransactional.id.expiration.mskafka-clientsProducerFencedExceptionmaxAgetransactional.id.expiration.ms
从版本 2.5.8 开始,您现在可以在生产者工厂上配置该属性。 当使用可能闲置的事务生产者时,这很有用。 对于电流,这可能会导致没有重新平衡。 通过将 设置为小于 ,如果生产者已超过其最大年龄,工厂将刷新生产者。maxAgetransactional.id.expiration.mskafka-clientsProducerFencedExceptionmaxAgetransactional.id.expiration.ms

KafkaTransactionManager

是 Spring Framework 的 . 它在其构造函数中提供了对生产者工厂的引用。 如果提供自定义生产者工厂,则它必须支持事务。 看。KafkaTransactionManagerPlatformTransactionManagerProducerFactory.transactionCapable()Spring中文文档

您可以将 与普通的 Spring 事务支持(、 等)一起使用。 如果事务处于活动状态,则在事务范围内执行的任何操作都使用事务的 . 经理提交或回滚事务,具体取决于成功或失败。 您必须配置为使用与事务管理器相同的内容。KafkaTransactionManager@TransactionalTransactionTemplateKafkaTemplateProducerKafkaTemplateProducerFactorySpring中文文档

事务同步

本节涉及仅限生产者的事务(未由侦听器容器启动的事务);有关在容器启动事务时链接事务的信息,请参阅使用使用者发起的事务Spring中文文档

如果要将记录发送到 kafka 并执行一些数据库更新,则可以使用常规的 Spring 事务管理,例如 .DataSourceTransactionManagerSpring中文文档

@Transactional
public void process(List<Thing> things) {
    things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
    updateDb(things);
}

注解的拦截器启动事务,并将事务与该事务管理器同步;每次发送都将参与该交易。 当方法退出时,数据库事务将提交,后跟 Kafka 事务。 如果希望以相反的顺序(首先是 Kafka)执行提交,请使用嵌套方法,将外部方法配置为使用 ,将内部方法配置为使用 。@TransactionalKafkaTemplate@TransactionalDataSourceTransactionManagerKafkaTransactionManagerSpring中文文档

有关在 Kafka 优先或 DB 优先配置中同步 JDBC 和 Kafka 事务的应用程序的示例,请参阅 Kafka 事务与其他事务管理器的示例Spring中文文档

从版本 2.5.17、2.6.12、2.7.9 和 2.8.0 开始,如果同步事务上的提交失败(在主事务提交后),则异常将抛给调用方。 以前,这被静默地忽略(记录在调试级别)。 如有必要,应用程序应采取补救措施,以补偿已提交的主要事务。
从版本 2.5.17、2.6.12、2.7.9 和 2.8.0 开始,如果同步事务上的提交失败(在主事务提交后),则异常将抛给调用方。 以前,这被静默地忽略(记录在调试级别)。 如有必要,应用程序应采取补救措施,以补偿已提交的主要事务。

使用消费者发起的事务

自 2.7 版起,现已弃用;有关更多信息,请参见 JavaDocs 的超级类。 相反,请在容器中使用 a 来启动 Kafka 事务,并使用 listener 方法进行批注以启动其他事务。ChainedKafkaTransactionManagerChainedTransactionManagerKafkaTransactionManager@TransactionalSpring中文文档

有关链接 JDBC 和 Kafka 事务的示例应用程序,请参阅 Kafka 与其他事务管理器的事务示例Spring中文文档

非阻塞重试不能与容器事务结合使用。 当侦听器代码引发异常时,容器事务提交成功,并将记录发送到可重试主题。
非阻塞重试不能与容器事务结合使用。 当侦听器代码引发异常时,容器事务提交成功,并将记录发送到可重试主题。

KafkaTemplate本地交易

您可以使用 在本地事务中执行一系列操作。 以下示例演示如何执行此操作:KafkaTemplateSpring中文文档

boolean result = template.executeInTransaction(t -> {
    t.sendDefault("thing1", "thing2");
    t.sendDefault("cat", "hat");
    return true;
});

回调中的参数是模板本身 ()。 如果回调正常退出,则交易已提交。 如果引发异常,则事务将回滚。thisSpring中文文档

如果正在处理(或已同步的)事务,则不会使用该事务。 相反,使用新的“嵌套”事务。KafkaTransactionManager
如果正在处理(或已同步的)事务,则不会使用该事务。 相反,使用新的“嵌套”事务。KafkaTransactionManager

TransactionIdPrefix

使用 (aka ),这是唯一支持的模式,即使对于消费者发起的交易,也不再需要使用相同的模式;事实上,它必须在每个实例上都是唯一的,就像生产者发起的事务一样。 此属性在每个应用程序实例上必须具有不同的值。EOSMode.V2BETAtransactional.idSpring中文文档

TransactionIdSuffix Fixed

从 3.2 开始,引入了一个新界面来管理后缀。 默认实现是当设置大于零时可以在特定范围内重复使用,否则将通过递增计数器动态生成后缀。 当请求事务生产者并且全部使用时,抛出 . 然后,用户可以使用配置的 a 重试该异常,并适当配置回退。TransactionIdSuffixStrategytransactional.idDefaultTransactionIdSuffixStrategymaxCachetransactional.idtransactional.idNoProducerAvailableExceptionRetryTemplateSpring中文文档

public static class Config {

    @Bean
    public ProducerFactory<String, String> myProducerFactory() {
        Map<String, Object> configs = producerConfigs();
        configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
        ...
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
        ...
        TransactionIdSuffixStrategy ss = new DefaultTransactionIdSuffixStrategy(5);
        pf.setTransactionIdSuffixStrategy(ss);
        return pf;
    }

}

设置为 5 时,为 +'{0-4}'。maxCachetransactional.idmy.txid.Spring中文文档

与 和 使能一起使用时,必须设置为大于或等于 的值。 如果 a 无法获取后缀,它将抛出 . 在 中使用嵌套事务时,需要调整 maxCache 设置以处理增加的嵌套事务数。KafkaTransactionManagerConcurrentMessageListenerContainermaxCachemaxCacheconcurrencyMessageListenerContainertransactional.idNoProducerAvailableExceptionConcurrentMessageListenerContainer
与 和 使能一起使用时,必须设置为大于或等于 的值。 如果 a 无法获取后缀,它将抛出 . 在 中使用嵌套事务时,需要调整 maxCache 设置以处理增加的嵌套事务数。KafkaTransactionManagerConcurrentMessageListenerContainermaxCachemaxCacheconcurrencyMessageListenerContainertransactional.idNoProducerAvailableExceptionConcurrentMessageListenerContainer

KafkaTemplate事务性和非事务性发布

通常,当 a 是事务性的(配置了具有事务功能的生产者工厂)时,事务是必需的。 当配置了 . 任何在事务范围之外使用模板的尝试都会导致模板抛出 . 从版本 2.4.3 开始,可以将模板的属性设置为 。 在这种情况下,模板将通过调用 的方法允许操作在没有事务的情况下运行;生产者将像往常一样被缓存或线程绑定,以便重用。 请参阅使用 DefaultKafkaProducerFactoryKafkaTemplateTransactionTemplate@TransactionalexecuteInTransactionKafkaTransactionManagerIllegalStateExceptionallowNonTransactionaltrueProducerFactorycreateNonTransactionalProducer()Spring中文文档

与批处理侦听器的事务

当侦听器在使用事务时失败时,将调用 在回滚发生后执行某些操作。 将默认值与记录侦听器一起使用时,将执行查找,以便重新传递失败的记录。 但是,使用批处理侦听器时,整个批处理将被重新传递,因为框架不知道批处理中的哪条记录失败。 有关详细信息,请参阅回滚后处理器AfterRollbackProcessorAfterRollbackProcessorSpring中文文档

使用批处理侦听器时,版本 2.4.2 引入了一种替代机制来处理批处理时的故障:. 当设置为 true 的容器工厂配置为 时,一次使用一条记录调用侦听器。 这样就可以在批处理中处理错误,同时仍然可以停止处理整个批处理,具体取决于异常类型。 提供了默认值,可以使用标准(如 . 以下测试用例配置代码片段说明了如何使用此功能:BatchToRecordAdapterbatchListenerBatchToRecordAdapterBatchToRecordAdapterConsumerRecordRecovererDeadLetterPublishingRecovererSpring中文文档

public static class TestListener {

    final List<String> values = new ArrayList<>();

    @KafkaListener(id = "batchRecordAdapter", topics = "test")
    public void listen(String data) {
        values.add(data);
        if ("bar".equals(data)) {
            throw new RuntimeException("reject partial");
        }
    }

}

@Configuration
@EnableKafka
public static class Config {

    ConsumerRecord<?, ?> failed;

    @Bean
    public TestListener test() {
        return new TestListener();
    }

    @Bean
    public ConsumerFactory<?, ?> consumerFactory() {
        return mock(ConsumerFactory.class);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) ->  {
            this.failed = record;
        }));
        return factory;
    }

}