此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1Spring中文文档

此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.3.1Spring中文文档

RSocket Spring 集成模块 () 允许执行 RSocket 应用程序协议spring-integration-rsocketSpring中文文档

您需要将此依赖项包含在项目中:Spring中文文档

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-rsocket</artifactId>
    <version>6.3.2-SNAPSHOT</version>
</dependency>
compile "org.springframework.integration:spring-integration-rsocket:6.3.2-SNAPSHOT"

此模块从版本 5.2 开始可用,并且基于 Spring Messaging 基础及其 RSocket 组件实现,例如 和 。 有关 RSocket 协议、术语和组件的更多信息,请参阅 Spring Framework RSocket 支持RSocketRequesterRSocketMessageHandlerRSocketStrategiesSpring中文文档

在通过通道适配器开始集成流处理之前,我们需要在服务器和客户端之间建立 RSocket 连接。 为此,Spring Integration RSocket 支持提供了 .ServerRSocketConnectorClientRSocketConnectorAbstractRSocketConnectorSpring中文文档

根据提供的用于接受来自客户端的连接,在主机和端口上公开侦听器。 可以使用 自定义内部实例,也可以使用其他可以配置的选项进行自定义,例如 以及有效负载数据和标头元数据。 当客户端请求者提供 a 时(见下文),连接的客户端将存储为 a 在由 . 默认情况下,连接数据用作键的转换值,转换为具有 UTF-8 字符集的字符串。 此类注册表可以在应用程序逻辑中用于确定与该客户端交互的特定客户端连接,或将相同的消息发布到所有连接的客户端。 从客户端建立连接时,将从 . 这类似于Spring Messaging模块中的注释所提供的内容。 映射模式表示接受所有客户端路由。 可用于通过标头区分不同的路由。ServerRSocketConnectorio.rsocket.transport.ServerTransportRSocketServersetServerConfigurer()RSocketStrategiesMimeTypesetupRouteClientRSocketConnectorRSocketRequesterclientRSocketKeyStrategyBiFunction<Map<String, Object>, DataBuffer, Object>RSocketRequesterRSocketConnectedEventServerRSocketConnector@ConnectMapping*RSocketConnectedEventDestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADERSpring中文文档

典型的服务器配置可能如下所示:Spring中文文档

@Bean
public RSocketStrategies rsocketStrategies() {
    return RSocketStrategies.builder()
        .decoder(StringDecoder.textPlainOnly())
        .encoder(CharSequenceEncoder.allMimeTypes())
        .dataBufferFactory(new DefaultDataBufferFactory(true))
        .build();
}

@Bean
public ServerRSocketConnector serverRSocketConnector() {
    ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
    serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
    serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
    serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
    serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
                                    + headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
    return serverRSocketConnector;
}

@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
	...
}

所有选项(包括 bean 和 for )都是可选的。 有关更多信息,请参阅 JavaDocs。RSocketStrategies@EventListenerRSocketConnectedEventServerRSocketConnectorSpring中文文档

从版本 5.2.1 开始,将提取到公共顶级类中,以便与现有 RSocket 服务器建立可能的连接。 当 提供 的外部实例时,它不会在内部创建 RSocket 服务器,而只是将所有处理逻辑委托给提供的实例。 此外,还可以为 RSocket 控制器配置一个标志来处理,完全取代标准提供的功能。 这在混合配置中很有用,因为经典方法与 RSocket 通道适配器一起存在于同一应用程序中,并且应用程序中存在外部配置的 RSocket 服务器。ServerRSocketMessageHandlerServerRSocketConnectorServerRSocketMessageHandlerServerRSocketMessageHandlermessageMappingCompatible@MessageMappingRSocketMessageHandler@MessageMappingSpring中文文档

用作基于通过提供的连接的支架。 可以使用提供的 . (带有可选模板变量)和元数据也可以在此组件上配置。ClientRSocketConnectorRSocketRequesterRSocketClientTransportRSocketConnectorRSocketConnectorConfigurersetupRoutesetupDataSpring中文文档

典型的客户端配置可能如下所示:Spring中文文档

@Bean
public RSocketStrategies rsocketStrategies() {
    return RSocketStrategies.builder()
        .decoder(StringDecoder.textPlainOnly())
        .encoder(CharSequenceEncoder.allMimeTypes())
        .dataBufferFactory(new DefaultDataBufferFactory(true))
        .build();
}

@Bean
public ClientRSocketConnector clientRSocketConnector() {
    ClientRSocketConnector clientRSocketConnector =
            new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
    clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
    clientRSocketConnector.setSetupRoute("clientConnect/{user}");
    clientRSocketConnector.setSetupRouteVariables("myUser");
    return clientRSocketConnector;
}

这些选项中的大多数(包括 bean)都是可选的。 请注意我们如何连接到任意端口上本地启动的 RSocket 服务器。 请参阅用例。 有关更多信息,另请参阅及其超类 JavaDocs。RSocketStrategiesServerRSocketConnector.clientRSocketKeyStrategysetupDataClientRSocketConnectorAbstractRSocketConnectorSpring中文文档

两者都负责将入站通道适配器映射到其配置,以路由传入的 RSocket 请求。 有关详细信息,请参阅下一节。ClientRSocketConnectorServerRSocketConnectorpathSpring中文文档

RSocket 入站网关

负责接收 RSocket 请求并生成响应(如果有)。 它需要一个映射数组,这些映射可以是类似于 MVC 请求映射或语义的模式。 此外,(从版本 5.2.2 开始),可以在 上配置一组交互模型(请参阅 ),以按特定帧类型限制对此端点的 RSocket 请求。 默认情况下,支持所有交互模型。 这样的 Bean 根据其实现(扩展 的 ),由 或 自动检测 对于传入请求的内部路由逻辑。 可以向显式终结点注册提供 an。 这样,自动检测选项将被禁用。 也可以注入到或从提供的覆盖任何显式注入中获得。 解码器用于根据提供的解码器对请求负载进行解码。 如果在传入的 中未提供标头,则将请求视为 RSocket 交互模型。 在本例中,an 对 . 否则,标头中的值用于向 RSocket 发送回复。 为此,an 对 . 根据逻辑,要向下游发送的消息始终是 a。 在 RSocket 交互模型中,消息具有普通转换的 . 回复可以是纯对象,也可以是 - 根据 中提供的编码器,将它们正确地转换为 RSocket 响应。RSocketInboundGatewaypath@MessageMappingRSocketInteractionModelRSocketInboundGatewayIntegrationRSocketEndpointReactiveMessageHandlerServerRSocketConnectorClientRSocketConnectorIntegrationRSocketMessageHandlerAbstractRSocketConnectorRSocketInboundGatewayAbstractRSocketConnectorRSocketStrategiesRSocketInboundGatewayAbstractRSocketConnectorRSocketStrategiesrequestElementTypeRSocketPayloadReturnValueHandler.RESPONSE_HEADERMessageRSocketInboundGatewayfireAndForgetRSocketInboundGatewaysendoutputChannelMonoProcessorRSocketPayloadReturnValueHandler.RESPONSE_HEADERRSocketInboundGatewaysendAndReceiveMessageReactiveoutputChannelpayloadFluxMessagingRSocketfireAndForgetpayloadpayloadPublisherRSocketInboundGatewayRSocketStrategiesSpring中文文档

从版本 5.3 开始,将向 . 默认情况下,传入的转换方式是单独解码每个事件。 这是目前语义中存在的确切行为。 要根据应用程序要求恢复以前的行为或将整体解码为单个单元,必须设置为 。 但是,目标解码逻辑取决于所选内容,例如,a 要求流中存在一个新的行分隔符(默认情况下)以指示字节缓冲区结束。decodeFluxAsUnitfalseRSocketInboundGatewayFlux@MessageMappingFluxdecodeFluxAsUnittrueDecoderStringDecoderSpring中文文档

有关如何配置端点和处理下游有效负载的示例,请参阅使用 Java 配置 RSocket 端点RSocketInboundGatewaySpring中文文档

RSocket 出站网关

用于执行对 RSocket 的请求,并根据 RSocket 回复(如果有)生成回复。 低级别 RSocket 协议交互从服务器端请求消息中提供的或标头中解析。 服务器端的目标可以通过 或使用 API 根据为连接请求映射选择的某个业务键进行解析。 有关更多信息,请参阅 JavaDocs。RSocketOutboundGatewayAbstractReplyProducingMessageHandlerRSocketRequesterClientRSocketConnectorRSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADERRSocketRequesterRSocketConnectedEventServerRSocketConnector.getClientRSocketRequester()ServerRSocketConnector.setClientRSocketKeyStrategy()ServerRSocketConnectorSpring中文文档

要发送的请求必须显式配置(与路径变量一起)或通过根据请求消息进行评估的 SpEL 表达式。routeSpring中文文档

RSocket 交互模型可以通过选项或相应的表达式设置提供。 默认情况下,a 用于常见网关用例。RSocketInteractionModelrequestResponseSpring中文文档

当请求消息有效负载为 时,可以提供一个选项来根据目标中提供的元素对其进行编码。 此选项的表达式的计算结果为 。 有关数据及其类型的更多信息,请参阅 JavaDocs。PublisherpublisherElementTypeRSocketStrategiesRSocketRequesterParameterizedTypeReferenceRSocketRequester.RequestSpec.data()Spring中文文档

RSocket 请求也可以使用 . 为此,可以在 上配置针对请求的消息。 这样的表达式的计算结果必须为 .metadatametadataExpressionRSocketOutboundGatewayMap<Object, MimeType>Spring中文文档

当不是时,必须提供。 默认情况下,这是一个。 此选项的表达式的计算结果为 。 有关回复数据及其类型的更多信息,请参阅 和 JavaDocs。interactionModelfireAndForgetexpectedResponseTypeString.classParameterizedTypeReferenceRSocketRequester.RetrieveSpec.retrieveMono()RSocketRequester.RetrieveSpec.retrieveFlux()Spring中文文档

来自 的回复是一个(即使对于交互模型也是如此),总是使这个组件成为 。 这样的一个在生产到常规渠道之前被订阅,或者由. or 交互模型的响应也包装在回复中。 它可以通过直通服务激活器在下游展平:payloadRSocketOutboundGatewayMonofireAndForgetMono<Void>asyncMonooutputChannelFluxMessageChannelFluxrequestStreamrequestChannelMonoFluxMessageChannelSpring中文文档

@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
    return payload;
}

或在目标应用程序逻辑中显式订阅。Spring中文文档

还可以配置(或通过表达式计算)预期的响应类型,以将此网关视为出站通道适配器。 但是,必须配置 still(即使它只是一个 )才能启动对返回的 .voidoutputChannelNullChannelMonoSpring中文文档

请参阅使用 Java 配置 RSocket 端点,了解如何配置端点与下游有效负载的交易示例。RSocketOutboundGatewaySpring中文文档

RSocket 命名空间支持

Spring Integration 提供了命名空间和相应的模式定义。 若要将其包含在配置中,请在应用程序上下文配置文件中添加以下命名空间声明:rsocketSpring中文文档

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/rsocket
    https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
    ...
</beans>

入境

要使用 XML 配置 Spring Integration RSocket 入站通道适配器,您需要使用命名空间中的相应组件。 以下示例演示如何配置它:inbound-gatewayint-rsocketSpring中文文档

<int-rsocket:inbound-gateway id="inboundGateway"
                             path="testPath"
                             interaction-models="requestStream,requestChannel"
                             rsocket-connector="clientRSocketConnector"
                             request-channel="requestChannel"
                             rsocket-strategies="rsocketStrategies"
                             request-element-type="byte[]"/>

A 和 应配置为泛型定义。ClientRSocketConnectorServerRSocketConnector<bean>Spring中文文档

出境

<int-rsocket:outbound-gateway id="outboundGateway"
                              client-rsocket-connector="clientRSocketConnector"
                              auto-startup="false"
                              interaction-model="fireAndForget"
                              route-expression="'testRoute'"
                              request-channel="requestChannel"
                              publisher-element-type="byte[]"
                              expected-response-type="java.util.Date"
                              metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>

有关所有这些 XML 属性的说明,请参阅。spring-integration-rsocket.xsdSpring中文文档

使用 Java 配置 RSocket 端点

以下示例演示如何使用 Java 配置 RSocket 入站端点:Spring中文文档

@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
    RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
    rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
    return rsocketInboundGateway;
}

@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
    return payload.next().map(String::toUpperCase);
}

在此配置中假定 A 或,用于在“echo”路径上自动检测此类端点。 注意签名,它对 RSocket 请求的完全反应式处理并生成反应式回复。ClientRSocketConnectorServerRSocketConnector@TransformerSpring中文文档

以下示例演示如何使用 Java DSL 配置 RSocket 入站网关:Spring中文文档

@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
    return IntegrationFlow
        .from(RSockets.inboundGateway("/uppercase")
                   .interactionModels(RSocketInteractionModel.requestChannel))
        .<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
        .get();
}

在此配置中假定为 or,用于在“/uppercase”路径上自动检测此类端点,并将预期的交互模型称为“请求通道”。ClientRSocketConnectorServerRSocketConnectorSpring中文文档

以下示例演示如何使用 Java 配置 RSocket 出站网关:Spring中文文档

@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
    RSocketOutboundGateway rsocketOutboundGateway =
            new RSocketOutboundGateway(
                    new FunctionExpression<Message<?>>((m) ->
                        m.getHeaders().get("route_header")));
    rsocketOutboundGateway.setInteractionModelExpression(
            new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
    rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
    return rsocketOutboundGateway;
}

只有客户端才需要。 在服务器端,必须在请求消息中提供具有值的标头。setClientRSocketConnector()RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADERRSocketRequesterSpring中文文档

以下示例演示如何使用 Java DSL 配置 RSocket 出站网关:Spring中文文档

@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
    return IntegrationFlow
        .from(Function.class)
        .handle(RSockets.outboundGateway("/uppercase")
            .interactionModel(RSocketInteractionModel.requestResponse)
            .expectedResponseType(String.class)
            .clientRSocketConnector(clientRSocketConnector))
        .get();
}

请参阅 IntegrationFlow 作为网关,了解如何使用上述流程开头提到的接口。FunctionSpring中文文档