最新的稳定版请使用 Spring Data MongoDB 4.3.1Spring中文文档

最新的稳定版请使用 Spring Data MongoDB 4.3.1Spring中文文档

从 MongoDB 3.6 开始,Change Streams 允许应用程序获得有关更改的通知,而无需尾随 oplog。Spring中文文档

更改流支持仅适用于副本集或分片集群。

Change Streams 可以同时使用命令式和反应式 MongoDB Java 驱动程序。强烈建议使用反应式变体,因为它的资源密集度较低。但是,如果不能使用响应式 API,您仍然可以使用 Spring 生态系统中已经流行的消息传递概念来获取更改事件。Spring中文文档

可以在集合级别和数据库级别上观看,而数据库级别变体发布 数据库中所有集合的更改。订阅数据库更改流时,请确保使用 适合事件类型的类型,因为转换可能无法在不同的实体类型中正确应用。 如有疑问,请使用 .DocumentSpring中文文档

更改流支持仅适用于副本集或分片集群。

更改流MessageListener

使用同步驱动程序侦听更改流会创建一个长时间运行的阻塞任务,该任务需要委派给单独的组件。 在这种情况下,我们需要首先创建一个 ,它将是运行特定任务的主要入口点。 Spring Data MongoDB 已经附带了一个默认实现,该实现可以在 上运行并能够创建和运行 .MessageListenerContainerSubscriptionRequestMongoTemplateTaskChangeStreamRequestSpring中文文档

以下示例演示如何将 Change Streams 用于实例:MessageListenerSpring中文文档

例 1.使用实例更改流MessageListener
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start();                                                                                              (1)

MessageListener<ChangeStreamDocument<Document>, User> listener = System.out::println;                           (2)
ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("db", "user", ChangeStreamOptions.empty()); (3)

Subscription subscription = container.register(new ChangeStreamRequest<>(listener, options), User.class);       (4)

// ...

container.stop();                                                                                               (5)
1 启动容器将初始化资源并启动已注册实例的实例。启动后添加的请求将立即运行。TaskSubscriptionRequest
2 定义接收到时调用的侦听器。将转换为请求的域类型。用于接收原始结果而不进行转换。MessageMessage#getBody()Document
3 设置要侦听的集合,并通过 提供其他选项。ChangeStreamOptions
4 注册请求。返回的可用于检查当前状态并将其取消以释放资源。SubscriptionTask
5 一旦确定不再需要容器,请不要忘记停止容器。这样做会停止容器中所有正在运行的实例。Task

处理时的错误将传递给 .如果未另行说明,则默认应用日志追加。
请用于提供附加功能。
org.springframework.util.ErrorHandlerErrorHandlerregister(request, body, errorHandler)Spring中文文档

1 启动容器将初始化资源并启动已注册实例的实例。启动后添加的请求将立即运行。TaskSubscriptionRequest
2 定义接收到时调用的侦听器。将转换为请求的域类型。用于接收原始结果而不进行转换。MessageMessage#getBody()Document
3 设置要侦听的集合,并通过 提供其他选项。ChangeStreamOptions
4 注册请求。返回的可用于检查当前状态并将其取消以释放资源。SubscriptionTask
5 一旦确定不再需要容器,请不要忘记停止容器。这样做会停止容器中所有正在运行的实例。Task

处理时的错误将传递给 .如果未另行说明,则默认应用日志追加。
请用于提供附加功能。
org.springframework.util.ErrorHandlerErrorHandlerregister(request, body, errorHandler)Spring中文文档

反应式变更流

使用响应式 API 订阅更改流是一种更自然的流处理方法。尽管如此,基本构建块(例如 )保持不变。以下示例演示如何使用发出 s 的更改流:ChangeStreamOptionsChangeStreamEventSpring中文文档

例 2.更改流的发出ChangeStreamEvent
Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream(User.class) (1)
    .watchCollection("people")
    .filter(where("age").gte(38))                                              (2)
    .listen();                                                                 (3)
1 基础文档应转换为的事件目标类型。省略此项以接收原始结果而不进行转换。
2 使用聚合管道或仅使用查询来筛选事件。Criteria
3 获取变更流事件。从 (2) 转换为请求的域类型。FluxChangeStreamEvent#getBody()
1 基础文档应转换为的事件目标类型。省略此项以接收原始结果而不进行转换。
2 使用聚合管道或仅使用查询来筛选事件。Criteria
3 获取变更流事件。从 (2) 转换为请求的域类型。FluxChangeStreamEvent#getBody()

恢复更改流

更改流可以恢复,并在您离开的地方继续发出事件。要恢复流,您需要提供简历 令牌或最后已知的服务器时间(以 UTC 为单位)。用于相应地设置值。ChangeStreamOptionsSpring中文文档

以下示例演示如何使用服务器时间设置恢复偏移量:Spring中文文档

例 3.恢复更改流
Flux<ChangeStreamEvent<User>> resumed = template.changeStream(User.class)
    .watchCollection("people")
    .resumeAt(Instant.now().minusSeconds(1)) (1)
    .listen();
1 您可以通过 through 方法获取服务器时间,也可以使用公开的 through 方法。ChangeStreamEventgetTimestampresumeTokengetResumeToken
在某些情况下,在恢复更改流时,an 可能不是足够精确的度量值。为此,请使用 MongoDB 本机 BsonTimestampInstant
1 您可以通过 through 方法获取服务器时间,也可以使用公开的 through 方法。ChangeStreamEventgetTimestampresumeTokengetResumeToken
在某些情况下,在恢复更改流时,an 可能不是足够精确的度量值。为此,请使用 MongoDB 本机 BsonTimestampInstant