端点reactive()
从版本 5.5 开始,提供带有可选 customizer 的 configuration 属性。
此选项将目标端点配置为实例,独立于输入通道类型,该类型将转换为 via 。
运算符使用提供的函数来自定义 (, , 等) 来自 input 通道的反应流源。ConsumerEndpointSpec
reactive()
Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>
ReactiveStreamsConsumer
Flux
IntegrationReactiveUtils.messageChannelToFlux()
Flux.transform()
publishOn()
log()
doOnNext()
以下示例演示如何将发布线程从独立于最终订阅者和生成方的 input 通道更改为该 input 通道:DirectChannel
@Bean
public IntegrationFlow reactiveEndpointFlow() {
return IntegrationFlow
.from("inputChannel")
.transformWith(t -> t
.<String, Integer>transformer(Integer::parseInt)
.reactive(flux -> flux.publishOn(Schedulers.parallel()))
)
.get();
}
有关更多信息,请参阅 Reactive Streams Support 。