从版本 5.5 开始,提供带有可选定制器的配置属性。 此选项将目标终端节点配置为实例,与输入通道类型无关,输入通道类型将转换为 via . 操作员使用提供的函数来自输入通道自定义(、、等)反应式流源。ConsumerEndpointSpecreactive()Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>ReactiveStreamsConsumerFluxIntegrationReactiveUtils.messageChannelToFlux()Flux.transform()publishOn()log()doOnNext()Spring中文文档

以下示例演示如何将发布线程从独立于最终订阅者和创建者的输入通道更改为该线程:DirectChannelSpring中文文档

@Bean
public IntegrationFlow reactiveEndpointFlow() {
    return IntegrationFlow
            .from("inputChannel")
            .transformWith(t -> t
                              .<String, Integer>transformer(Integer::parseInt)
                              .reactive(flux -> flux.publishOn(Schedulers.parallel()))
            )
            .get();
}

有关详细信息,请参阅响应式流支持Spring中文文档