此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.4.0! |
RSocket 支持
RSocket Spring 集成模块 () 允许执行 RSocket 应用程序协议。spring-integration-rsocket
您需要将此依赖项包含在您的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
<version>6.3.7-SNAPSHOT</version>
</dependency>
compile "org.springframework.integration:spring-integration-rsocket:6.3.7-SNAPSHOT"
该模块从版本 5.2 开始提供,并且基于 Spring Messaging 基础及其 RSocket 组件实现,例如 , 和 。
有关 RSocket 协议、术语和组件的更多信息,请参阅 Spring Framework RSocket 支持。RSocketRequester
RSocketMessageHandler
RSocketStrategies
在通过通道适配器开始集成流处理之前,我们需要在服务器和客户端之间建立 RSocket 连接。
为此,Spring 集成 RSocket 支持提供了 .ServerRSocketConnector
ClientRSocketConnector
AbstractRSocketConnector
根据提供的用于接受来自客户端的连接,在主机和端口上公开侦听器。
可以使用 自定义内部实例,也可以配置其他选项,例如 以及有效负载数据和标头元数据。
当客户端请求者提供 a 时(见下文),连接的客户端将作为 a 存储在由 确定的键下。
默认情况下,连接数据用于键,作为转换为具有 UTF-8 字符集的字符串的值。
这样的注册表可以在应用程序逻辑中使用,以确定特定的 Client 端连接,以便与它进行交互,或将相同的消息发布到所有连接的 Client 端。
从客户端建立连接时,将从 .
这类似于 Spring Messaging 模块中的 Comments 提供的内容。
映射模式表示接受所有客户端路由。
这可用于通过 header 区分不同的路由。ServerRSocketConnector
io.rsocket.transport.ServerTransport
RSocketServer
setServerConfigurer()
RSocketStrategies
MimeType
setupRoute
ClientRSocketConnector
RSocketRequester
clientRSocketKeyStrategy
BiFunction<Map<String, Object>, DataBuffer, Object>
RSocketRequester
RSocketConnectedEvent
ServerRSocketConnector
@ConnectMapping
*
RSocketConnectedEvent
DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER
典型的服务器配置可能如下所示:
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ServerRSocketConnector serverRSocketConnector() {
ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
+ headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
return serverRSocketConnector;
}
@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
...
}
所有选项(包括 bean 和 for )都是可选的。
有关更多信息,请参阅 JavaDocs。RSocketStrategies
@EventListener
RSocketConnectedEvent
ServerRSocketConnector
从版本 5.2.1 开始,它被提取到一个公共的顶级类中,以便与现有 RSocket 服务器建立可能的连接。
当 a 提供 的外部实例时,它不会在内部创建 RSocket 服务器,而只是将所有处理逻辑委托给提供的实例。
此外,还可以配置一个标志来处理 RSocket 控制器,完全替换标准 .
这在混合配置中非常有用,当经典方法与 RSocket 通道适配器一起存在于同一应用程序中,并且应用程序中存在外部配置的 RSocket 服务器时。ServerRSocketMessageHandler
ServerRSocketConnector
ServerRSocketMessageHandler
ServerRSocketMessageHandler
messageMappingCompatible
@MessageMapping
RSocketMessageHandler
@MessageMapping
该用作基于连接的支架。
可以使用提供的 进行自定义。
的 (带有可选的 templates 变量) 和 with metadata 也可以在此组件上配置。ClientRSocketConnector
RSocketRequester
RSocket
ClientTransport
RSocketConnector
RSocketConnectorConfigurer
setupRoute
setupData
典型的客户端配置可能如下所示:
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ClientRSocketConnector clientRSocketConnector() {
ClientRSocketConnector clientRSocketConnector =
new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
clientRSocketConnector.setSetupRoute("clientConnect/{user}");
clientRSocketConnector.setSetupRouteVariables("myUser");
return clientRSocketConnector;
}
这些选项中的大多数(包括 bean)都是可选的。
请注意我们如何在任意端口上连接到本地启动的 RSocket 服务器。
有关使用案例,请参阅。
有关更多信息,另请参阅及其超类 JavaDocs。RSocketStrategies
ServerRSocketConnector.clientRSocketKeyStrategy
setupData
ClientRSocketConnector
AbstractRSocketConnector
两者都负责将入站通道适配器映射到其配置,以路由传入的 RSocket 请求。
有关更多信息,请参阅下一节。ClientRSocketConnector
ServerRSocketConnector
path
RSocket 入站网关
负责接收 RSocket 请求并生成响应(如果有)。
它需要一个映射数组,该数组可以是类似于 MVC 请求映射或语义的模式。
此外,(从版本 5.2.2 开始),可以在 上配置一组交互模型(请参阅 ),以通过特定帧类型限制 RSocket 请求到此端点。
默认情况下,支持所有交互模型。
这样的 bean,根据其 implementation (extension of a ),由内部的 或 内部的路由逻辑自动检测,用于传入请求。
可以向 提供 for explicit endpoint registration.
这样,自动检测选项就会在该 上被禁用。
也可以注入到 中,或者它们是从提供的覆盖任何显式注入中获得的。
解码器用于根据提供的 .
如果在传入 中未提供标头,则会将请求视为 RSocket 交互模型。
在这种情况下,an 将对 执行普通操作。
否则,来自 Headers 的值用于向 RSocket 发送回复。
为此,an 对 执行操作。
要向下游发送的消息始终是 根据 logic 的。
在 RSocket 交互模型中,消息具有 plain converted .
回复可以是普通对象,也可以是 - 根据 中提供的编码器,将它们都正确地转换为 RSocket 响应。RSocketInboundGateway
path
@MessageMapping
RSocketInteractionModel
RSocketInboundGateway
IntegrationRSocketEndpoint
ReactiveMessageHandler
ServerRSocketConnector
ClientRSocketConnector
IntegrationRSocketMessageHandler
AbstractRSocketConnector
RSocketInboundGateway
AbstractRSocketConnector
RSocketStrategies
RSocketInboundGateway
AbstractRSocketConnector
RSocketStrategies
requestElementType
RSocketPayloadReturnValueHandler.RESPONSE_HEADER
Message
RSocketInboundGateway
fireAndForget
RSocketInboundGateway
send
outputChannel
MonoProcessor
RSocketPayloadReturnValueHandler.RESPONSE_HEADER
RSocketInboundGateway
sendAndReceiveMessageReactive
outputChannel
payload
Flux
MessagingRSocket
fireAndForget
payload
payload
Publisher
RSocketInboundGateway
RSocketStrategies
从版本 5.3 开始,将向 .
默认情况下,incoming 的转换方式与其每个事件单独解码的方式相同。
这是当前语义中存在的确切行为。
要根据应用程序要求恢复以前的行为或将整个解码为单个单元,必须将 设置为 。
但是,目标解码逻辑取决于所选逻辑,例如 a 需要在流中存在新的行分隔符(默认情况下)以指示字节缓冲区结束。decodeFluxAsUnit
false
RSocketInboundGateway
Flux
@MessageMapping
Flux
decodeFluxAsUnit
true
Decoder
StringDecoder
有关如何配置终端节点和处理下游负载的示例,请参阅使用 Java 配置 RSocket 终端节点。RSocketInboundGateway
RSocket 出站网关
用于在 RSocket 中执行请求,并根据 RSocket 回复(如果有)生成回复。
低级 RSocket 协议交互被委托给服务器端请求消息中提供的 Resolved 或 Headers。
服务器端的目标可以根据为 connect 请求映射选择的某些业务键,从 或使用 API 进行解析。
有关更多信息,请参阅 JavaDocs。RSocketOutboundGateway
AbstractReplyProducingMessageHandler
RSocketRequester
ClientRSocketConnector
RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER
RSocketRequester
RSocketConnectedEvent
ServerRSocketConnector.getClientRSocketRequester()
ServerRSocketConnector.setClientRSocketKeyStrategy()
ServerRSocketConnector
要发送请求必须显式配置(与路径变量一起)或通过根据请求消息进行评估的 SPEL 表达式进行配置。route
RSocket 交互模型可以通过选项或相应的表达式设置来提供。
默认情况下,a 用于常见的网关使用案例。RSocketInteractionModel
requestResponse
当请求消息有效负荷为 a 时,可以提供一个选项,以根据 target 中提供的 an 对其元素进行编码。
此选项的表达式的计算结果可以为 .
有关数据及其类型的更多信息,请参阅 JavaDocs。Publisher
publisherElementType
RSocketStrategies
RSocketRequester
ParameterizedTypeReference
RSocketRequester.RequestSpec.data()
RSocket 请求也可以使用 .
为此,可以在 .
这样的表达式的计算结果必须为 .metadata
metadataExpression
RSocketOutboundGateway
Map<Object, MimeType>
When is not ,必须提供 an。
默认情况下,它是一个。
此选项的表达式的计算结果可以为 .
有关回复数据及其类型的更多信息,请参阅 和 JavaDocs。interactionModel
fireAndForget
expectedResponseType
String.class
ParameterizedTypeReference
RSocketRequester.RetrieveSpec.retrieveMono()
RSocketRequester.RetrieveSpec.retrieveFlux()
来自 的回复 是一个 (即使对于交互模型也是如此) 始终使这个组件为 。
此类 a 在生成到常规频道之前订阅,或由 按需处理。
或 交互模型的响应也被包装到 reply 中。
它可以通过 直通服务激活器在下游展平:payload
RSocketOutboundGateway
Mono
fireAndForget
Mono<Void>
async
Mono
outputChannel
FluxMessageChannel
Flux
requestStream
requestChannel
Mono
FluxMessageChannel
@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
return payload;
}
或在目标应用程序逻辑中显式订阅。
还可以配置预期的响应类型(或通过表达式评估)以将此网关视为出站通道适配器。
但是,仍然必须配置 (即使它只是一个 ) 以启动对返回的 .void
outputChannel
NullChannel
Mono
有关如何配置终端节点以处理下游有效负载的示例,请参阅使用 Java 配置 RSocket 终端节点。RSocketOutboundGateway
RSocket 命名空间支持
Spring 集成提供了一个名称空间和相应的模式定义。
要将其包含在您的配置中,请在您的应用程序上下文配置文件中添加以下命名空间声明:rsocket
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/rsocket
https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
...
</beans>
入境
要使用 XML 配置 Spring 集成 RSocket 入站通道适配器,您需要使用名称空间中的适当组件。
以下示例显示如何配置它:inbound-gateway
int-rsocket
<int-rsocket:inbound-gateway id="inboundGateway"
path="testPath"
interaction-models="requestStream,requestChannel"
rsocket-connector="clientRSocketConnector"
request-channel="requestChannel"
rsocket-strategies="rsocketStrategies"
request-element-type="byte[]"/>
A 和 应配置为通用定义。ClientRSocketConnector
ServerRSocketConnector
<bean>
出境
<int-rsocket:outbound-gateway id="outboundGateway"
client-rsocket-connector="clientRSocketConnector"
auto-startup="false"
interaction-model="fireAndForget"
route-expression="'testRoute'"
request-channel="requestChannel"
publisher-element-type="byte[]"
expected-response-type="java.util.Date"
metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>
有关所有这些 XML 属性的说明,请参见 。spring-integration-rsocket.xsd
使用 Java 配置 RSocket 端点
以下示例显示了如何使用 Java 配置 RSocket 入站端点:
@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
return rsocketInboundGateway;
}
@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
return payload.next().map(String::toUpperCase);
}
在此配置中假定 A 或,其含义是在 “echo” 路径上自动检测此类端点。
请注意签名,因为它对 RSocket 请求的完全反应式处理并生成反应式回复。ClientRSocketConnector
ServerRSocketConnector
@Transformer
以下示例显示了如何使用 Java DSL 配置 RSocket 入站网关:
@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
return IntegrationFlow
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel))
.<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
.get();
}
在此配置中假定 A 或,其含义是在 “/uppercase” 路径上自动检测此类端点,并将预期的交互模型视为 “request channel”。ClientRSocketConnector
ServerRSocketConnector
以下示例显示如何使用 Java 配置 RSocket 出站网关:
@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
RSocketOutboundGateway rsocketOutboundGateway =
new RSocketOutboundGateway(
new FunctionExpression<Message<?>>((m) ->
m.getHeaders().get("route_header")));
rsocketOutboundGateway.setInteractionModelExpression(
new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
return rsocketOutboundGateway;
}
只有客户端才需要。
在服务器端,必须在请求消息中提供带有值的标头。setClientRSocketConnector()
RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER
RSocketRequester
以下示例显示了如何使用 Java DSL 配置 RSocket 出站网关:
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlow
.from(Function.class)
.handle(RSockets.outboundGateway("/uppercase")
.interactionModel(RSocketInteractionModel.requestResponse)
.expectedResponseType(String.class)
.clientRSocketConnector(clientRSocketConnector))
.get();
}
有关如何使用上述流程开头提到的接口的更多信息,请参阅 IntegrationFlow
as a Gateway。Function