对于最新的稳定版本,请使用 Spring Integration 6.3.4spring-doc.cn

对于最新的稳定版本,请使用 Spring Integration 6.3.4spring-doc.cn

2.1 版本引入了对 MongoDB 的支持:MongoDB 是一个“高性能、开源、面向文档的数据库”。spring-doc.cn

您需要将此依赖项包含在您的项目中:spring-doc.cn

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mongodb</artifactId>
    <version>6.2.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-mongodb:6.2.9"

要下载、安装和运行 MongoDB,请参阅 MongoDB 文档spring-doc.cn

连接到 MongoDb

[[阻塞还是反应式? === 阻塞还是反应?spring-doc.cn

从版本 5.3 开始, Spring 集成提供了对反应式 MongoDB 驱动程序的支持,以便在访问 MongoDB 时启用非阻塞 I/O。 要启用反应式支持,请将 MongoDB 反应式流驱动程序添加到您的依赖项中:spring-doc.cn

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-reactivestreams</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-reactivestreams"

对于常规同步客户端,您需要将其相应的驱动程序添加到依赖项中:spring-doc.cn

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-sync</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-sync"

它们都位于更好的最终用户选择支持的框架中。optionalspring-doc.cn

要开始与 MongoDB 交互,您首先需要连接到它。 Spring 集成建立在另一个 Spring 项目 Spring Data MongoDB 提供的支持之上。 它提供名为 和 的工厂类,可简化与 MongoDB Client API 的集成。MongoDatabaseFactoryReactiveMongoDatabaseFactoryspring-doc.cn

Spring Data 默认提供阻塞 MongoDB 驱动程序,但您可以通过包含上述依赖项来选择反应式使用。

MongoDatabaseFactory

要连接到 MongoDB,您可以使用接口的实现。MongoDatabaseFactoryspring-doc.cn

以下示例演示如何使用:SimpleMongoClientDatabaseFactoryspring-doc.cn

MongoDatabaseFactory mongoDbFactory =
        new SimpleMongoClientDatabaseFactory(com.mongodb.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleMongoClientDatabaseFactory">
    <constructor-arg>
        <bean class="com.mongodb.client.MongoClients" factory-method="create"/>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>

SimpleMongoClientDatabaseFactory采用两个参数:一个 instance 和一个指定数据库名称的 a。 如果需要配置属性(如 , 和 ),可以使用基础类提供的构造函数之一来传递这些属性。 有关如何配置 MongoDB 的更多信息,请参阅 Spring-Data-MongoDB 参考。MongoClientStringhostportMongoClientsspring-doc.cn

ReactiveMongoDatabaseFactory

要使用反应式驱动程序连接到 MongoDB,您可以使用接口的实现。ReactiveMongoDatabaseFactoryspring-doc.cn

以下示例演示如何使用:SimpleReactiveMongoDatabaseFactoryspring-doc.cn

ReactiveMongoDatabaseFactory mongoDbFactory =
        new SimpleReactiveMongoDatabaseFactory(com.mongodb.reactivestreams.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleReactiveMongoDatabaseFactory">
    <constructor-arg>
        <bean class="com.mongodb.reactivestreams.client.MongoClients" factory-method="create"/>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>
Spring Data 默认提供阻塞 MongoDB 驱动程序,但您可以通过包含上述依赖项来选择反应式使用。

MongoDB 消息存储

Enterprise Integration Patterns (EIP) 一书中所述,Message Store 允许您保留消息。 如果可靠性是一个问题,那么在处理能够缓冲消息的组件(、、、 和其他组件)时,这样做可能很有用。 在 Spring 集成中,该策略还为索赔检查模式提供了基础,EIP 中也对此进行了描述。QueueChannelaggregatorresequencerMessageStorespring-doc.cn

Spring 集成的 MongoDB 模块提供了 ,它是策略(主要由声明检查模式使用)和策略(主要由 aggregator 和 resequencer 模式使用)的实现。MongoDbMessageStoreMessageStoreMessageGroupStorespring-doc.cn

以下示例将 a 配置为使用 a 和 an :MongoDbMessageStoreQueueChannelaggregatorspring-doc.cn

<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore">
    <constructor-arg ref="mongoDbFactory"/>
</bean>

<int:channel id="somePersistentQueueChannel">
    <int:queue message-store="mongoDbMessageStore"/>
<int:channel>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
         message-store="mongoDbMessageStore"/>

前面的示例是一个简单的 bean 配置,它需要 a 作为构造函数参数。MongoDbFactoryspring-doc.cn

通过使用 Spring Data Mongo 映射机制,将 扩展为 Mongo 文档,其中包含所有嵌套属性。 当您需要访问 或 进行审计或分析时(例如,针对存储的消息),它非常有用。MongoDbMessageStoreMessagepayloadheadersspring-doc.cn

它使用自定义实现将实例存储为 MongoDB 文档,并且 的属性(和值)有一些限制。MongoDbMessageStoreMappingMongoConverterMessagepayloadheaderMessage

从版本 5.1.6 开始,可以使用传播到内部实现中的自定义转换器进行配置。 有关更多信息,请参阅 JavaDocs。MongoDbMessageStoreMappingMongoConverterMongoDbMessageStore.setCustomConverters(Object…​ customConverters)spring-doc.cn

Spring Integration 3.0 引入了 . 它实现了 和 接口。 此类可以接收 作为构造函数参数的 ,例如,您可以使用该参数配置自定义 . 另一个构造函数需要 a 和 a ,它允许您为实例及其属性提供一些自定义转换。 请注意,默认情况下,它使用标准 Java 序列化向 MongoDB 写入和读取实例(请参阅 ),并依赖于 中的其他属性的默认值。 它从提供的 和 . 存储的集合的默认名称为 。 我们建议在消息包含复杂数据类型时使用此实现来创建强大而灵活的解决方案。ConfigurableMongoDbMessageStoreMessageStoreMessageGroupStoreMongoTemplateWriteConcernMappingMongoConverterMongoDbFactoryMessageConfigurableMongoDbMessageStoreMessageMongoDbMessageBytesConverterMongoTemplateMongoTemplateMongoDbFactoryMappingMongoConverterConfigurableMongoDbMessageStoreconfigurableStoreMessagesspring-doc.cn

从版本 6.0.8 开始,提供了一个 (defaults to ) 选项,该选项可用于禁用自动索引创建。 下面的示例展示了如何声明 bean 并禁用自动索引创建:AbstractConfigurableMongoDbMessageStoresetCreateIndexes(boolean)truespring-doc.cn

@Bean
public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory) {
    MongoDbChannelMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
    mongoDbChannelMessageStore.setCreateIndexes(false);
    return mongoDbChannelMessageStore;
}

MongoDB 通道消息存储

版本 4.0 引入了新的 . 它针对实例使用进行了优化。 使用 ,您可以在实例中使用它来实现持久消息的优先级顺序轮询。 priority MongoDB document 字段从 () 消息标头填充。MongoDbChannelMessageStoreMessageGroupStoreQueueChannelpriorityEnabled = true<int:priority-queue>IntegrationMessageHeaderAccessor.PRIORITYpriorityspring-doc.cn

此外,所有 MongoDB 实例现在都有一个 documents 字段。 该值是同一集合中简单文档的操作结果,该集合是按需创建的。 该字段在操作中用于在消息存储在同一毫秒内时提供先进先出 (FIFO) 消息顺序(如果已配置,则在优先级内)。MessageStoresequenceMessageGroupsequence$incsequencesequencepollspring-doc.cn

我们不建议对优先级和非优先级使用相同的 bean,因为该选项适用于整个商店。 但是,相同的方法可以用于这两种类型,因为来自存储的消息轮询是排序的并使用索引。 要配置该场景,可以从一个消息存储 Bean 扩展另一个消息存储 Bean,如下例所示:MongoDbChannelMessageStorepriorityEnabledcollectionMongoDbChannelMessageStore
<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore">
    <constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="store"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>

在禁用自动索引创建的情况下使用 AbstractConfigurableMongoDbMessageStore

从版本 6.0.8 开始,实现了 a,可用于 desable 或启用(默认)自动索引创建。 下面的示例展示了如何声明一个 bean 并禁用自动索引创建:AbstractConfigurableMongoDbMessageStoresetCreateIndex(boolean)spring-doc.cn

@Bean
public AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory)
{
    AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
    mongoDbChannelMessageStore.setCreateIndex(false);

    return mongoDbChannelMessageStore;
}

MongoDB 元数据存储

Spring Integration 4.2 引入了一个新的基于 MongoDB(参见元数据存储)的实现。 您可以使用 在应用程序重启后维护元数据状态。 您可以将此新实现与适配器一起使用,例如:MetadataStoreMongoDbMetadataStoreMetadataStorespring-doc.cn

要指示这些适配器使用新的 ,请声明一个 bean 名称为 的 Spring bean。 源入站通道适配器会自动选取并使用声明的 . 下面的示例展示了如何声明一个 name 为 :MongoDbMetadataStoremetadataStoreMongoDbMetadataStoremetadataStorespring-doc.cn

@Bean
public MetadataStore metadataStore(MongoDbFactory factory) {
    return new MongoDbMetadataStore(factory, "integrationMetadataStore");
}

它还实现了 ,使其在多个应用程序实例之间可靠地共享,其中只允许一个实例存储或修改键的值。 由于 MongoDB 的保证,所有这些操作都是原子的。MongoDbMetadataStoreConcurrentMetadataStorespring-doc.cn

它使用自定义实现将实例存储为 MongoDB 文档,并且 的属性(和值)有一些限制。MongoDbMessageStoreMappingMongoConverterMessagepayloadheaderMessage
我们不建议对优先级和非优先级使用相同的 bean,因为该选项适用于整个商店。 但是,相同的方法可以用于这两种类型,因为来自存储的消息轮询是排序的并使用索引。 要配置该场景,可以从一个消息存储 Bean 扩展另一个消息存储 Bean,如下例所示:MongoDbChannelMessageStorepriorityEnabledcollectionMongoDbChannelMessageStore

MongoDB 入站通道适配器

MongoDB 入站通道适配器是一个轮询使用者,它从 MongoDB 读取数据并将其作为有效负载发送。 以下示例显示如何配置 MongoDB 入站通道适配器:Messagespring-doc.cn

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
       channel="replyChannel"
       query="{'name' : 'Bob'}"
       entity-class="java.lang.Object"
       auto-startup="false">
		<int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>

如前面的配置所示,您可以通过使用元素并为各种属性提供值来配置 MongoDb 入站通道适配器,例如:inbound-channel-adapterspring-doc.cn

  • query:JSON 查询(请参阅 MongoDB 查询)spring-doc.cn

  • query-expression:一个 SPEL 表达式,其计算结果为 JSON 查询字符串(如上述属性)或 . 与 属性互斥。queryo.s.data.mongodb.core.query.Queryqueryspring-doc.cn

  • entity-class:负载对象的类型。 如果未提供,则返回 a。com.mongodb.DBObjectspring-doc.cn

  • collection-name或:标识要使用的 MongoDB 集合的名称。collection-name-expressionspring-doc.cn

  • mongodb-factory:对o.s.data.mongodb.MongoDbFactoryspring-doc.cn

  • mongo-template:对o.s.data.mongodb.core.MongoTemplatespring-doc.cn

  • 所有其他入站适配器中通用的其他属性(例如 'channel')。spring-doc.cn

不能同时设置 和 。mongo-templatemongodb-factory

前面的示例相对简单且静态,因为它具有 the 的文字值,并使用 . 有时,您可能需要在运行时根据某些条件更改这些值。 为此,请使用它们的等效项( 和 ),其中提供的表达式可以是任何有效的 SPEL 表达式。querycollection-expressionquery-expressioncollection-name-expressionspring-doc.cn

此外,您可能希望对从 MongoDB 读取的成功处理数据进行一些后处理。 例如;您可能希望在处理完文档后移动或删除文档。 你可以通过使用 Spring Integration 2.2 添加的事务同步功能来实现这一点,如下例所示:spring-doc.cn

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
    channel="replyChannel"
    query-expression="new BasicQuery('{''name'' : ''Bob''}').limit(100)"
    entity-class="java.lang.Object"
    auto-startup="false">
        <int:poller fixed-rate="200" max-messages-per-poll="1">
            <int:transactional synchronization-factory="syncFactory"/>
        </int:poller>
</int-mongodb:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit
        expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"
        channel="someChannel"/>
</int:transaction-synchronization-factory>

<bean id="documentCleaner" class="thing1.thing2.DocumentCleaner"/>

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>

以下示例显示了前面示例中引用的内容:DocumentCleanerspring-doc.cn

public class DocumentCleaner {
    public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
        if (target instanceof List<?>){
            List<?> documents = (List<?>) target;
            for (Object document : documents) {
                mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
            }
        }
    }
}

你可以使用 element 将你的 Poller 声明为事务性。 此元素可以引用真实的事务管理器(例如,如果流的某个其他部分调用 JDBC)。 如果您没有“真实”事务,则可以使用 的实例,它是 Spring 的实现,并且可以在没有实际事务时使用 Mongo 适配器的事务同步功能。transactionalo.s.i.transaction.PseudoTransactionManagerPlatformTransactionManagerspring-doc.cn

这样做不会使 MongoDB 本身成为事务性的。 它允许在成功 (提交) 之前或之后或失败 (回滚) 执行操作同步。

一旦你的 Poller 是事务性的,你就可以在元素上设置一个 the 的实例。 A 创建 . 为方便起见,我们公开了一个默认的基于 SPEL 的表达式,它允许您配置 SPEL 表达式,它们的执行与事务协调(同步)。 支持 before-commit、after-commit 和 after-rollback 事件的表达式,以及发送评估结果(如果有)的每个事件的通道。 对于每个子元素,您可以指定 and 属性。 如果仅存在该属性,则收到的消息将作为特定同步方案的一部分发送到该位置。 如果仅存在属性且表达式的结果是非空值,则会生成一条将结果作为有效载荷的消息,并将其发送到默认通道(),并显示在日志中(在级别上)。 如果您希望评估结果转到特定频道,请添加属性。 如果表达式的结果是 null 或 void,则不会生成任何消息。o.s.i.transaction.TransactionSynchronizationFactorytransactionalTransactionSynchronizationFactoryTransactionSynchronizationTransactionSynchronizationFactoryexpressionchannelchannelexpressionNullChannelDEBUGchannelspring-doc.cn

有关事务同步的更多信息,请参阅事务同步spring-doc.cn

从版本 5.5 开始,可以使用 ,该配置必须计算为使用 MongoDb 语法的 a 或实例。 它可以用作上述后处理程序的替代方案,并且它会修改从集合中获取的那些实体,因此它们不会在下一个轮询周期中再次从集合中拉出(假设更新更改了查询中使用的某些值)。 当集群中使用同一集合的多个实例时,仍然建议使用事务来实现执行隔离和数据一致性。MongoDbMessageSourceupdateExpressionStringupdateorg.springframework.data.mongodb.core.query.UpdateMongoDbMessageSourcespring-doc.cn

不能同时设置 和 。mongo-templatemongodb-factory
这样做不会使 MongoDB 本身成为事务性的。 它允许在成功 (提交) 之前或之后或失败 (回滚) 执行操作同步。

MongoDB Change Stream 入站通道适配器

从版本 5.3 开始,该模块引入了 - Spring Data API 的反应式实现。 默认情况下,此组件会生成一个消息,其中 a of 作为有效负载,并生成一些与更改流相关的标头(请参阅 )。 建议将其与 作为按需订阅和事件使用下游的 结合使用。spring-integration-mongodbMongoDbChangeStreamMessageProducerMessageProducerSupportReactiveMongoOperations.changeStream(String, ChangeStreamOptions, Class)FluxbodyChangeStreamEventMongoHeadersMongoDbChangeStreamMessageProducerFluxMessageChanneloutputChannelspring-doc.cn

此通道适配器的 Java DSL 配置可能如下所示:spring-doc.cn

@Bean
IntegrationFlow changeStreamFlow(ReactiveMongoOperations mongoTemplate) {
    return IntegrationFlow.from(
            MongoDb.changeStreamInboundChannelAdapter(mongoTemplate)
                    .domainType(Person.class)
                    .collection("person")
                    .extractBody(false))
            .channel(MessageChannels.flux())
            .get();
}

当 停止,或取消订阅,或 MongoDb 更改流生成 时,即完成。 可以再次启动通道适配器,并创建新的源数据,并在 中自动订阅该数据。 如果需要使用来自其他位置的更改流事件,则可以在两次启动之间为新选项重新配置此通道适配器。MongoDbChangeStreamMessageProducerOperationType.INVALIDATEPublisherPublisherMessageProducerSupport.subscribeToPublisher(Publisher<? extends Message<?>>)spring-doc.cn

请参阅 Spring Data MongoDb 文档中有关更改流支持的更多信息。spring-doc.cn

MongoDB 出站通道适配器

MongoDB 出站通道适配器允许您将消息有效负载写入 MongoDB 文档存储,如下例所示:spring-doc.cn

<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression"
	collection-name="myCollection"
	mongo-converter="mongoConverter"
	mongodb-factory="mongoDbFactory" />

如前面的配置所示,您可以使用元素配置 MongoDB 出站通道适配器,为各种属性提供值,例如:outbound-channel-adapterspring-doc.cn

  • collection-name或:标识要使用的 MongoDb 集合的名称。collection-name-expressionspring-doc.cn

  • mongo-converter:对 实例的引用有助于将原始 Java 对象转换为 JSON 文档表示形式。o.s.data.mongodb.core.convert.MongoConverterspring-doc.cn

  • mongodb-factory:对 实例的引用。o.s.data.mongodb.MongoDbFactoryspring-doc.cn

  • mongo-template:对 实例的引用。 注意:您不能同时设置 mongo-template 和 mongodb-factory。o.s.data.mongodb.core.MongoTemplatespring-doc.cn

  • 所有入站适配器中通用的其他属性(例如 'channel')。spring-doc.cn

前面的示例相对简单且静态,因为它具有 . 有时,您可能需要在运行时根据某些条件更改此值。 为此,请使用 ,其中提供的表达式是任何有效的 SPEL 表达式。collection-namecollection-name-expressionspring-doc.cn

MongoDB 出站网关

版本 5.0 引入了 MongoDB 出站网关。 它允许您通过向数据库的请求通道发送消息来查询数据库。 然后,网关将响应发送到回复通道。 您可以使用消息有效负载和标头来指定查询和集合名称,如下例所示:spring-doc.cn

@SpringBootApplication
public class MongoDbJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Autowired
    private MongoDbFactory;

    @Autowired
    private MongoConverter;


    @Bean
    public IntegrationFlow gatewaySingleQueryFlow() {
        return f -> f
                .handle(queryOutboundGateway())
                .channel(c -> c.queue("retrieveResults"));
    }

    private MongoDbOutboundGatewaySpec queryOutboundGateway() {
        return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
                .query("{name : 'Bob'}")
                .collectionNameFunction(m -> m.getHeaders().get("collection"))
                .expectSingleResult(true)
                .entityClass(Person.class);
    }

}
class MongoDbKotlinApplication {

    fun main(args: Array<String>) = runApplication<MongoDbKotlinApplication>(*args)

    @Autowired
    lateinit var mongoDbFactory: MongoDatabaseFactory;

    @Autowired
    lateinit var mongoConverter: MongoConverter;

    @Bean
    fun gatewaySingleQueryFlow() =
    integrationFlow {
        handle(queryOutboundGateway())
        channel { queue("retrieveResults") }
    }

    private fun queryOutboundGateway(): MongoDbOutboundGatewaySpec {
        return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
            .query("{name : 'Bob'}")
            .collectionNameFunction<Any> { m -> m.headers["collection"] as String }
            .expectSingleResult(true)
            .entityClass(Person::class.java)
    }

}
@SpringBootApplication
public class MongoDbJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Autowired
    private MongoDbFactory mongoDbFactory;

    @Bean
    @ServiceActivator(inputChannel = "requestChannel")
    public MessageHandler mongoDbOutboundGateway() {
        MongoDbOutboundGateway gateway = new MongoDbOutboundGateway(this.mongoDbFactory);
        gateway.setCollectionNameExpressionString("'myCollection'");
        gateway.setQueryExpressionString("'{''name'' : ''Bob''}'");
        gateway.setExpectSingleResult(true);
        gateway.setEntityClass(Person.class);
        gateway.setOutputChannelName("replyChannel");
        return gateway;
    }

    @Bean
    @ServiceActivator(inputChannel = "replyChannel")
    public MessageHandler handler() {
        return message -> System.out.println(message.getPayload());
    }
}
<int-mongodb:outbound-gateway id="gatewayQuery"
    mongodb-factory="mongoDbFactory"
    mongo-converter="mongoConverter"
    query="{firstName: 'Bob'}"
    collection-name="myCollection"
    request-channel="in"
    reply-channel="out"
    entity-class="org.springframework.integration.mongodb.test.entity$Person"/>

您可以将以下属性用于 MongoDB 出站网关:spring-doc.cn

  • collection-name或:标识要使用的 MongoDB 集合的名称。collection-name-expressionspring-doc.cn

  • mongo-converter:对 实例的引用有助于将原始 Java 对象转换为 JSON 文档表示形式。o.s.data.mongodb.core.convert.MongoConverterspring-doc.cn

  • mongodb-factory:对 实例的引用。o.s.data.mongodb.MongoDbFactoryspring-doc.cn

  • mongo-template:对 实例的引用。 注意:您不能同时设置 和 。o.s.data.mongodb.core.MongoTemplatemongo-templatemongodb-factoryspring-doc.cn

  • entity-class:要传递给 MongoTemplate 中的 and 方法的实体类的完全限定名称。 如果未提供此属性,则默认值为 。find(..)findOne(..)org.bson.Documentspring-doc.cn

  • query或 :指定 MongoDB 查询。 有关更多查询示例,请参阅 MongoDB 文档query-expressionspring-doc.cn

  • collection-callback:对 实例的引用。 最好是 since 5.0.11 的实例,带有请求消息上下文。 有关更多信息,请参阅其 Javadocs。 注意:您不能同时拥有查询属性和任何查询属性。org.springframework.data.mongodb.core.CollectionCallbacko.s.i.mongodb.outbound.MessageCollectionCallbackcollection-callbackspring-doc.cn

作为 and 属性的替代方法,您可以通过使用该属性作为对功能接口实现的引用来指定其他数据库操作。 以下示例指定 count 操作:queryquery-expressioncollectionCallbackMessageCollectionCallbackspring-doc.cn

private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {
    return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
            .collectionCallback((collection, requestMessage) -> collection.count())
            .collectionName("myCollection");
}

MongoDB 反应式通道适配器

从版本 5.3 开始,提供了 and 实现。 它们基于 Spring Data 并需要依赖项。ReactiveMongoDbStoringMessageHandlerReactiveMongoDbMessageSourceReactiveMongoOperationsorg.mongodb:mongodb-driver-reactivestreamsspring-doc.cn

这是 的实现,当集成流定义中涉及反应流组合时,框架中原生支持 。 有关更多信息,请参见 ReactiveMessageHandlerReactiveMongoDbStoringMessageHandlerReactiveMessageHandlerspring-doc.cn

从配置的角度来看,它与许多其他标准通道适配器没有区别。 例如,对于 Java DSL,这样的通道适配器可以像这样使用:spring-doc.cn

@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
    return f -> f
            .channel(MessageChannels.flux())
            .handle(MongoDb.reactiveOutboundChannelAdapter(mongoDbFactory));
}

在此示例中,我们将通过提供的 MongoDb 连接到 MongoDb,并将请求消息中的数据存储到名称为 的默认集合中。 真正的操作将从内部创建的 .ReactiveMongoDatabaseFactorydataReactiveStreamsConsumerspring-doc.cn

这是基于提供的 or 和 MongoDb 查询(或表达式)的实现,根据具有预期类型的选项进行调用或操作,以转换查询结果。 当订阅生成的消息的有效负载中的 ( 或根据选项) 时,将按需执行查询执行和结果评估。 框架可以在 splitter 时自动(本质上)订阅这样的有效负载,并在下游使用。 否则,目标应用程序负责订阅下游终端节点中轮询的发布者。ReactiveMongoDbMessageSourceAbstractMessageSourceReactiveMongoDatabaseFactoryReactiveMongoOperationsfind()findOne()expectSingleResultentityClassPublisherFluxMonoexpectSingleResultflatMapFluxMessageChannelspring-doc.cn

使用 Java DSL 时,可以像这样配置这样的通道适配器:spring-doc.cn

@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
    return IntegrationFlow
            .from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory, "{'name' : 'Name'}")
                            .entityClass(Person.class),
                    c -> c.poller(Pollers.fixedDelay(1000)))
            .split()
            .channel(c -> c.flux("output"))
            .get();
}

从版本 5.5 开始,可以使用 配置。 它与阻止 . 有关更多信息,请参阅 MongoDB 入站通道适配器和 JavaDocs。ReactiveMongoDbMessageSourceupdateExpressionMongoDbMessageSourceAbstractMongoDbMessageSourceSpecspring-doc.cn