此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.4.3spring-doc.cadn.net.cn

reactive()端点

从版本 5.5 开始,ConsumerEndpointSpec提供reactive()configuration 属性与可选的定制器Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>. 此选项将目标终端节点配置为ReactiveStreamsConsumer实例,独立于输入通道类型,该类型转换为Flux通过IntegrationReactiveUtils.messageChannelToFlux(). 提供的函数从Flux.transform()运算符进行自定义 (publishOn(),log(),doOnNext()等)来自 input 通道的反应式流源。spring-doc.cadn.net.cn

下面的示例演示了如何将发布线程从独立于最终订阅者和生产者的 input 通道更改为该DirectChannel:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow reactiveEndpointFlow() {
    return IntegrationFlow
            .from("inputChannel")
            .transformWith(t -> t
                              .<String, Integer>transformer(Integer::parseInt)
                              .reactive(flux -> flux.publishOn(Schedulers.parallel()))
            )
            .get();
}

有关更多信息,请参阅 Reactive Streams Supportspring-doc.cadn.net.cn