此版本仍在开发中,尚未被视为稳定版本。最新的稳定版本请使用 Spring Framework 6.1.13! |
此版本仍在开发中,尚未被视为稳定版本。最新的稳定版本请使用 Spring Framework 6.1.13! |
本节描述了 Spring 框架对 RSocket 协议的支持。
概述
RSocket 是一种用于通过 TCP 进行多路复用、双工通信的应用程序协议, WebSocket 和其他字节流传输,使用以下交互之一 模型:
-
Request-Response
— 发送一条消息,然后接收一条消息。 -
Request-Stream
— 发送一条消息并接收回消息流。 -
Channel
— 双向发送消息流。 -
Fire-and-Forget
— 发送单向消息。
建立初始连接后,“client” 与 “server” 的区别将丢失,因为 双方都变得对称,并且每一方都可以发起上述交互之一。 这就是为什么在协议中将参与方称为 “requester” 和 “responder” 的原因 而上述交互称为 “请求流” 或简称为 “请求”。
以下是 RSocket 协议的主要功能和优势:
-
跨网络边界的 Reactive Streams 语义 — 用于流式请求,例如 和 、背压信号 在请求者和响应者之间移动,允许请求者在 源,从而减少对网络层拥塞控制的依赖,以及需求 用于网络级别或任何级别的缓冲。
Request-Stream
Channel
-
请求限制 — 此功能以 “Leasing” 命名,以 可以从每一端发送,以限制另一端允许的请求总数 在给定的时间内。租约会定期续订。
LEASE
-
会话恢复 — 这是为连接丢失而设计的,需要一些状态 待维护。状态管理对应用程序是透明的,并且运行良好 结合背压,可以在可能的情况下停止生产者并减少 所需的状态量。
-
大型消息的分片和重新汇编。
-
Keepalive (检测信号)。
RSocket 具有多种语言的实现。Java 库构建在 Project Reactor 之上, 和 Reactor Netty 用于运输。这意味着 应用程序中来自 Reactive Streams Publishers 的信号以透明方式传播 通过 RSocket 跨网络。
协议
RSocket 的好处之一是它在 wire 上具有明确定义的行为,并且 易于阅读的规范以及一些协议扩展。因此它是 阅读规范是个好主意,独立于语言实现和更高级别 框架 API 的 API 中。本节提供了简洁的概述,以建立一些上下文。
连接
最初,客户端通过一些低级流传输(如
作为 TCP 或 WebSocket 进行设置,并向服务器发送一个帧,为
连接。SETUP
服务器可能会拒绝帧,但通常在发送帧后(对于 client)
和 received(对于服务器),双方都可以开始发出请求,除非指示使用 leasing 语义来限制请求的数量,在这种情况下
双方必须等待来自另一端的帧才能允许发出请求。SETUP
SETUP
LEASE
发出请求
建立连接后,双方都可以通过
帧 、 、 或 .每个
这些帧将一条消息从请求者传送到响应者。REQUEST_RESPONSE
REQUEST_STREAM
REQUEST_CHANNEL
REQUEST_FNF
然后,响应方可以返回包含响应消息的帧,在这种情况下
的请求者也可以发送具有更多请求的帧
消息。PAYLOAD
REQUEST_CHANNEL
PAYLOAD
当请求涉及 和 等消息流时,
响应方必须遵循来自请求方的需求信号。Demand 表示为
消息数。初始需求在 和 帧 中指定。后续需求通过帧发出信号。Request-Stream
Channel
REQUEST_STREAM
REQUEST_CHANNEL
REQUEST_N
每一方还可以通过框架发送元数据通知,这些通知不会
与任何单个请求有关,但与整个连接有关。METADATA_PUSH
消息格式
RSocket 消息包含数据和元数据。元数据可用于发送路由、
证券令牌等数据和元数据的格式可以不同。每个 Mime 类型
在框架中声明,并应用于给定连接上的所有请求。SETUP
虽然所有消息都可以包含元数据,但通常元数据(如路由)是按请求进行的
因此仅包含在请求的第一条消息中,即包含帧 、 、 或 。REQUEST_RESPONSE
REQUEST_STREAM
REQUEST_CHANNEL
REQUEST_FNF
协议扩展定义用于应用程序的常见元数据格式:
Java 实现
RSocket 的 Java 实现构建在 Project Reactor 之上。TCP 和 WebSocket 的传输方式是
基于 Reactor Netty 构建。作为反应式流
库,Reactor 简化了实现协议的工作。对于应用程序,它是
天生适合使用,带有声明式运算符和透明背面
压力支持。Flux
Mono
RSocket Java 中的 API 有意做到最小和基本。它侧重于协议 功能,并将应用程序编程模型(例如 RPC codegen 与其他模型)保留为 更高层次,独立关注。
主合约 io.rsocket.RSocket 对四种请求交互类型进行建模,表示
single message 中,消息流和实际的
message 中访问数据和元数据作为字节缓冲区。使用 Contract
对称。对于请求,应用程序被赋予一个 to perform
请求与。为了响应,应用程序实现了处理请求。Mono
Flux
io.rsocket.Payload
RSocket
RSocket
RSocket
这并不是一个详尽的介绍。在大多数情况下,Spring 应用程序 不必直接使用其 API。但是,观察或试验可能很重要 使用 RSocket 独立于 Spring。RSocket Java 存储库包含许多示例应用程序,这些应用程序 演示其 API 和协议功能。
弹簧支撑
该模块包含以下内容:spring-messaging
-
RSocketRequester — 用于发出请求的 Fluent API 通过 WITH DATA AND METADATA 编码/解码。
io.rsocket.RSocket
-
带注释的 Responders — 以及用于响应的带注释的处理程序方法。
@MessageMapping
@RSocketExchange
-
RSocket 接口 — RSocket 服务声明 作为带有方法的 Java 接口,用作请求者或响应者。
@RSocketExchange
该模块包含和实现,例如 Jackson
CBOR/JSON 和 Protobuf 的 RSocket 应用程序可能需要。它还包含可插入以实现高效路由匹配的 。spring-web
Encoder
Decoder
PathPatternParser
Spring Boot 2.2 支持通过 TCP 或 WebSocket 建立 RSocket 服务器,包括
在 WebFlux 服务器中通过 WebSocket 公开 RSocket 的选项。还有 Client
支持和自动配置 和 .
有关更多详细信息,请参阅 Spring Boot 参考中的 RSocket 部分。RSocketRequester.Builder
RSocketStrategies
Spring Security 5.2 提供了 RSocket 支持。
Spring 集成 5.2 提供了入站和出站网关来与 RSocket 交互 客户端和服务器。有关更多详细信息,请参见 Spring 集成参考手册。
Spring Cloud 网关支持 RSocket 连接。
RSocketRequester 请求者
RSocketRequester
提供 Fluent API 来执行 RSocket 请求、接受和
返回 data 和 metadata 的对象,而不是低级数据缓冲区。可以使用
对称地,从 Client 端发出请求,以及从 Server 发出请求。
客户端请求者
要在客户端获取 an 就是连接到一个服务器,它涉及
发送带有连接设置的 RSocket 帧。 提供
构建器,帮助准备 Include 连接
框架的设置。RSocketRequester
SETUP
RSocketRequester
io.rsocket.core.RSocketConnector
SETUP
这是使用默认设置进行连接的最基本方法:
-
Java
-
Kotlin
RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);
URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
val requester = RSocketRequester.builder().tcp("localhost", 7000)
URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)
以上不会立即连接。发出请求时,共享连接为 透明地建立并使用。
连接设置
RSocketRequester.Builder
提供以下内容以自定义初始帧:SETUP
-
dataMimeType(MimeType)
— 设置连接上数据的 MIME 类型。 -
metadataMimeType(MimeType)
— 设置连接上元数据的 MIME 类型。 -
setupData(Object)
— 要包含在 .SETUP
-
setupRoute(String, Object…)
— 路由到包含在元数据中。SETUP
-
setupMetadata(Object, MimeType)
— 要包含在 .SETUP
对于数据,默认 mime 类型派生自第一个配置的 .为
metadata,则默认的 MIME 类型是 composite metadata,它允许多个
每个请求的元数据值和 MIME 类型对。通常,两者都不需要更改。Decoder
框架中的数据和元数据是可选的。在服务器端,可以使用@ConnectMapping方法处理
connection 和 frame 的内容。元数据可用于连接
级别安全性。SETUP
SETUP
策略
RSocketRequester.Builder
accepts 来配置请求者。
您需要使用它来提供编码器和解码器,用于数据的 (de) 序列化和
metadata 值。默认情况下,仅注册 for 、 和 的基本编解码器。添加 (Added) 可以访问更多
可以按如下方式注册:RSocketStrategies
spring-core
String
byte[]
ByteBuffer
spring-web
-
Java
-
Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000);
val strategies = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.build()
val requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000)
RSocketStrategies
专为重复使用而设计。在某些情况下,例如客户端和服务器在
相同的应用程序,最好在 Spring 配置中声明它。
客户端响应方
RSocketRequester.Builder
可用于配置对来自
服务器。
您可以使用带注释的处理程序进行基于相同的客户端响应 在服务器上使用但以编程方式注册的基础结构,如下所示:
-
Java
-
Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.routeMatcher(new PathPatternRouteMatcher()) (1)
.build();
SocketAcceptor responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(responder)) (3)
.tcp("localhost", 7000);
1 | 如果存在,请使用 ,以提高效率
路由匹配。PathPatternRouteMatcher spring-web |
2 | 使用 and/or 方法从类创建响应方。@MessageMapping @ConnectMapping |
3 | 注册响应方。 |
val strategies = RSocketStrategies.builder()
.routeMatcher(PathPatternRouteMatcher()) (1)
.build()
val responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(responder) } (3)
.tcp("localhost", 7000)
1 | 如果存在,请使用 ,以提高效率
路由匹配。PathPatternRouteMatcher spring-web |
2 | 使用 and/or 方法从类创建响应方。@MessageMapping @ConnectMapping |
3 | 注册响应方。 |
请注意,以上只是专为 client 的编程注册而设计的快捷方式
反应。对于客户端响应者处于 Spring 配置中的替代场景,
你仍然可以声明为 Spring bean,然后按如下方式应用:RSocketMessageHandler
-
Java
-
Kotlin
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(handler.responder()))
.tcp("localhost", 7000);
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(handler.responder()) }
.tcp("localhost", 7000)
对于上述内容,您可能还需要在 中使用
切换到不同的策略来检测客户端响应者,例如基于自定义
注释(例如 VS 默认的 .这
在客户端和服务器或多个客户端位于同一
应用。setHandlerPredicate
RSocketMessageHandler
@RSocketClientResponder
@Controller
另请参阅 Annotated Responders ,以了解有关编程模型的更多信息。
高深
RSocketRequesterBuilder
提供回调以公开 keepalive 的进一步配置选项的基础
intervals、session resumption、interceptor 等。您可以配置选项
在该级别,如下所示:io.rsocket.core.RSocketConnector
-
Java
-
Kotlin
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> {
// ...
})
.tcp("localhost", 7000);
val requester = RSocketRequester.builder()
.rsocketConnector {
//...
}
.tcp("localhost", 7000)
服务器请求者
要从服务器向连接的客户端发出请求,只需获取 来自服务器的已连接客户端的 requester。
在 Annotated Responders 中,方法支持参数。使用它来访问连接的请求者。保留
请注意,方法本质上是 Frame 的处理程序,它
必须在请求开始之前处理。因此,请求在一开始就必须是
与处理分离。例如:@ConnectMapping
@MessageMapping
RSocketRequester
@ConnectMapping
SETUP
-
Java
-
Kotlin
@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
requester.route("status").data("5")
.retrieveFlux(StatusReport.class)
.subscribe(bar -> { (1)
// ...
});
return ... (2)
}
1 | 异步启动请求,独立于处理。 |
2 | 执行处理并返回完成 。Mono<Void> |
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
GlobalScope.launch {
requester.route("status").data("5").retrieveFlow<StatusReport>().collect { (1)
// ...
}
}
/// ... (2)
}
1 | 异步启动请求,独立于处理。 |
2 | 在 suspending 函数中执行处理。 |
请求
-
Java
-
Kotlin
ViewBox viewBox = ... ;
Flux<AirportLocation> locations = requester.route("locate.radars.within") (1)
.data(viewBox) (2)
.retrieveFlux(AirportLocation.class); (3)
1 | 指定要包含在请求消息元数据中的路由。 |
2 | 为请求消息提供数据。 |
3 | 声明预期的响应。 |
val viewBox: ViewBox = ...
val locations = requester.route("locate.radars.within") (1)
.data(viewBox) (2)
.retrieveFlow<AirportLocation>() (3)
1 | 指定要包含在请求消息元数据中的路由。 |
2 | 为请求消息提供数据。 |
3 | 声明预期的响应。 |
交互类型由输入的基数隐式确定,而
输出。上面的示例是一个 because 一个 value 被发送和一个 stream
of values 的值。在大多数情况下,您不需要考虑这个问题,只要
输入和输出的选择与 RSocket 交互类型以及 input 和
响应方所需的输出。无效组合的唯一示例是多对一。Request-Stream
该方法还接受任何 Reactive Streams ,包括 和 ,以及在 .对于多值(例如 which 会生成
相同类型的值,请考虑使用重载方法之一来避免
类型检查并查找每个元素:data(Object)
Publisher
Flux
Mono
ReactiveAdapterRegistry
Publisher
Flux
data
Encoder
data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
该步骤是可选的。对于不发送数据的请求,请跳过它:data(Object)
-
Java
-
Kotlin
Mono<AirportLocation> location = requester.route("find.radar.EWR"))
.retrieveMono(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveAndAwait
val location = requester.route("find.radar.EWR")
.retrieveAndAwait<AirportLocation>()
如果使用复合元数据(默认),并且可以添加额外的元数据值
值由已注册的 .例如:Encoder
-
Java
-
Kotlin
String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");
Flux<AirportLocation> locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlux(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveFlow
val requester: RSocketRequester = ...
val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")
val locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlow<AirportLocation>()
对于使用返回 .请注意,这仅表示消息已成功发送,而不表示消息已处理。Fire-and-Forget
send()
Mono<Void>
Mono
对于使用带有返回值的方法。Metadata-Push
sendMetadata()
Mono<Void>
1 | 如果存在,请使用 ,以提高效率
路由匹配。PathPatternRouteMatcher spring-web |
2 | 使用 and/or 方法从类创建响应方。@MessageMapping @ConnectMapping |
3 | 注册响应方。 |
1 | 如果存在,请使用 ,以提高效率
路由匹配。PathPatternRouteMatcher spring-web |
2 | 使用 and/or 方法从类创建响应方。@MessageMapping @ConnectMapping |
3 | 注册响应方。 |
1 | 异步启动请求,独立于处理。 |
2 | 执行处理并返回完成 。Mono<Void> |
1 | 异步启动请求,独立于处理。 |
2 | 在 suspending 函数中执行处理。 |
1 | 指定要包含在请求消息元数据中的路由。 |
2 | 为请求消息提供数据。 |
3 | 声明预期的响应。 |
1 | 指定要包含在请求消息元数据中的路由。 |
2 | 为请求消息提供数据。 |
3 | 声明预期的响应。 |
带注释的响应者
RSocket 响应程序可以作为 and 方法实现。 方法处理单个请求,而方法处理
连接级事件 (Setup 和 Metadata Push)。支持带注释的响应者
对称地,用于从服务器端响应和从客户端响应。@MessageMapping
@ConnectMapping
@MessageMapping
@ConnectMapping
服务器响应程序
要在服务器端使用带注释的响应者,请将 Spring 添加到您的 Spring
配置来检测带有 AND 方法的 bean:RSocketMessageHandler
@Controller
@MessageMapping
@ConnectMapping
-
Java
-
Kotlin
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.routeMatcher(new PathPatternRouteMatcher());
return handler;
}
}
@Configuration
class ServerConfig {
@Bean
fun rsocketMessageHandler() = RSocketMessageHandler().apply {
routeMatcher = PathPatternRouteMatcher()
}
}
然后通过 Java RSocket API 启动 RSocket 服务器,并为响应方插入 ,如下所示:RSocketMessageHandler
-
Java
-
Kotlin
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
CloseableChannel server =
RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.block();
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val server = RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.awaitSingle()
您需要设置元数据和数据所需的 和 实例
格式。您可能需要该模块来实现编解码器。Encoder
Decoder
spring-web
默认情况下,用于通过 匹配路由。
我们建议将 from for
高效的路由匹配。RSocket 路由可以是分层的,但不是 URL 路径。
默认情况下,两个路由匹配器都配置为使用 “.” 作为分隔符,并且没有 URL
解码方式与 HTTP URL 一样。SimpleRouteMatcher
AntPathMatcher
PathPatternRouteMatcher
spring-web
RSocketMessageHandler
可以通过以下方式进行配置,如果满足以下条件,则可能很有用
您需要在同一进程中在 Client 端和 Server 之间共享配置:RSocketStrategies
-
Java
-
Kotlin
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setRSocketStrategies(rsocketStrategies());
return handler;
}
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.routeMatcher(new PathPatternRouteMatcher())
.build();
}
}
@Configuration
class ServerConfig {
@Bean
fun rsocketMessageHandler() = RSocketMessageHandler().apply {
rSocketStrategies = rsocketStrategies()
}
@Bean
fun rsocketStrategies() = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.routeMatcher(PathPatternRouteMatcher())
.build()
}
客户端响应方
客户端上带注释的响应程序需要在 .有关详细信息,请参阅 客户端响应程序。RSocketRequester.Builder
@MessageMapping
-
Java
-
Kotlin
@Controller
public class RadarsController {
@MessageMapping("locate.radars.within")
public Flux<AirportLocation> radars(MapRequest request) {
// ...
}
}
@Controller
class RadarsController {
@MessageMapping("locate.radars.within")
fun radars(request: MapRequest): Flow<AirportLocation> {
// ...
}
}
上述方法响应具有
路由 “locate.radars.within”。它支持灵活的方法签名,并可选择
使用以下方法参数:@MessageMapping
方法参数 | 描述 |
---|---|
|
请求的有效负载。这可以是异步类型的具体值,如 或 。 注意:使用注释是可选的。不是简单类型的方法参数 并且不是任何其他受支持的参数,则假定为预期的有效负载。 |
|
请求者,用于向远程端发出请求。 |
|
根据映射模式中的变量从路由中提取的值,例如 . |
|
注册用于提取的元数据值,如 MetadataExtractor 中所述。 |
|
注册用于提取的所有元数据值,如 MetadataExtractor 中所述。 |
返回值应为一个或多个要序列化为响应的 Object
负载。这可以是异步类型,如 or、具体值或
或者一个无值异步类型,例如 .Mono
Flux
void
Mono<Void>
方法支持的 RSocket 交互类型是根据
input 的基数(即 参数)和输出,其中
cardinality 的含义如下:@MessageMapping
@Payload
基数 | 描述 |
---|---|
1 |
显式值或单值异步类型,如 . |
多 |
多值异步类型,如 . |
0 |
对于 input,这意味着该方法没有参数。 对于输出,这是 or 无值异步类型,例如 . |
下表显示了所有输入和输出基数组合以及相应的 交互类型:
输入基数 | 输出基数 | 交互类型 |
---|---|---|
0, 1 |
0 |
Fire-and-Forget, Request-Response |
0, 1 |
1 |
请求-响应 |
0, 1 |
多 |
请求流 |
多 |
0、1、多 |
请求通道 |
@RSocketExchange
作为 的替代方法,您还可以使用方法处理请求。此类方法在 RSocket 接口上声明,可以通过响应者作为请求者使用或由响应者实现。@MessageMapping
@RSocketExchange
RSocketServiceProxyFactory
例如,要以响应者身份处理请求:
-
Java
-
Kotlin
public interface RadarsService {
@RSocketExchange("locate.radars.within")
Flux<AirportLocation> radars(MapRequest request);
}
@Controller
public class RadarsController implements RadarsService {
public Flux<AirportLocation> radars(MapRequest request) {
// ...
}
}
interface RadarsService {
@RSocketExchange("locate.radars.within")
fun radars(request: MapRequest): Flow<AirportLocation>
}
@Controller
class RadarsController : RadarsService {
override fun radars(request: MapRequest): Flow<AirportLocation> {
// ...
}
}
和 之间存在一些差异,因为
前者需要保持适合请求者和响应者使用。例如,while 可以声明为处理任意数量的路由,并且每个路由都可以
be 一个模式,必须使用单个具体路由来声明。有
与元数据相关的支持方法参数也存在细微差异,有关支持的参数列表,请参阅 @MessageMapping 和 RSocket 接口。@RSocketExhange
@MessageMapping
@MessageMapping
@RSocketExchange
@RSocketExchange
可以在类型级别使用,为所有路由指定通用前缀
对于给定的 RSocket 服务接口。
@ConnectMapping
@ConnectMapping
在 RSocket 连接开始时处理帧,并且
通过框架的任何后续元数据推送通知,即 在。SETUP
METADATA_PUSH
metadataPush(Payload)
io.rsocket.RSocket
@ConnectMapping
方法支持与 @MessageMapping 相同的参数,但基于来自 AND 帧的元数据和数据。 可以有一个模式来缩小处理范围
在元数据中具有路由的特定连接,或者如果未声明任何模式
则所有连接都匹配。SETUP
METADATA_PUSH
@ConnectMapping
@ConnectMapping
方法不能返回数据,必须用 OR 作为返回值声明。如果处理返回新的
connection,则连接将被拒绝。处理不得搁置以使
请求 。有关详细信息,请参阅 Server Requester 。void
Mono<Void>
RSocketRequester
方法参数 | 描述 |
---|---|
|
请求的有效负载。这可以是异步类型的具体值,如 或 。 注意:使用注释是可选的。不是简单类型的方法参数 并且不是任何其他受支持的参数,则假定为预期的有效负载。 |
|
请求者,用于向远程端发出请求。 |
|
根据映射模式中的变量从路由中提取的值,例如 . |
|
注册用于提取的元数据值,如 MetadataExtractor 中所述。 |
|
注册用于提取的所有元数据值,如 MetadataExtractor 中所述。 |
基数 | 描述 |
---|---|
1 |
显式值或单值异步类型,如 . |
多 |
多值异步类型,如 . |
0 |
对于 input,这意味着该方法没有参数。 对于输出,这是 or 无值异步类型,例如 . |
输入基数 | 输出基数 | 交互类型 |
---|---|---|
0, 1 |
0 |
Fire-and-Forget, Request-Response |
0, 1 |
1 |
请求-响应 |
0, 1 |
多 |
请求流 |
多 |
0、1、多 |
请求通道 |
元数据提取器
响应方必须解释元数据。复合元数据允许独立 格式化的元数据值(例如,用于路由、安全、跟踪),每个值都有自己的 MIME 类型。应用程序需要一种方法来配置元数据 MIME 类型以支持,以及一种方法 以访问提取的值。
MetadataExtractor
是一个合约,用于获取序列化元数据并返回解码
名称-值对,然后可以像 Headers 一样按名称访问,例如 via 在带注释的处理程序方法中。@Header
DefaultMetadataExtractor
可以为其提供实例来解码元数据。出
该盒子内置了对 “message/x.rsocket.routing.v0” 的支持,它将其解码并保存在 “route” 键下。对于任何其他 mime 类型,您需要提供
a 并注册 MIME 类型,如下所示:Decoder
String
Decoder
-
Java
-
Kotlin
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");
import org.springframework.messaging.rsocket.metadataToExtract
val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Foo>(fooMimeType, "foo")
复合元数据可以很好地组合独立的元数据值。但是,
请求者可能不支持复合元数据,或者可能选择不使用它。为此,可能需要自定义逻辑将解码值映射到输出
地图。以下是将 JSON 用于元数据的示例:DefaultMetadataExtractor
-
Java
-
Kotlin
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(
MimeType.valueOf("application/vnd.myapp.metadata+json"),
new ParameterizedTypeReference<Map<String,String>>() {},
(jsonMap, outputMap) -> {
outputMap.putAll(jsonMap);
});
import org.springframework.messaging.rsocket.metadataToExtract
val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap ->
outputMap.putAll(jsonMap)
}
通过 进行配置时,您可以让 使用配置的解码器创建提取器,并且
只需使用回调即可自定义注册,如下所示:MetadataExtractor
RSocketStrategies
RSocketStrategies.Builder
-
Java
-
Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.metadataExtractorRegistry(registry -> {
registry.metadataToExtract(fooMimeType, Foo.class, "foo");
// ...
})
.build();
import org.springframework.messaging.rsocket.metadataToExtract
val strategies = RSocketStrategies.builder()
.metadataExtractorRegistry { registry: MetadataExtractorRegistry ->
registry.metadataToExtract<Foo>(fooMimeType, "foo")
// ...
}
.build()
RSocket 接口
Spring Framework 允许您将 RSocket 服务定义为带有方法的 Java 接口。你可以将这样的接口传递给 以创建一个通过 RSocketRequester 执行请求的代理。您还可以实现
interface 作为处理请求的响应方。@RSocketExchange
RSocketServiceProxyFactory
首先使用 methods 创建接口:@RSocketExchange
interface RadarService {
@RSocketExchange("radars")
Flux<AirportLocation> getRadars(@Payload MapRequest request);
// more RSocket exchange methods...
}
现在,您可以创建一个代理,在调用方法时执行请求:
RSocketRequester requester = ... ;
RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester).build();
RadarService service = factory.createClient(RadarService.class);
您还可以实现该接口以作为响应方处理请求。 请参阅 带注释的响应程序。
方法参数
带注释的 RSocket 交换方法支持灵活的方法签名,如下所示 方法参数:
Method 参数 | 描述 |
---|---|
|
添加一个 route 变量,以便与 annotation 中的路由一起传递给,以扩展路由中的模板占位符。
此变量可以是 String 或任何 Object,然后通过 . |
|
设置请求的输入负载。这可以是具体值,也可以是任何 producer
可以通过以下方式适应反应式流的值 |
|
输入负载中元数据条目的值。这可以是任意长度
因为下一个参数是 metadata entry 。该值可以是具体的
value 或单个值的任何生产者,该 producer 可以通过 . |
|
用于元数据条目。前面的 method 参数应为
metadata 值。 |
Method 参数 | 描述 |
---|---|
|
添加一个 route 变量,以便与 annotation 中的路由一起传递给,以扩展路由中的模板占位符。
此变量可以是 String 或任何 Object,然后通过 . |
|
设置请求的输入负载。这可以是具体值,也可以是任何 producer
可以通过以下方式适应反应式流的值 |
|
输入负载中元数据条目的值。这可以是任意长度
因为下一个参数是 metadata entry 。该值可以是具体的
value 或单个值的任何生产者,该 producer 可以通过 . |
|
用于元数据条目。前面的 method 参数应为
metadata 值。 |