交易
本节介绍 Spring for Apache Pulsar 如何支持事务。
概述
Spring for Apache Pulsar 事务支持建立在 Spring Framework 提供的事务支持之上。 在高级别上,事务资源注册到事务管理器,事务管理器反过来处理已注册资源的事务状态(提交、回滚等)。
Spring for Apache Pulsar 提供以下内容:
-
PulsarTransactionManager
- 与普通的 Spring 事务支持(、 等)一起使用@Transactional
TransactionTemplate
-
事务
PulsarTemplate
-
事务
@PulsarListener
-
与其他事务管理器的事务同步
事务支持尚未添加到 Reactive 组件中 |
默认情况下,事务支持处于禁用状态。
要在使用 Spring Boot 时启用支持,只需设置该属性。
下面的每个组件部分概述了更多配置选项。spring.pulsar.transaction.enabled
事务性发布PulsarTemplate
事务性上的所有发送操作都会查找活动事务,并在事务中登记每个发送操作(如果找到)。PulsarTemplate
非事务性使用
默认情况下,事务也可用于非事务操作。
当找不到现有事务时,它将以非事务性方式继续发送操作。
但是,如果模板配置为需要事务,则任何在事务范围之外使用模板的尝试都会导致异常。PulsarTemplate
事务可以由 、 方法、 调用 或事务侦听器容器启动。TransactionTemplate @Transactional executeInTransaction |
本地事务
我们使用术语 “local” 事务来表示不受 Spring 的事务管理工具管理或与之关联的 Pulsar 原生事务(即 )。
相反,“同步”事务是由 管理或与 关联的事务。PulsarTransactionManager
PulsarTransactionManager
您可以使用 在本地事务中执行一系列操作。
以下示例显示了如何执行此操作:PulsarTemplate
var results = pulsarTemplate.executeInTransaction((template) -> {
var rv = new HashMap<String, MessageId>();
rv.put("msg1", template.send(topic, "msg1"));
rv.put("msg2", template.send(topic, "msg2"));
return rv;
});
回调中的参数是调用该方法的模板实例。
模板上的所有操作都登记在当前事务中。
如果回调正常退出,则提交事务。
如果引发异常,则回滚事务。executeInTransaction
如果正在处理同步事务,则忽略该事务并使用新的“嵌套”事务。 |
配置
以下交易设置可直接在 (通过字段):PulsarTemplate
transactions
-
enabled
- 模板是否支持事务(默认false
) -
required
- 模板是否需要事务(默认false
) -
timeout
- 事务超时的持续时间(默认null
)
不使用 Spring Boot 时,您可以在提供的模板上调整这些设置。
但是,在使用 Spring Boot 时,模板是自动配置的,并且没有影响属性的机制。
在这种情况下,您可以注册一个可用于调整设置的 Bean。
以下示例显示如何在自动配置的模板上设置超时:PulsarTemplateCustomizer
@Bean
PulsarTemplateCustomizer<?> templateCustomizer() {
return (template) -> template.transactions().setTimeout(Duration.ofSeconds(45));
}
事务接收@PulsarListener
启用侦听器事务后,将在同步事务的范围内调用带注释的侦听器方法。@PulsarListener
它使用配置了 a 的 Spring 在方法调用之前启动事务。DefaultPulsarMessageListenerContainer
TransactionTemplate
PulsarTransactionManager
每个收到的消息的确认都登记在范围事务中。
Consume-Process-Produce 场景
一种常见的事务模式是 Consumer 从 Pulsar 主题中读取消息,转换消息,最后 producer 将生成的消息写入另一个 Pulsar 主题。
当启用事务并且您的侦听器方法使用事务来生成转换后的消息时,框架支持此使用案例。PulsarTemplate
给定以下侦听器方法:
@PulsarListener(topics = "my-input-topic") (1)
void listen(String msg) { (2)
var transformedMsg = msg.toUpperCase(Locale.ROOT); (3)
this.transactionalTemplate.send("my-output-topic", transformedMsg); (4)
} (5) (6)
启用侦听器事务时,将发生以下交互:
1 | 侦听器容器启动新事务并调用事务范围内的侦听器方法 |
2 | 侦听器方法接收消息 |
3 | Listener 方法转换消息 |
4 | 侦听器方法使用事务模板发送转换后的消息,该模板在活动事务中登记发送操作 |
5 | 侦听器容器自动确认消息并在活动事务中登记确认操作 |
6 | 侦听器容器 (通过 ) 提交事务TransactionTemplate |
如果您不直接使用侦听器容器,而是使用侦听器容器,则提供如上所述的相同事务支持。
请记住,这只是为了方便将 Java 方法注册为侦听器容器消息侦听器。@PulsarListener
@PulsarListener
使用 Record 侦听器的事务
上面的示例使用记录侦听器。 使用记录侦听器时,将在每次侦听器方法调用时创建一个新事务,这相当于每条消息的一个事务。
由于事务边界是每条消息的,并且每个事务中都登记了每条消息确认,因此批量确认模式不能与事务记录侦听器一起使用。 |
配置
侦听器容器工厂
以下事务设置在创建侦听器容器时直接位于 上使用。
这些设置会影响所有侦听器容器,包括 使用的侦听器容器。PulsarContainerProperties
ConcurrentPulsarListenerContainerFactory
@PulsarListener
-
enabled
- 容器是否支持事务(默认false
) -
required
- 容器是否需要事务(默认false
) -
timeout
- 事务超时的持续时间(默认null
) -
transactionDefinition
- 一个蓝图事务定义,其属性将被复制到容器的事务模板(默认null
) -
transactionManager
- 用于启动事务的事务管理器
不使用 Spring Boot 时,您可以在提供的容器工厂上调整这些设置。
但是,在使用 Spring Boot 时,容器工厂是自动配置的。
在这种情况下,您可以注册一个 Bean 来访问和自定义容器属性。
以下示例显示了如何在容器工厂上设置超时:org.springframework.boot.autoconfigure.pulsar.PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>>
@Bean
PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> containerCustomizer() {
return (containerFactory) -> containerFactory.getContainerProperties().transactions().setTimeout(Duration.ofSeconds(45));
}
用PulsarTransactionManager
它是 Spring Framework 的 .
你可以将 与普通的 Spring 事务支持(、 、 和其他)一起使用。PulsarTransactionManager
PlatformTransactionManager
PulsarTransactionManager
@Transactional
TransactionTemplate
如果事务处于活动状态,则在该事务范围内执行的任何操作都将登记并参与正在进行的事务。
管理器提交或回滚事务,具体取决于成功或失败。PulsarTemplate
您可能不需要直接使用,因为大多数事务性使用案例都包含在 和 中。PulsarTransactionManager PulsarTemplate @PulsarListener |
Pulsar 事务与其他事务管理器
仅限 Producer 事务
如果你想向 Pulsar 发送记录并在单个事务中执行一些数据库更新,你可以使用普通的 Spring 事务管理。DataSourceTransactionManager
以下示例假定有一个以名称 “dataSourceTransactionManager” 注册的 BeanDataSourceTransactionManager |
@Transactional("dataSourceTransactionManager")
public void myServiceMethod() {
var msg = calculateMessage();
this.pulsarTemplate.send("my-topic", msg);
this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(msg));
}
Comments 的拦截器启动数据库事务,并将事务与 DB 事务管理器同步;每次发送都将参与该交易。
当方法退出时,数据库事务将提交,然后是 Pulsar 事务。@Transactional
PulsarTemplate
如果你希望先提交 Pulsar 事务,并且仅在 Pulsar 事务成功时提交 DB 事务,请使用嵌套方法,外部方法配置为使用 ,内部方法配置为使用 .@Transactional
DataSourceTransactionManager
PulsarTransactionManager
@Transactional("dataSourceTransactionManager")
public void myServiceMethod() {
var msg = calculateMessage();
this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(msg));
this.sendToPulsar(msg);
}
@Transactional("pulsarTransactionManager")
public void sendToPulsar(String msg) {
this.pulsarTemplate.send("my-topic", msg);
}
Consumer + Producer 交易
如果你想使用 Pulsar 中的记录,将记录发送到 Pulsar,并在事务中执行一些数据库更新,你可以将正常的 Spring 事务管理(使用 )与容器发起的事务相结合。DataSourceTransactionManager
在下面的示例中,侦听器容器启动 Pulsar 事务,注解启动 DB 事务。
首先提交 DB 事务;如果 Pulsar 事务提交失败,则记录将被重新传递,因此 DB 更新应该是幂等的。@Transactional
@PulsarListener(topics = "my-input-topic")
@Transactional("dataSourceTransactionManager")
void listen(String msg) {
var transformedMsg = msg.toUpperCase(Locale.ROOT);
this.pulsarTemplate.send("my-output-topic", transformedMsg);
this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(transformedMsg));
}