此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.4.3! |
这reactive()
端点
从版本 5.5 开始,ConsumerEndpointSpec
提供reactive()
configuration 属性与可选的定制器Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>
.
此选项将目标终端节点配置为ReactiveStreamsConsumer
实例,独立于输入通道类型,该类型转换为Flux
通过IntegrationReactiveUtils.messageChannelToFlux()
.
提供的函数从Flux.transform()
运算符进行自定义 (publishOn()
,log()
,doOnNext()
等)来自 input 通道的反应式流源。
下面的示例演示了如何将发布线程从独立于最终订阅者和生产者的 input 通道更改为该DirectChannel
:
@Bean
public IntegrationFlow reactiveEndpointFlow() {
return IntegrationFlow
.from("inputChannel")
.transformWith(t -> t
.<String, Integer>transformer(Integer::parseInt)
.reactive(flux -> flux.publishOn(Schedulers.parallel()))
)
.get();
}
有关更多信息,请参阅 Reactive Streams Support 。