从版本 3.6 开始,MongoDB 支持会话的概念。
会话的使用启用了MongoDB的因果一致性模型,该模型保证了以尊重其因果关系的顺序运行操作。
这些被拆分为实例和实例。
在本节中,当我们谈到会话时,我们指的是 。ServerSession
ClientSession
ClientSession
客户端会话中的操作不会与会话外的操作隔离。 |
两者都提供了将 a 绑定到操作的网关方法。 并使用实现 MongoDB 的集合和数据库接口的会话代理对象,因此您无需在每次调用时添加会话。
这意味着对的潜在调用被委托给 。MongoOperations
ReactiveMongoOperations
ClientSession
MongoCollection
MongoDatabase
MongoCollection#find()
MongoCollection#find(ClientSession)
诸如返回本机 MongoDB Java 驱动程序网关对象(例如 )的方法,这些对象本身为 提供专用方法。
这些方法不是会话代理的。
当直接与 or 交互时,您应该提供所需的位置,而不是通过 上的回调之一。(Reactive)MongoOperations#getCollection MongoCollection ClientSession ClientSession MongoCollection MongoDatabase #execute MongoOperations |
客户端会话中的操作不会与会话外的操作隔离。 |
诸如返回本机 MongoDB Java 驱动程序网关对象(例如 )的方法,这些对象本身为 提供专用方法。
这些方法不是会话代理的。
当直接与 or 交互时,您应该提供所需的位置,而不是通过 上的回调之一。(Reactive)MongoOperations#getCollection MongoCollection ClientSession ClientSession MongoCollection MongoDatabase #execute MongoOperations |
ClientSession 支持
以下示例显示了会话的用法:
-
Imperative
-
Reactive
ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
.causallyConsistent(true)
.build();
ClientSession session = client.startSession(sessionOptions); (1)
template.withSession(() -> session)
.execute(action -> {
Query query = query(where("name").is("Durzo Blint"));
Person durzo = action.findOne(query, Person.class); (2)
Person azoth = new Person("Kylar Stern");
azoth.setMaster(durzo);
action.insert(azoth); (3)
return azoth;
});
session.close() (4)
1 | 从服务器获取新会话。 |
2 | 像以前一样使用方法。
将自动应用。MongoOperation ClientSession |
3 | 确保关闭 .ClientSession |
4 | 关闭会话。 |
在处理实例时,尤其是延迟加载的实例,在加载所有数据之前不要关闭实例。
否则,延迟提取将失败。DBRef ClientSession |
ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
.causallyConsistent(true)
.build();
Publisher<ClientSession> session = client.startSession(sessionOptions); (1)
template.withSession(session)
.execute(action -> {
Query query = query(where("name").is("Durzo Blint"));
return action.findOne(query, Person.class)
.flatMap(durzo -> {
Person azoth = new Person("Kylar Stern");
azoth.setMaster(durzo);
return action.insert(azoth); (2)
});
}, ClientSession::close) (3)
.subscribe(); (4)
1 | 获取新会话检索。Publisher |
2 | 像以前一样使用方法。
自动获取并应用。ReactiveMongoOperation ClientSession |
3 | 确保关闭 .ClientSession |
4 | 在您订阅之前,什么都不会发生。 有关详细信息,请参阅《项目反应堆参考指南》。 |
通过使用提供实际会话的 a,可以将会话获取推迟到实际订阅点。
不过,您需要在完成后关闭会话,以免因过时的会话而污染服务器。
当您不再需要会话时,使用挂钩进行呼叫。
如果您希望对会话本身进行更多控制,则可以通过驱动程序获取 ,并通过 .Publisher
doFinally
execute
ClientSession#close()
ClientSession
Supplier
的反应性使用仅限于模板 API 的使用。
目前没有与反应式存储库的会话集成。ClientSession |
1 | 从服务器获取新会话。 |
2 | 像以前一样使用方法。
将自动应用。MongoOperation ClientSession |
3 | 确保关闭 .ClientSession |
4 | 关闭会话。 |
在处理实例时,尤其是延迟加载的实例,在加载所有数据之前不要关闭实例。
否则,延迟提取将失败。DBRef ClientSession |
1 | 获取新会话检索。Publisher |
2 | 像以前一样使用方法。
自动获取并应用。ReactiveMongoOperation ClientSession |
3 | 确保关闭 .ClientSession |
4 | 在您订阅之前,什么都不会发生。 有关详细信息,请参阅《项目反应堆参考指南》。 |
的反应性使用仅限于模板 API 的使用。
目前没有与反应式存储库的会话集成。ClientSession |
MongoDB 事务
除非在应用程序上下文中指定 a,否则事务支持处于 DISABLED 状态。
您可以使用它来参与正在进行的非本机 MongoDB 事务。MongoTransactionManager setSessionSynchronization(ALWAYS) |
若要完全以编程方式控制事务,可能需要使用 上的会话回调。MongoOperations
下面的示例演示编程事务控制:
-
Imperative
-
Reactive
ClientSession session = client.startSession(options); (1)
template.withSession(session)
.execute(action -> {
session.startTransaction(); (2)
try {
Step step = // ...;
action.insert(step);
process(step);
action.update(Step.class).apply(Update.set("state", // ...
session.commitTransaction(); (3)
} catch (RuntimeException e) {
session.abortTransaction(); (4)
}
}, ClientSession::close) (5)
1 | 获取新的 .ClientSession |
2 | 开始交易。 |
3 | 如果一切按预期进行,请提交更改。 |
4 | 有些东西坏了,所以回滚所有东西。 |
5 | 完成后不要忘记关闭会话。 |
通过前面的示例,您可以在回调中使用会话范围的实例时完全控制事务行为,以确保会话传递到每个服务器调用。
为了避免此方法带来的一些开销,您可以使用 a 来消除手动事务流的一些噪音。MongoOperations
TransactionTemplate
Mono<DeleteResult> result = Mono
.from(client.startSession()) (1)
.flatMap(session -> {
session.startTransaction(); (2)
return Mono.from(collection.deleteMany(session, ...)) (3)
.onErrorResume(e -> Mono.from(session.abortTransaction()).then(Mono.error(e))) (4)
.flatMap(val -> Mono.from(session.commitTransaction()).then(Mono.just(val))) (5)
.doFinally(signal -> session.close()); (6)
});
1 | 首先,我们显然需要启动会议。 |
2 | 一旦我们手头有钱,就开始交易。ClientSession |
3 | 通过将 传递给操作,在事务中操作。ClientSession |
4 | 如果操作异常完成,我们需要停止事务并保留错误。 |
5 | 或者,当然,如果成功,则提交更改。 仍然保留操作结果。 |
6 | 最后,我们需要确保关闭会话。 |
上述操作的罪魁祸首是保留主流程,而不是通过 或 发布事务结果,这导致了相当复杂的设置。DeleteResult
commitTransaction()
abortTransaction()
除非在应用程序上下文中指定 a,否则事务支持处于 DISABLED 状态。
您可以使用它来参与正在进行的非本机 MongoDB 事务。ReactiveMongoTransactionManager setSessionSynchronization(ALWAYS) |
除非在应用程序上下文中指定 a,否则事务支持处于 DISABLED 状态。
您可以使用它来参与正在进行的非本机 MongoDB 事务。MongoTransactionManager setSessionSynchronization(ALWAYS) |
1 | 获取新的 .ClientSession |
2 | 开始交易。 |
3 | 如果一切按预期进行,请提交更改。 |
4 | 有些东西坏了,所以回滚所有东西。 |
5 | 完成后不要忘记关闭会话。 |
1 | 首先,我们显然需要启动会议。 |
2 | 一旦我们手头有钱,就开始交易。ClientSession |
3 | 通过将 传递给操作,在事务中操作。ClientSession |
4 | 如果操作异常完成,我们需要停止事务并保留错误。 |
5 | 或者,当然,如果成功,则提交更改。 仍然保留操作结果。 |
6 | 最后,我们需要确保关闭会话。 |
除非在应用程序上下文中指定 a,否则事务支持处于 DISABLED 状态。
您可以使用它来参与正在进行的非本机 MongoDB 事务。ReactiveMongoTransactionManager setSessionSynchronization(ALWAYS) |
使用 TransactionTemplate / TransactionalOperator 进行事务处理
Spring Data MongoDB 事务同时支持 和 。TransactionTemplate
TransactionalOperator
TransactionTemplate
/ TransactionalOperator
-
Imperative
-
Reactive
template.setSessionSynchronization(ALWAYS); (1)
// ...
TransactionTemplate txTemplate = new TransactionTemplate(anyTxManager); (2)
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) { (3)
Step step = // ...;
template.insert(step);
process(step);
template.update(Step.class).apply(Update.set("state", // ...
}
});
1 | 在模板 API 配置期间启用事务同步。 |
2 | 使用提供的 .TransactionTemplate PlatformTransactionManager |
3 | 在回调中,已经注册了 和 事务。ClientSession |
更改运行时期间的状态(如上一列表的第 1 项中可能认为的那样)可能会导致线程和可见性问题。MongoTemplate |
template.setSessionSynchronization(ALWAYS); (1)
// ...
TransactionalOperator rxtx = TransactionalOperator.create(anyTxManager,
new DefaultTransactionDefinition()); (2)
Step step = // ...;
template.insert(step);
Mono<Void> process(step)
.then(template.update(Step.class).apply(Update.set("state", …))
.as(rxtx::transactional) (3)
.then();
1 | 为事务参与启用事务同步。 |
2 | 使用提供的 .TransactionalOperator ReactiveTransactionManager |
3 | TransactionalOperator.transactional(…) 为所有上游操作提供事务管理。 |
1 | 在模板 API 配置期间启用事务同步。 |
2 | 使用提供的 .TransactionTemplate PlatformTransactionManager |
3 | 在回调中,已经注册了 和 事务。ClientSession |
更改运行时期间的状态(如上一列表的第 1 项中可能认为的那样)可能会导致线程和可见性问题。MongoTemplate |
1 | 为事务参与启用事务同步。 |
2 | 使用提供的 .TransactionalOperator ReactiveTransactionManager |
3 | TransactionalOperator.transactional(…) 为所有上游操作提供事务管理。 |
与 MongoTransactionManager 和 ReactiveMongoTransactionManager 的交易
MongoTransactionManager
/ ReactiveMongoTransactionManager
是通向众所周知的 Spring 事务支持的门户。
它允许应用程序使用 Spring 的托管事务功能。
将 a 绑定到线程,而 将 用于此。 检测会话,并相应地操作与事务关联的这些资源。 还可以参与其他正在进行的交易。
以下示例演示如何创建和使用具有 :MongoTransactionManager
ClientSession
ReactiveMongoTransactionManager
ReactorContext
MongoTemplate
MongoTemplate
MongoTransactionManager
MongoTransactionManager
/ ReactiveMongoTransactionManager
-
Imperative
-
Reactive
@Configuration
static class Config extends AbstractMongoClientConfiguration {
@Bean
MongoTransactionManager transactionManager(MongoDatabaseFactory dbFactory) { (1)
return new MongoTransactionManager(dbFactory);
}
// ...
}
@Component
public class StateService {
@Transactional
void someBusinessFunction(Step step) { (2)
template.insert(step);
process(step);
template.update(Step.class).apply(Update.set("state", // ...
};
});
1 | 在应用程序上下文中注册。MongoTransactionManager |
2 | 将方法标记为事务性方法。 |
@Transactional(readOnly = true) 建议还启动一个事务,将 添加到传出请求中。MongoTransactionManager ClientSession |
@Configuration
public class Config extends AbstractReactiveMongoConfiguration {
@Bean
ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory factory) { (1)
return new ReactiveMongoTransactionManager(factory);
}
// ...
}
@Service
public class StateService {
@Transactional
Mono<UpdateResult> someBusinessFunction(Step step) { (2)
return template.insert(step)
.then(process(step))
.then(template.update(Step.class).apply(Update.set("state", …));
};
});
1 | 在应用程序上下文中注册。ReactiveMongoTransactionManager |
2 | 将方法标记为事务性方法。 |
@Transactional(readOnly = true) 建议还启动一个事务,将 添加到传出请求中。ReactiveMongoTransactionManager ClientSession |
控制特定于 MongoDB 的事务选项
事务服务方法可能需要特定的事务选项来运行事务。
Spring Data MongoDB的事务管理器支持对事务标签的评估,例如。@Transactional(label = { "mongo:readConcern=available" })
默认情况下,使用前缀的标签命名空间由默认配置的命名空间进行计算。
事务标签由 和 提供并可用于编程事务控制。
由于它们的声明性质,提供了一个很好的起点,也可以作为文档。mongo:
MongoTransactionOptionsResolver
TransactionAttribute
TransactionTemplate
TransactionalOperator
@Transactional(label = …)
目前,支持以下选项:
- 最大提交时间
-
控制 commitTransaction 操作在服务器上的最大执行时间。 该值的格式与 中使用的 ISO-8601 持续时间格式相对应。
Duration.parse(…)
用法:
mongo:maxCommitTime=PT1S
- 阅读关注
-
设置事务的读取关注点。
用法:
mongo:readConcern=LOCAL|MAJORITY|LINEARIZABLE|SNAPSHOT|AVAILABLE
- 读取首选项
-
设置事务的读取首选项。
用法:
mongo:readPreference=PRIMARY|SECONDARY|SECONDARY_PREFERRED|PRIMARY_PREFERRED|NEAREST
- 写关注点
-
设置事务的写入关注点。
用法:
mongo:writeConcern=ACKNOWLEDGED|W1|W2|W3|UNACKNOWLEDGED|JOURNALED|MAJORITY
联接外部事务的嵌套事务不会影响初始事务选项,因为事务已启动。 仅当启动新事务时,才应用事务选项。 |
1 | 在应用程序上下文中注册。MongoTransactionManager |
2 | 将方法标记为事务性方法。 |
@Transactional(readOnly = true) 建议还启动一个事务,将 添加到传出请求中。MongoTransactionManager ClientSession |
1 | 在应用程序上下文中注册。ReactiveMongoTransactionManager |
2 | 将方法标记为事务性方法。 |
@Transactional(readOnly = true) 建议还启动一个事务,将 添加到传出请求中。ReactiveMongoTransactionManager ClientSession |
联接外部事务的嵌套事务不会影响初始事务选项,因为事务已启动。 仅当启动新事务时,才应用事务选项。 |
事务中的特殊行为
在事务内部,MongoDB服务器的行为略有不同。
连接设置
MongoDB驱动程序提供专用的副本集名称配置选项,使驱动程序进入自动检测模式。 此选项有助于在事务期间识别主副本集节点和命令路由。
确保添加到 MongoDB URI。
有关详细信息,请参阅连接字符串选项。replicaSet |
收集操作
MongoDB 不支持事务中的收集操作,例如集合创建。 这也会影响首次使用时发生的动态集合创建。 因此,请确保所有必需的结构都已到位。
瞬态错误
MongoDB可以为事务操作期间引发的错误添加特殊标签。
这些可能表示暂时性故障,只需重试操作即可消失。
出于这些目的,我们强烈建议 Spring 重试。
但是,可以覆盖以实现MongoDB参考手册中概述的重试提交操作行为。MongoTransactionManager#doCommit(MongoTransactionObject)
计数
MongoDB根据收集统计信息进行操作,这些统计信息可能无法反映事务中的实际情况。
在多文档事务中发出命令时,服务器响应错误 50851。
一旦检测到活动事务,所有公开的方法都会使用 和 运算符进行转换并委托给聚合框架,同时保留设置,例如 。count
count
MongoTemplate
count()
$match
$count
Query
collation
在聚合计数帮助程序中使用 geo 命令时,存在限制。 不能使用以下运算符,必须将其替换为其他运算符:
-
$where
→$expr
-
$near
→$geoWithin
$center
-
$nearSphere
→$geoWithin
$centerSphere
使用 和 的查询必须重写为相应的 .
这同样适用于必须更改为 的存储库查询方法中的 query 关键字。
另请参阅 MongoDB JIRA 票证 DRIVERS-518 以获取进一步参考。Criteria.near(…)
Criteria.nearSphere(…)
Criteria.within(…)
Criteria.withinSphere(…)
near
within
以下代码片段显示了会话绑定闭包中的用法:count
session.startTransaction();
template.withSession(session)
.execute(action -> {
action.count(query(where("state").is("active")), Step.class)
...
上面的代码片段在以下命令中实现:
db.collection.aggregate(
[
{ $match: { state: "active" } },
{ $count: "totalEntityCount" }
]
)
而不是:
db.collection.find( { state: "active" } ).count()
确保添加到 MongoDB URI。
有关详细信息,请参阅连接字符串选项。replicaSet |