端点reactive()

从版本 5.5 开始,提供带有可选 customizer 的 configuration 属性。 此选项将目标端点配置为实例,独立于输入通道类型,该类型将转换为 via 。 运算符使用提供的函数来自定义 (, , 等) 来自 input 通道的反应流源。ConsumerEndpointSpecreactive()Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>ReactiveStreamsConsumerFluxIntegrationReactiveUtils.messageChannelToFlux()Flux.transform()publishOn()log()doOnNext()spring-doc.cn

以下示例演示如何将发布线程从独立于最终订阅者和生成方的 input 通道更改为该 input 通道:DirectChannelspring-doc.cn

@Bean
public IntegrationFlow reactiveEndpointFlow() {
    return IntegrationFlow
            .from("inputChannel")
            .transformWith(t -> t
                              .<String, Integer>transformer(Integer::parseInt)
                              .reactive(flux -> flux.publishOn(Schedulers.parallel()))
            )
            .get();
}

有关更多信息,请参阅 Reactive Streams Supportspring-doc.cn