最新的稳定版请使用 Spring Data MongoDB 4.3.1! |
最新的稳定版请使用 Spring Data MongoDB 4.3.1! |
默认情况下,当客户端耗尽游标提供的所有结果时,MongoDB会自动关闭游标。 在耗尽时关闭游标会将流转换为有限流。对于有上限的集合, 您可以使用在客户端之后保持打开状态的 Tailable Cursor 消耗了所有最初返回的数据。
可以使用 创建上限集合。为此,请提供所需的 .MongoOperations.createCollection CollectionOptions.empty().capped()… |
可拖尾游标可以与命令式和反应式 MongoDB API 一起使用。强烈建议使用 反应性变体,因为它的资源密集度较低。但是,如果您不能使用响应式 API,您仍然可以使用消息传递 这个概念在Spring生态系统中已经很普遍。
可以使用 创建上限集合。为此,请提供所需的 .MongoOperations.createCollection CollectionOptions.empty().capped()… |
可拖尾游标MessageListener
使用同步驱动程序侦听有上限的集合会创建一个长时间运行的阻塞任务,需要委派给
一个单独的组件。在这种情况下,我们需要首先创建一个 ,这将是主要的入口点
用于运行特定的 .Spring Data MongoDB 已经附带了一个默认实现,该实现
在 上运行,并且能够创建和运行 的实例。MessageListenerContainer
SubscriptionRequest
MongoTemplate
Task
TailableCursorRequest
以下示例演示如何对实例使用可尾随游标:MessageListener
MessageListener
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start(); (1)
MessageListener<Document, User> listener = System.out::println; (2)
TailableCursorRequest request = TailableCursorRequest.builder()
.collection("orders") (3)
.filter(query(where("value").lt(100))) (4)
.publishTo(listener) (5)
.build();
container.register(request, User.class); (6)
// ...
container.stop(); (7)
1 | 启动容器初始化资源,并启动已注册实例的实例。启动后添加的请求将立即运行。Task SubscriptionRequest |
2 | 定义接收到时调用的侦听器。将转换为请求的域类型。用于接收原始结果而不进行转换。Message Message#getBody() Document |
3 | 设置要收听的集合。 |
4 | 为要接收的文档提供可选筛选器。 |
5 | 设置要将传入消息发布到的消息侦听器。Message |
6 | 注册请求。返回的可用于检查当前状态并将其取消以释放资源。Subscription Task |
7 | 一旦确定不再需要容器,请不要忘记停止容器。这样做会停止容器中所有正在运行的实例。Task |
1 | 启动容器初始化资源,并启动已注册实例的实例。启动后添加的请求将立即运行。Task SubscriptionRequest |
2 | 定义接收到时调用的侦听器。将转换为请求的域类型。用于接收原始结果而不进行转换。Message Message#getBody() Document |
3 | 设置要收听的集合。 |
4 | 为要接收的文档提供可选筛选器。 |
5 | 设置要将传入消息发布到的消息侦听器。Message |
6 | 注册请求。返回的可用于检查当前状态并将其取消以释放资源。Subscription Task |
7 | 一旦确定不再需要容器,请不要忘记停止容器。这样做会停止容器中所有正在运行的实例。Task |
反应式可拖尾游标
使用具有响应式数据类型的可拖尾游标可以构造无限流。可拖尾游标保持打开状态,直到它在外部关闭。当新文档到达有上限的集合时,它会发出数据。
如果查询不返回匹配项,或者游标在集合的“末尾”返回文档,然后应用程序删除该文档,则可跟踪游标可能会失效或无效。以下示例演示如何创建和使用无限流查询:
Flux<Person> stream = template.tail(query(where("name").is("Joe")), Person.class);
Disposable subscription = stream.doOnNext(person -> System.out.println(person)).subscribe();
// …
// Later: Dispose the subscription to close the stream
subscription.dispose();
Spring Data MongoDB 反应式存储库通过使用 .这适用于返回的方法和其他能够发射多个元素的反应式类型,如以下示例所示:@Tailable
Flux
public interface PersonRepository extends ReactiveMongoRepository<Person, String> {
@Tailable
Flux<Person> findByFirstname(String firstname);
}
Flux<Person> stream = repository.findByFirstname("Joe");
Disposable subscription = stream.doOnNext(System.out::println).subscribe();
// …
// Later: Dispose the subscription to close the stream
subscription.dispose();