最新的稳定版请使用 Spring Data MongoDB 4.3.1! |
最新的稳定版请使用 Spring Data MongoDB 4.3.1! |
从 MongoDB 3.6 开始,Change Streams 允许应用程序获得有关更改的通知,而无需尾随 oplog。
更改流支持仅适用于副本集或分片集群。 |
Change Streams 可以同时使用命令式和反应式 MongoDB Java 驱动程序。强烈建议使用反应式变体,因为它的资源密集度较低。但是,如果不能使用响应式 API,您仍然可以使用 Spring 生态系统中已经流行的消息传递概念来获取更改事件。
可以在集合级别和数据库级别上观看,而数据库级别变体发布
数据库中所有集合的更改。订阅数据库更改流时,请确保使用
适合事件类型的类型,因为转换可能无法在不同的实体类型中正确应用。
如有疑问,请使用 .Document
更改流支持仅适用于副本集或分片集群。 |
更改流MessageListener
使用同步驱动程序侦听更改流会创建一个长时间运行的阻塞任务,该任务需要委派给单独的组件。
在这种情况下,我们需要首先创建一个 ,它将是运行特定任务的主要入口点。
Spring Data MongoDB 已经附带了一个默认实现,该实现可以在 上运行并能够创建和运行 .MessageListenerContainer
SubscriptionRequest
MongoTemplate
Task
ChangeStreamRequest
以下示例演示如何将 Change Streams 用于实例:MessageListener
MessageListener
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start(); (1)
MessageListener<ChangeStreamDocument<Document>, User> listener = System.out::println; (2)
ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("db", "user", ChangeStreamOptions.empty()); (3)
Subscription subscription = container.register(new ChangeStreamRequest<>(listener, options), User.class); (4)
// ...
container.stop(); (5)
1 | 启动容器将初始化资源并启动已注册实例的实例。启动后添加的请求将立即运行。Task SubscriptionRequest |
2 | 定义接收到时调用的侦听器。将转换为请求的域类型。用于接收原始结果而不进行转换。Message Message#getBody() Document |
3 | 设置要侦听的集合,并通过 提供其他选项。ChangeStreamOptions |
4 | 注册请求。返回的可用于检查当前状态并将其取消以释放资源。Subscription Task |
5 | 一旦确定不再需要容器,请不要忘记停止容器。这样做会停止容器中所有正在运行的实例。Task |
处理时的错误将传递给 .如果未另行说明,则默认应用日志追加。 |
1 | 启动容器将初始化资源并启动已注册实例的实例。启动后添加的请求将立即运行。Task SubscriptionRequest |
2 | 定义接收到时调用的侦听器。将转换为请求的域类型。用于接收原始结果而不进行转换。Message Message#getBody() Document |
3 | 设置要侦听的集合,并通过 提供其他选项。ChangeStreamOptions |
4 | 注册请求。返回的可用于检查当前状态并将其取消以释放资源。Subscription Task |
5 | 一旦确定不再需要容器,请不要忘记停止容器。这样做会停止容器中所有正在运行的实例。Task |
处理时的错误将传递给 .如果未另行说明,则默认应用日志追加。 |
反应式变更流
使用响应式 API 订阅更改流是一种更自然的流处理方法。尽管如此,基本构建块(例如 )保持不变。以下示例演示如何使用发出 s 的更改流:ChangeStreamOptions
ChangeStreamEvent
ChangeStreamEvent
Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream(User.class) (1)
.watchCollection("people")
.filter(where("age").gte(38)) (2)
.listen(); (3)
1 | 基础文档应转换为的事件目标类型。省略此项以接收原始结果而不进行转换。 |
2 | 使用聚合管道或仅使用查询来筛选事件。Criteria |
3 | 获取变更流事件。从 (2) 转换为请求的域类型。Flux ChangeStreamEvent#getBody() |
1 | 基础文档应转换为的事件目标类型。省略此项以接收原始结果而不进行转换。 |
2 | 使用聚合管道或仅使用查询来筛选事件。Criteria |
3 | 获取变更流事件。从 (2) 转换为请求的域类型。Flux ChangeStreamEvent#getBody() |
恢复更改流
更改流可以恢复,并在您离开的地方继续发出事件。要恢复流,您需要提供简历
令牌或最后已知的服务器时间(以 UTC 为单位)。用于相应地设置值。ChangeStreamOptions
以下示例演示如何使用服务器时间设置恢复偏移量:
Flux<ChangeStreamEvent<User>> resumed = template.changeStream(User.class)
.watchCollection("people")
.resumeAt(Instant.now().minusSeconds(1)) (1)
.listen();
1 | 您可以通过 through 方法获取服务器时间,也可以使用公开的 through 方法。ChangeStreamEvent getTimestamp resumeToken getResumeToken |
在某些情况下,在恢复更改流时,an 可能不是足够精确的度量值。为此,请使用 MongoDB 本机 BsonTimestamp。Instant |
1 | 您可以通过 through 方法获取服务器时间,也可以使用公开的 through 方法。ChangeStreamEvent getTimestamp resumeToken getResumeToken |
在某些情况下,在恢复更改流时,an 可能不是足够精确的度量值。为此,请使用 MongoDB 本机 BsonTimestamp。Instant |