更改流
从 MongoDB 3.6 开始,Change Streams 允许应用程序获得有关更改的通知,而无需跟踪 oplog。
Change Stream 支持仅适用于副本集或分片集群。 |
Change Streams 可以与命令式和反应式 MongoDB Java 驱动程序一起使用。强烈建议使用 reactive 变体,因为它的资源密集度较低。但是,如果您无法使用反应式 API,您仍然可以通过使用 Spring 生态系统中已经流行的消息传递概念来获取更改事件。
可以在集合和数据库级别上同时进行监视,而数据库级别的变体发布
数据库中所有集合的更改。订阅数据库更改流时,请确保使用
适合事件类型的类型,因为 conversion 可能无法正确应用于不同的实体类型。
如有疑问,请使用 .Document
更改 Streams withMessageListener
使用 Sync Driver 侦听 Change Stream 会创建一个长时间运行的阻塞任务,该任务需要委派给单独的组件。
在这种情况下,我们需要首先创建一个 MessageListenerContainer
,它将是运行特定任务的主入口点。
Spring Data MongoDB 已经附带了一个默认实现,该实现在ChangeStreamRequest
上运行并能够创建和运行实例。SubscriptionRequest
MongoTemplate
Task
以下示例显示如何将 Change Streams 与实例一起使用:MessageListener
MessageListener
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start(); (1)
MessageListener<ChangeStreamDocument<Document>, User> listener = System.out::println; (2)
ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("db", "user", ChangeStreamOptions.empty()); (3)
Subscription subscription = container.register(new ChangeStreamRequest<>(listener, options), User.class); (4)
// ...
container.stop(); (5)
1 | 启动容器将初始化资源并启动已注册实例的实例。启动后添加的请求将立即运行。Task SubscriptionRequest |
2 | 定义收到 a 时调用的侦听器。将转换为请求的域类型。用于接收不进行转换的原始结果。Message Message#getBody() Document |
3 | 设置要侦听的集合,并通过 .ChangeStreamOptions |
4 | 注册请求。返回的可用于检查当前状态并取消它以释放资源。Subscription Task |
5 | 一旦您确定不再需要容器,请不要忘记停止容器。这样做会停止容器内所有正在运行的实例。Task |
处理过程中的错误会传递到 .如果未另有说明,则默认应用日志附加。 |
反应式变更流
使用反应式 API 订阅 Change Streams 是一种更自然的流处理方法。尽管如此,基本的构建块(例如 )保持不变。以下示例显示如何使用发出 s 的 Change Streams:ChangeStreamOptions
ChangeStreamEvent
ChangeStreamEvent
Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream(User.class) (1)
.watchCollection("people")
.filter(where("age").gte(38)) (2)
.listen(); (3)
1 | 基础文档应转换为的事件目标类型。省略此项以接收不进行转换的原始结果。 |
2 | 使用聚合管道或仅使用查询来筛选事件。Criteria |
3 | 获取 a of change stream 事件。将从 (2) 转换为请求的域类型。Flux ChangeStreamEvent#getBody() |
恢复更改流
可以恢复 Change Streams 并从您离开的位置继续发出事件。要恢复流,您需要提供 resume
token 或最后已知的服务器时间 (UTC) 。使用 ChangeStreamOptions
相应地设置值。
以下示例显示如何使用服务器时间设置恢复偏移量:
Flux<ChangeStreamEvent<User>> resumed = template.changeStream(User.class)
.watchCollection("people")
.resumeAt(Instant.now().minusSeconds(1)) (1)
.listen();
1 | 您可以通过该方法获取 an 的服务器时间,也可以使用 exposed 的 through 。ChangeStreamEvent getTimestamp resumeToken getResumeToken |
在某些情况下,在恢复 Change Stream 时,an 可能不是足够精确的度量。为此,请使用 MongoDB 原生 BsonTimestamp。Instant |