响应式流支持
Reactive Streams 支持
Spring 集成在框架的某些地方和不同方面提供了对 Reactive Streams 交互的支持。 我们将在这里讨论其中的大部分内容,并在必要时提供指向目标章节的适当链接以获取详细信息。
前言
概括地说,Spring Integration 扩展了 Spring 编程模型以支持众所周知的企业集成模式。
Spring 集成在基于 Spring 的应用程序内支持轻量级消息传递,并支持通过声明式适配器与外部系统集成。
Spring 集成的主要目标是提供一个简单的模型来构建企业集成解决方案,同时保持关注点分离,这对于生成可维护、可测试的代码至关重要。
此目标是在目标应用程序中使用一等公民(如 和 )实现的,它允许我们构建集成流(管道),其中(在大多数情况下)一个端点将消息生成到通道中,供另一个端点使用。
通过这种方式,我们可以将集成交互模型与目标业务逻辑区分开来。
这里的关键部分是介于两者之间的通道:流行为取决于其实现,而端点保持不变。message
channel
endpoint
另一方面,Reactive Streams 是具有非阻塞背压的异步流处理标准。
Reactive Streams 的主要目标是管理跨异步边界的流数据交换——就像将元素传递给另一个线程或线程池一样——同时确保接收方不会被迫缓冲任意数量的数据。
换句话说,背压是此模型不可或缺的一部分,以便允许在线程之间调解的队列有界。
Reactive Streams 实现(例如 Project Reactor)的目的是在 Stream 应用程序的整个处理图中保留这些优势和特征。
Reactive Streams 库的最终目标是使用可用的编程语言结构,以透明和流畅的方式为目标应用程序提供类型、运算符集和支持 API,但最终解决方案并不像普通函数链调用那样势在必行。
它分为几个阶段:定义和执行,这发生在订阅最终反应式发布者的一段时间后,对数据的需求从定义的底部推送到顶部,根据需要施加背压 - 我们请求当前可以处理的尽可能多的事件。
响应式应用程序看起来像我们在 Spring Integration 术语中习惯的 or - 。
事实上,自 Java 9 以来的 Reactive Streams SPI 就在类中介绍。"stream"
"flow"
java.util.concurrent.Flow
从这里来看,当我们在端点上应用一些反应式框架运算符时,Spring 集成流似乎真的非常适合编写反应式流应用程序,但实际上问题要广泛得多,我们需要记住,并非所有端点(例如)都可以在反应流中透明地处理。
当然,Spring Integration 中 Reactive Streams 支持的主要目标是允许整个过程完全反应式、按需启动和背压就绪。
在通道适配器的目标协议和系统提供 Reactive Streams 交互模型之前,这是不可能的。
在下面的部分中,我们将描述 Spring Integration 中提供了哪些组件和方法,用于开发保留集成流结构的响应式应用程序。JdbcMessageHandler
Spring 集成中的所有 Reactive Streams 交互都是使用 Project Reactor 类型实现的,例如 和 。Mono Flux |
消息网关
与 Reactive Streams 交互的最简单点是,我们只需将网关方法的返回类型设为 - 当返回的实例发生订阅时,网关方法调用背后的整个集成流将执行。
有关更多信息,请参阅 Reactor Mono
。
在框架内部,对于完全基于 Reactive Streams 兼容协议的入站网关,使用了类似的 -reply 方法(有关更多信息,请参阅下面的 Reactive Channel Adapters)。
send-and-receive 操作被包装成一个 with-链接一个 Headers 的回复评估。
这样,特定反应式协议(例如 Netty)的入站组件将作为在 Spring 集成上执行的反应式流的订阅者和发起者。
如果请求有效负载是反应式类型,则最好使用反应式流定义来处理它,从而将进程推迟到发起方订阅。
为此,处理程序方法也必须返回响应式类型。
有关更多信息,请参阅下一节。@MessagingGateway
Mono<?>
Mono
Mono
Mono.deffer()
replyChannel
反应式回复有效负载
当生成回复消息的回复返回反应式类型有效负载时,它将以异步方式处理,并为 提供常规实现,并在输出通道为实现时通过按需订阅扁平化,例如 .
使用标准的命令式用例,如果回复有效负载是多值发布者(有关更多信息),则会将其包装到 .
因此,必须显式订阅下游或由下游扁平化。
使用 a for the ,无需担心返回类型和订阅;框架内部一切都顺利处理。MessageHandler
MessageChannel
outputChannel
ReactiveStreamsSubscribableChannel
FluxMessageChannel
MessageChannel
ReactiveAdapter.isMultiValue()
Mono.just()
Mono
FluxMessageChannel
ReactiveStreamsSubscribableChannel
outputChannel
有关更多信息,请参阅 Asynchronous Service Activator 。
如需了解详情,另请参阅 Kotlin 协程。
FluxMessageChannel
和ReactiveStreamsConsumer
这是 和 的组合实现。
作为热源,在内部创建,用于从实现中接收传入消息。
该实现被委托给该内部 .
此外,对于按需上游消费,它为 Contract 提供了一个实现。
当此通道的订阅准备就绪时,为此通道提供的任何上游(例如,请参阅下面的 Source Polling Channel Adapter 和 splitter)都会自动订阅。
来自此委托发布者的事件将沉入上述内部。FluxMessageChannel
MessageChannel
Publisher<Message<?>>
Flux
send()
Publisher.subscribe()
Flux
FluxMessageChannel
ReactiveStreamsSubscribableChannel
Publisher
Flux
的 consumer 必须是遵守 Reactive Streams Contract 的实例。
幸运的是,Spring Integration 中的所有实现也都实现了一个 from 项目 Reactor。
由于中间的实现,整个集成流程配置对目标开发人员是透明的。
在这种情况行为从命令式推送模型更改为反应式拉取模型。
A 还可用于使用 将 any 转换为反应式源,使集成流部分响应式。FluxMessageChannel
org.reactivestreams.Subscriber
MessageHandler
CoreSubscriber
ReactiveStreamsConsumer
ReactiveStreamsConsumer
MessageChannel
IntegrationReactiveUtils
有关更多信息,请参阅 FluxMessageChannel
。
从版本 5.5 开始,引入了一个选项,用于将流中的终端节点设置为独立于输入通道。
可以选择通过操作自定义来自输入通道的源,例如使用 、 等。
此功能通过其属性表示为所有消息传递注释(等)的子注释。ConsumerEndpointSpec
reactive()
ReactiveStreamsConsumer
Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>
Flux
Flux.transform()
publishOn()
doOnNext()
retry()
@Reactive
@ServiceActivator
@Splitter
reactive()
源轮询通道适配器
通常,依赖于由 启动的任务。
轮询触发器是根据提供的选项构建的,用于定期计划任务以轮询目标数据或事件源。
当 an 是 a 时,相同的用于确定下次执行的时间,但不是计划任务,而是根据上一步中的 for 值和 for 持续时间创建一个。
然后,使用 A 轮询并将它们沉入 output 中。
此生成器由提供的 subscribe,以执行 back-pressure downstream 。
从版本 5.5 开始,当 时,根本不会调用源,而是立即通过 result 完成,直到稍后将 更改为非零值,例如通过控制总线。
这样,任何 implementation 都可以变成反应式热源。SourcePollingChannelAdapter
TaskScheduler
outputChannel
ReactiveStreamsSubscribableChannel
Trigger
SourcePollingChannelAdapter
Flux<Message<?>>
Flux.generate()
nextExecutionTime
Mono.delay()
Flux.flatMapMany()
maxMessagesPerPoll
Flux
Flux
ReactiveStreamsSubscribableChannel
maxMessagesPerPoll == 0
flatMapMany()
Mono.empty()
maxMessagesPerPoll
MessageSource
有关更多信息,请参阅轮询使用者。
事件驱动的通道适配器
MessageProducerSupport
是事件驱动型通道适配器的基类,通常,它用作生成驱动程序 API 中的侦听器回调。
当消息生成者实现构建消息而不是基于侦听器的功能时,此回调也可以很容易地插入到 Reactor 运算符中。
实际上,当消息生成方的 不是 时,这是在框架中完成的。
但是,为了改善最终用户体验,并允许更多背压就绪功能,当 是来自目标系统的数据源时,它提供了一个 API,可在 Target 实施中使用。
通常,当为源数据调用 Target Driver API 时,从实现中使用它。
建议将反应式实现与 a 结合使用,作为按需订阅和事件使用下游。
取消对 的订阅时,通道适配器将进入 Stopped 状态。
调用此类通道适配器将完成从 source 的生成。
可以通过自动订阅新创建的源来重新启动通道适配器。sendMessage(Message<?>)
doOnNext()
Flux
outputChannel
ReactiveStreamsSubscribableChannel
MessageProducerSupport
subscribeToPublisher(Publisher<? extends Message<?>>)
Publisher<Message<?>>>
doStart()
Publisher
MessageProducerSupport
FluxMessageChannel
outputChannel
Publisher
stop()
Publisher
Publisher
Message Source 到 Reactive Streams
从版本 5.3 开始,提供了 a。
它是提供的和事件驱动的生产组合到配置的 .
在内部,它将 a 包装到重复重新订阅的 a 中,生成 a to be subscribed,如上所述。
为此,订阅是为了避免在目标中可能被阻止。
当消息源返回(没有要拉取的数据)时,将变为带有 a 的状态,以便根据订阅者上下文中的条目进行后续重新订阅。
默认情况下,它是 1 秒。
如果生成的消息的标头中包含信息,则在原始消息中确认(如有必要),如果下游流抛出 a 并拒绝失败的消息,则会在原始消息中拒绝该消息。
这可用于任何用例,当 polling 通道适配器的功能应该转换为任何现有实现的反应式按需解决方案时。ReactiveMessageSourceProducer
MessageSource
outputChannel
MessageSource
Mono
Flux<Message<?>>
subscribeToPublisher(Publisher<? extends Message<?>>)
Mono
Schedulers.boundedElastic()
MessageSource
null
Mono
repeatWhenEmpty()
delay
IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY
Duration
MessageSource
IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
doOnSuccess()
Mono
doOnError()
MessagingException
ReactiveMessageSourceProducer
MessageSource<?>
拆分器和聚合器
当 an 获取 a for 其逻辑时,该过程自然会遍历 中的项目,以将它们映射到消息中,以便发送到 。
如果此通道是 ,则从该通道按需订阅 的包装器 ,当我们将传入事件映射到 multi-value output 时,此拆分器行为看起来更像 Reactor 运算符。
当整个集成流是使用 splitter 之前和之后构建的时,它最有意义,将 Spring 集成配置与 Reactive Streams 需求及其事件处理运算符保持一致。
使用常规通道,a 被转换为 an 用于标准的 Iterate-and-produce 拆分逻辑。AbstractMessageSplitter
Publisher
Publisher
outputChannel
ReactiveStreamsSubscribableChannel
Flux
Publisher
flatMap
Publisher
FluxMessageChannel
Publisher
Iterable
A 是另一个特定的 Reactive Streams 逻辑实现示例,可以将其视为 Project Reactor。
它基于 和 (或 ) 运算符。
传入消息在创建 a 时被沉入 initiated 中,使其成为热源。
这可以通过按需订阅,或者直接在 is not reactive 时订阅。
当整个 integration flow 使用 before and after this component 构建时,这有它的力量,使整个 logic back-pressure 准备就绪。FluxAggregatorMessageHandler
"reactive operator"
Flux.groupBy()
Flux.window()
buffer()
Flux.create()
FluxAggregatorMessageHandler
Flux
ReactiveStreamsSubscribableChannel
FluxAggregatorMessageHandler.start()
outputChannel
MessageHandler
FluxMessageChannel
有关更多信息,请参阅 Stream and Flux Splitting 和 Flux Aggregator 。
Java DSL
Java 中的 DSL 可以从任何实例开始(参见)。
此外,通过操作员,可以将其变成反应热源。
在这两种情况下,A 都在内部使用;它可以根据其 Contract 订阅入站,并且它本身就是下游订阅者的 一个。
通过动态注册,我们可以实现一个强大的逻辑,将 Reactive Streams 与此集成流相结合,桥接到 / from 。IntegrationFlow
Publisher
IntegrationFlow.from(Publisher<Message<T>>)
IntegrationFlowBuilder.toReactivePublisher()
IntegrationFlow
FluxMessageChannel
Publisher
ReactiveStreamsSubscribableChannel
Publisher<Message<?>>
IntegrationFlow
Publisher
从版本 5.5.6 开始,存在一个 operator 变体来控制返回的 whole 后面的生命周期。
通常,来自反应式发布者的订阅和使用发生在后面的运行时阶段,而不是在反应式流组合期间,甚至在启动期间。
为了避免在订阅点的生命周期管理中使用样板代码,并获得更好的最终用户体验,引入了这个带有标志的新运算符。
它将 标记 (if ) 及其组件,因此不会自动启动流中消息的生成和使用。
相反,for 是从内部 .
与值无关,流从 和 停止 - 如果没有可以使用消息,则生成消息没有意义。toReactivePublisher(boolean autoStartOnSubscribe)
IntegrationFlow
Publisher<Message<?>>
ApplicationContext
IntegrationFlow
Publisher<Message<?>>
autoStartOnSubscribe
true
IntegrationFlow
autoStartup = false
ApplicationContext
start()
IntegrationFlow
Flux.doOnSubscribe()
autoStartOnSubscribe
Flux.doOnCancel()
Flux.doOnTerminate()
对于完全相反的用例,when 应该调用反应式流并在完成后继续,在 .
此时的流将转换为 a,该流将传播到在 operator 中执行的 provided 。
该函数的结果被包装到 for flat-mapping 到一个输出中,该输出由另一个 for downstream flow 订阅。IntegrationFlow
fluxTransform()
IntegrationFlowDefinition
FluxMessageChannel
fluxFunction
Flux.transform()
Mono<Message<?>>
Flux
FluxMessageChannel
有关更多信息,请参见 Java DSL 章节。
ReactiveMessageHandler
从版本 5.3 开始,框架原生支持 。
这种类型的消息处理程序专为反应式客户端而设计,这些客户端返回反应式类型以进行按需订阅以执行低级操作,并且不提供任何回复数据来继续反应式流组合。
当 a 在命令式集成流中使用时,结果 in 在 return 后立即被订阅,只是因为在这样的流中没有反应式流组合来遵守背压。
在这种情况下,框架将其包装成 - .
然而,当 a 参与流时(例如,当 channel to consume 是 a 时),这样的 a 被组成到整个反应流中,并使用 Reactor 算子来遵守 consumption 期间的背压。ReactiveMessageHandler
ReactiveMessageHandler
handleMessage()
ReactiveMessageHandler
ReactiveMessageHandlerAdapter
MessageHandler
ReactiveStreamsConsumer
FluxMessageChannel
ReactiveMessageHandler
flatMap()
开箱即用的实现之一是 for Outbound Channel Adapter。
有关更多信息,请参见MongoDB Reactive Channel Adapters。ReactiveMessageHandler
ReactiveMongoDbStoringMessageHandler
Reactive 通道适配器
当集成的目标协议提供 Reactive Streams 解决方案时,在 Spring Integration 中实现通道适配器就变得简单了。
入站、事件驱动的通道适配器实现是关于将请求(如有必要)包装到延迟的 or 中,并且仅在协议组件将订阅启动到从 listener 方法返回的 return 中时执行发送(并生成回复,如果有)。
这样我们就有了一个完全封装在这个组件中的反应式流解决方案。
当然,在输出通道上订阅的下游集成流应该遵循 Reactive Streams 规范,并以按需、背压就绪的方式执行。Mono
Flux
Mono
根据集成流程中使用的处理器的性质 (或当前实现) ,这并不总是可用的。
当没有反应式实现时,可以使用线程池和队列或(见上文)在集成端点之前和之后处理此限制。MessageHandler
FluxMessageChannel
反应式事件驱动的入站通道适配器的示例:
public class CustomReactiveMessageProducer extends MessageProducerSupport {
private final CustomReactiveSource customReactiveSource;
public CustomReactiveMessageProducer(CustomReactiveSource customReactiveSource) {
Assert.notNull(customReactiveSource, "'customReactiveSource' must not be null");
this.customReactiveSource = customReactiveSource;
}
@Override
protected void doStart() {
Flux<Message<?>> messageFlux =
this.customReactiveSource
.map(event - >
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());
subscribeToPublisher(messageFlux);
}
}
用法将如下所示:
public class MainFlow {
@Autowired
private CustomReactiveMessageProducer customReactiveMessageProducer;
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(customReactiveMessageProducer)
.channel(outputChannel)
.get();
}
}
或者以声明的方式:
public class MainFlow {
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(new CustomReactiveMessageProducer(new CustomReactiveSource()))
.handle(outputChannel)
.get();
}
}
或者即使没有通道适配器,我们也可以始终按以下方式使用 Java DSL:
public class MainFlow {
@Bean
public IntegrationFlow buildFlow() {
Flux<Message<?>> myFlux = this.customReactiveSource
.map(event ->
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());
return IntegrationFlow.from(myFlux)
.handle(outputChannel)
.get();
}
}
反应式出站通道适配器实现是关于根据为目标协议提供的反应式 API 启动(或延续)反应流以与外部系统交互。 入站有效负载本身可以是反应式类型,也可以是整个集成流的事件,它是顶部的反应式流的一部分。 如果我们处于单向、即发即弃的场景中,则可以立即订阅返回的反应式类型,或者它被传播到下游(请求-回复场景)以进行进一步的集成流或目标业务逻辑中的显式订阅,但仍在下游保留反应式流语义。
反应式出站通道适配器的示例:
public class CustomReactiveMessageHandler extends AbstractReactiveMessageHandler {
private final CustomEntityOperations customEntityOperations;
public CustomReactiveMessageHandler(CustomEntityOperations customEntityOperations) {
Assert.notNull(customEntityOperations, "'customEntityOperations' must not be null");
this.customEntityOperations = customEntityOperations;
}
@Override
protected Mono<Void> handleMessageInternal(Message<?> message) {
return Mono.fromSupplier(() -> message.getHeaders().get("queryType", Type.class))
.flatMap(mode -> {
switch (mode) {
case INSERT:
return handleInsert(message);
case UPDATE:
return handleUpdate(message);
default:
return Mono.error(new IllegalArgumentException());
}
}).then();
}
private Mono<Void> handleInsert(Message<?> message) {
return this.customEntityOperations.insert(message.getPayload())
.then();
}
private Mono<Void> handleUpdate(Message<?> message) {
return this.r2dbcEntityOperations.update(message.getPayload())
.then();
}
public enum Type {
INSERT,
UPDATE,
}
}
我们将能够使用这两个通道适配器:
public class MainFlow {
@Autowired
private CustomReactiveMessageProducer customReactiveMessageProducer;
@Autowired
private CustomReactiveMessageHandler customReactiveMessageHandler;
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(customReactiveMessageProducer)
.transform(someOperation)
.handle(customReactiveMessageHandler)
.get();
}
}
目前, Spring 集成为 WebFlux、RSocket、MongoDb、R2DBC、ZeroMQ、GraphQL、Apache Cassandra 提供通道适配器(或网关)实现。
Redis Stream Channel Adapters 也是反应式的,并使用 Spring Data 中的适配器。
更多的反应式通道适配器即将到来,例如基于 Spring 的 Kafka 中的 Apache Kafka,以及来自 Apache Kafka 的 API 等。
对于许多其他非反应式通道适配器,建议使用线程池以避免在反应式流处理期间阻塞。ReactiveStreamOperations
ReactiveKafkaProducerTemplate
ReactiveKafkaConsumerTemplate
对命令式上下文传播的反应
当 Context Propagation 库位于 Classpath 上时,Project Reactor 可以获取值(例如 Micrometer Observation 或 )并将其存储到上下文中。
当我们需要填充日志记录 MDC 进行跟踪或让我们从反应流中调用的服务来从作用域中恢复观察时,也可以进行相反的操作。
请参阅 Project Reactor 文档中有关其用于上下文传播的特殊运算符的更多信息。
如果我们的整个解决方案是单个反应式流组合,则存储和恢复上下文可以顺利进行,因为上下文从下游一直到组合的开头都是可见的( or )。
但是,如果应用程序在不同实例之间切换或切换到命令式处理并返回,则与 关联的上下文可能不可用。
对于这样的用例, Spring 集成提供了一个额外的功能(从 version 开始),将 Reactor 存储到从反应流生成的消息头中,例如,当我们执行直接操作时。
然后,使用此标头来恢复此通道将要发出的 Reactor 上下文。
目前,此标头是从 和 组件填充的,但可用于执行反应式到命令式集成的任何解决方案。
填充此标头的逻辑如下所示:ThreadLocal
SecurityContextHolder
Subscriber
Subscriber
Flux
Mono
Flux
Subscriber
6.0.5
ContextView
IntegrationMessageHeaderAccessor.REACTOR_CONTEXT
send()
FluxMessageChannel.subscribeTo()
Message
WebFluxInboundEndpoint
RSocketInboundGateway
return requestMono
.flatMap((message) ->
Mono.deferContextual((context) ->
Mono.just(message)
.handle((messageToSend, sink) ->
send(messageWithReactorContextIfAny(messageToSend, context)))));
...
private Message<?> messageWithReactorContextIfAny(Message<?> message, ContextView context) {
if (!context.isEmpty()) {
return getMessageBuilderFactory()
.fromMessage(message)
.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, context)
.build();
}
return message;
}
请注意,我们仍然需要使用 operator 使 Reactor 从上下文中恢复值。
即使它是作为 Headers 发送的,框架也无法假设它是否要恢复到下游的值。handle()
ThreadLocal
ThreadLocal
要从 on the other 或 composition 恢复上下文,可以执行以下逻辑:Message
Flux
Mono
Mono.just(message)
.handle((messageToHandle, sink) -> ...)
.contextWrite(StaticMessageHeaderAccessor.getReactorContext(message)));