此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.0spring-doc.cn

交易

本节介绍 Spring for Apache Kafka 如何支持事务。spring-doc.cn

概述

0.11.0.0 客户端库添加了对事务的支持。 Spring for Apache Kafka 通过以下方式添加支持:spring-doc.cn

  • KafkaTransactionManager:与普通的 Spring 事务支持(、 等)一起使用@TransactionalTransactionTemplatespring-doc.cn

  • 事务KafkaMessageListenerContainerspring-doc.cn

  • 本地事务KafkaTemplatespring-doc.cn

  • 与其他事务管理器的事务同步spring-doc.cn

通过向 提供 来启用事务。 在这种情况下,工厂不是管理单个 shared ,而是维护事务 producer 的缓存。 当用户调用生产者时,它会返回到缓存以供重用,而不是实际关闭。 每个生产者的属性为 + ,其中 starts with 和 递增。 在早期版本的 Spring for Apache Kafka 中,对于由具有基于记录的侦听器的侦听器容器启动的事务,它的生成方式不同,以支持围栏僵尸,这不再是必需的,从 3.0 开始是唯一的选项。 对于使用多个实例运行的应用程序,每个实例 的 必须唯一。DefaultKafkaProducerFactorytransactionIdPrefixProducerclose()transactional.idtransactionIdPrefixnn0transactional.idEOSMode.V2transactionIdPrefixspring-doc.cn

使用 Spring Boot,只需要设置属性 - Spring Boot 将自动配置一个 bean 并将其连接到侦听器容器中。spring.kafka.producer.transaction-id-prefixKafkaTransactionManagerspring-doc.cn

从版本 2.5.8 开始,您现在可以在 producer 工厂上配置该属性。 当使用可能为 broker 的 . 使用 current 时,这可能会导致 a 没有 rebalance。 通过将 小于 设置为 小于 ,如果生产者已超过其最大年龄,工厂将刷新生产者。maxAgetransactional.id.expiration.mskafka-clientsProducerFencedExceptionmaxAgetransactional.id.expiration.ms

KafkaTransactionManager

它是 Spring Framework 的 . 在其构造函数中提供了对 producer 工厂的引用。 如果您提供自定义生产者工厂,则它必须支持事务。 看。KafkaTransactionManagerPlatformTransactionManagerProducerFactory.transactionCapable()spring-doc.cn

你可以将 与普通的 Spring 事务支持(、 、 和其他)一起使用。 如果事务处于活动状态,则在事务范围内执行的任何操作都使用事务的 . 管理器提交或回滚事务,具体取决于成功或失败。 您必须将 配置为使用与事务管理器相同的 。KafkaTransactionManager@TransactionalTransactionTemplateKafkaTemplateProducerKafkaTemplateProducerFactoryspring-doc.cn

事务同步

本节引用仅限生产者事务(不是由侦听器容器启动的事务);有关在容器启动事务时链接事务的信息,请参阅Using Consumer-Initiated Transactionspring-doc.cn

如果要将记录发送到 kafka 并执行一些数据库更新,则可以使用普通的 Spring 事务管理,例如。DataSourceTransactionManagerspring-doc.cn

@Transactional
public void process(List<Thing> things) {
    things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
    updateDb(things);
}

Comments 的拦截器启动事务,并将事务与该事务管理器同步;每次发送都将参与该交易。 当方法退出时,数据库事务将提交,然后是 Kafka 事务。 如果您希望以相反的顺序执行提交(首先是 Kafka),请使用嵌套方法,其中 outer 方法配置为使用 ,而 internal 方法配置为使用 .@TransactionalKafkaTemplate@TransactionalDataSourceTransactionManagerKafkaTransactionManagerspring-doc.cn

有关在 Kafka 优先或 DB 优先配置中同步 JDBC 和 Kafka 事务的应用程序示例,请参阅与其他事务管理器的 Kafka 事务示例。spring-doc.cn

从版本 2.5.17、2.6.12、2.7.9 和 2.8.0 开始,如果同步事务上的提交失败(在主事务提交之后),将向调用方抛出异常。 以前,这会被静默忽略(记录在 debug 级别)。 如有必要,应用程序应采取补救措施来补偿已提交的主事务。

使用使用者发起的事务

从 2.7 版开始,现在已经弃用了;有关更多信息,请参见 JavaDocs 的 super class。 相反,在容器中使用 a 来启动 Kafka 事务,并使用 Comments 侦听器方法来启动另一个事务。ChainedKafkaTransactionManagerChainedTransactionManagerKafkaTransactionManager@Transactionalspring-doc.cn

有关链接 JDBC 和 Kafka 事务的示例应用程序,请参阅使用其他事务管理器的 Kafka 事务示例spring-doc.cn

非阻塞重试不能与容器事务结合使用。 当侦听器代码引发异常时,容器事务提交成功,并将记录发送到可重试主题。

KafkaTemplate本地事务

您可以使用 在本地事务中执行一系列操作。 以下示例显示了如何执行此操作:KafkaTemplatespring-doc.cn

boolean result = template.executeInTransaction(t -> {
    t.sendDefault("thing1", "thing2");
    t.sendDefault("cat", "hat");
    return true;
});

回调中的参数是模板本身 ()。 如果回调正常退出,则提交事务。 如果引发异常,则回滚事务。thisspring-doc.cn

如果存在正在处理的(或已同步的)事务,则不会使用它。 相反,使用新的 “嵌套” 事务。KafkaTransactionManager

TransactionIdPrefix

使用 (aka ) 是唯一支持的模式,即使对于消费者发起的事务,也不再需要使用相同的模式;事实上,它在每个实例上都必须是唯一的,这与生产者发起的事务相同。 此属性在每个应用程序实例上必须具有不同的值。EOSMode.V2BETAtransactional.idspring-doc.cn

TransactionIdSuffix Fixed

从 3.2 开始,引入了一个新接口来管理 suffix。 默认实现是当设置大于零可以在特定范围内重复使用时,否则将通过递增计数器来动态生成后缀。 当请求事务生产者并且所有事务都在使用中时,抛出一个 . 然后,用户可以使用 configured 重试该异常,并适当配置回退。TransactionIdSuffixStrategytransactional.idDefaultTransactionIdSuffixStrategymaxCachetransactional.idtransactional.idNoProducerAvailableExceptionRetryTemplatespring-doc.cn

public static class Config {

    @Bean
    public ProducerFactory<String, String> myProducerFactory() {
        Map<String, Object> configs = producerConfigs();
        configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
        ...
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
        ...
        TransactionIdSuffixStrategy ss = new DefaultTransactionIdSuffixStrategy(5);
        pf.setTransactionIdSuffixStrategy(ss);
        return pf;
    }

}

设置为 5 时,为 +'{0-4}'。maxCachetransactional.idmy.txid.spring-doc.cn

与 和 一起使用时 和 enabling ,必须设置为大于或等于 的值。 如果 a 无法获取后缀,它将抛出一个 . 在 中使用嵌套事务时,必须调整 maxCache 设置以处理增加的嵌套事务数。KafkaTransactionManagerConcurrentMessageListenerContainermaxCachemaxCacheconcurrencyMessageListenerContainertransactional.idNoProducerAvailableExceptionConcurrentMessageListenerContainer

KafkaTemplate事务性和非事务性发布

通常,当 a 是事务性的(配置了支持事务的 producer 工厂)时,需要事务。 事务可以由 、 方法、调用 或侦听器容器(如果配置了 )启动 。 任何在事务范围之外使用模板的尝试都会导致模板抛出 . 从版本 2.4.3 开始,您可以将模板的属性设置为 。 在这种情况下,模板将通过调用 's 方法允许操作在没有事务的情况下运行;Producer 将像往常一样被缓存或线程绑定,以便重用。 请参见使用 DefaultKafkaProducerFactoryKafkaTemplateTransactionTemplate@TransactionalexecuteInTransactionKafkaTransactionManagerIllegalStateExceptionallowNonTransactionaltrueProducerFactorycreateNonTransactionalProducer()spring-doc.cn

使用 Batch 侦听器的事务

当侦听器在使用事务时失败时,将调用 在回滚发生后执行一些操作。 将 default 与记录侦听器一起使用时,将执行查找,以便重新交付失败的记录。 但是,使用批处理侦听器时,将重新交付整个批处理,因为框架不知道批处理中的哪条记录失败。 有关更多信息,请参阅 After-rollback ProcessorAfterRollbackProcessorAfterRollbackProcessorspring-doc.cn

使用批处理侦听器时,版本 2.4.2 引入了一种替代机制来处理处理批处理时出现的故障:。 当设置为 true 的容器工厂配置了 时,一次使用一条记录调用侦听器。 这将启用批处理中的错误处理,同时仍可以停止处理整个批处理,具体取决于异常类型。 提供了一个默认值,可以使用标准(如 . 以下测试用例配置代码段说明了如何使用此功能:BatchToRecordAdapterbatchListenerBatchToRecordAdapterBatchToRecordAdapterConsumerRecordRecovererDeadLetterPublishingRecovererspring-doc.cn

public static class TestListener {

    final List<String> values = new ArrayList<>();

    @KafkaListener(id = "batchRecordAdapter", topics = "test")
    public void listen(String data) {
        values.add(data);
        if ("bar".equals(data)) {
            throw new RuntimeException("reject partial");
        }
    }

}

@Configuration
@EnableKafka
public static class Config {

    ConsumerRecord<?, ?> failed;

    @Bean
    public TestListener test() {
        return new TestListener();
    }

    @Bean
    public ConsumerFactory<?, ?> consumerFactory() {
        return mock(ConsumerFactory.class);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) ->  {
            this.failed = record;
        }));
        return factory;
    }

}