此版本仍在开发中,尚未被视为稳定版本。最新的稳定版本请使用 Spring Framework 6.1.13spring-doc.cn

此版本仍在开发中,尚未被视为稳定版本。最新的稳定版本请使用 Spring Framework 6.1.13spring-doc.cn

本节描述了 Spring 框架对 RSocket 协议的支持。spring-doc.cn

概述

RSocket 是一种用于通过 TCP 进行多路复用、双工通信的应用程序协议, WebSocket 和其他字节流传输,使用以下交互之一 模型:spring-doc.cn

  • Request-Response— 发送一条消息,然后接收一条消息。spring-doc.cn

  • Request-Stream— 发送一条消息并接收回消息流。spring-doc.cn

  • Channel— 双向发送消息流。spring-doc.cn

  • Fire-and-Forget— 发送单向消息。spring-doc.cn

建立初始连接后,“client” 与 “server” 的区别将丢失,因为 双方都变得对称,并且每一方都可以发起上述交互之一。 这就是为什么在协议中将参与方称为 “requester” 和 “responder” 的原因 而上述交互称为 “请求流” 或简称为 “请求”。spring-doc.cn

以下是 RSocket 协议的主要功能和优势:spring-doc.cn

  • 跨网络边界的 Reactive Streams 语义 — 用于流式请求,例如 和 、背压信号 在请求者和响应者之间移动,允许请求者在 源,从而减少对网络层拥塞控制的依赖,以及需求 用于网络级别或任何级别的缓冲。Request-StreamChannelspring-doc.cn

  • 请求限制 — 此功能以 “Leasing” 命名,以 可以从每一端发送,以限制另一端允许的请求总数 在给定的时间内。租约会定期续订。LEASEspring-doc.cn

  • 会话恢复 — 这是为连接丢失而设计的,需要一些状态 待维护。状态管理对应用程序是透明的,并且运行良好 结合背压,可以在可能的情况下停止生产者并减少 所需的状态量。spring-doc.cn

  • 大型消息的分片和重新汇编。spring-doc.cn

  • Keepalive (检测信号)。spring-doc.cn

RSocket 具有多种语言的实现Java 库构建在 Project Reactor 之上, 和 Reactor Netty 用于运输。这意味着 应用程序中来自 Reactive Streams Publishers 的信号以透明方式传播 通过 RSocket 跨网络。spring-doc.cn

协议

RSocket 的好处之一是它在 wire 上具有明确定义的行为,并且 易于阅读的规范以及一些协议扩展。因此它是 阅读规范是个好主意,独立于语言实现和更高级别 框架 API 的 API 中。本节提供了简洁的概述,以建立一些上下文。spring-doc.cn

最初,客户端通过一些低级流传输(如 作为 TCP 或 WebSocket 进行设置,并向服务器发送一个帧,为 连接。SETUPspring-doc.cn

服务器可能会拒绝帧,但通常在发送帧后(对于 client) 和 received(对于服务器),双方都可以开始发出请求,除非指示使用 leasing 语义来限制请求的数量,在这种情况下 双方必须等待来自另一端的帧才能允许发出请求。SETUPSETUPLEASEspring-doc.cn

发出请求spring-doc.cn

建立连接后,双方都可以通过 帧 、 、 或 .每个 这些帧将一条消息从请求者传送到响应者。REQUEST_RESPONSEREQUEST_STREAMREQUEST_CHANNELREQUEST_FNFspring-doc.cn

然后,响应方可以返回包含响应消息的帧,在这种情况下 的请求者也可以发送具有更多请求的帧 消息。PAYLOADREQUEST_CHANNELPAYLOADspring-doc.cn

当请求涉及 和 等消息流时, 响应方必须遵循来自请求方的需求信号。Demand 表示为 消息数。初始需求在 和 帧 中指定。后续需求通过帧发出信号。Request-StreamChannelREQUEST_STREAMREQUEST_CHANNELREQUEST_Nspring-doc.cn

每一方还可以通过框架发送元数据通知,这些通知不会 与任何单个请求有关,但与整个连接有关。METADATA_PUSHspring-doc.cn

消息格式spring-doc.cn

RSocket 消息包含数据和元数据。元数据可用于发送路由、 证券令牌等数据和元数据的格式可以不同。每个 Mime 类型 在框架中声明,并应用于给定连接上的所有请求。SETUPspring-doc.cn

虽然所有消息都可以包含元数据,但通常元数据(如路由)是按请求进行的 因此仅包含在请求的第一条消息中,即包含帧 、 、 或 。REQUEST_RESPONSEREQUEST_STREAMREQUEST_CHANNELREQUEST_FNFspring-doc.cn

协议扩展定义用于应用程序的常见元数据格式:spring-doc.cn

Java 实现

RSocket 的 Java 实现构建在 Project Reactor 之上。TCP 和 WebSocket 的传输方式是 基于 Reactor Netty 构建。作为反应式流 库,Reactor 简化了实现协议的工作。对于应用程序,它是 天生适合使用,带有声明式运算符和透明背面 压力支持。FluxMonospring-doc.cn

RSocket Java 中的 API 有意做到最小和基本。它侧重于协议 功能,并将应用程序编程模型(例如 RPC codegen 与其他模型)保留为 更高层次,独立关注。spring-doc.cn

主合约 io.rsocket.RSocket 对四种请求交互类型进行建模,表示 single message 中,消息流和实际的 message 中访问数据和元数据作为字节缓冲区。使用 Contract 对称。对于请求,应用程序被赋予一个 to perform 请求与。为了响应,应用程序实现了处理请求。MonoFluxio.rsocket.PayloadRSocketRSocketRSocketspring-doc.cn

这并不是一个详尽的介绍。在大多数情况下,Spring 应用程序 不必直接使用其 API。但是,观察或试验可能很重要 使用 RSocket 独立于 Spring。RSocket Java 存储库包含许多示例应用程序,这些应用程序 演示其 API 和协议功能。spring-doc.cn

弹簧支撑

该模块包含以下内容:spring-messagingspring-doc.cn

该模块包含和实现,例如 Jackson CBOR/JSON 和 Protobuf 的 RSocket 应用程序可能需要。它还包含可插入以实现高效路由匹配的 。spring-webEncoderDecoderPathPatternParserspring-doc.cn

Spring Boot 2.2 支持通过 TCP 或 WebSocket 建立 RSocket 服务器,包括 在 WebFlux 服务器中通过 WebSocket 公开 RSocket 的选项。还有 Client 支持和自动配置 和 . 有关更多详细信息,请参阅 Spring Boot 参考中的 RSocket 部分RSocketRequester.BuilderRSocketStrategiesspring-doc.cn

Spring Security 5.2 提供了 RSocket 支持。spring-doc.cn

Spring 集成 5.2 提供了入站和出站网关来与 RSocket 交互 客户端和服务器。有关更多详细信息,请参见 Spring 集成参考手册。spring-doc.cn

Spring Cloud 网关支持 RSocket 连接。spring-doc.cn

RSocketRequester 请求者

RSocketRequester提供 Fluent API 来执行 RSocket 请求、接受和 返回 data 和 metadata 的对象,而不是低级数据缓冲区。可以使用 对称地,从 Client 端发出请求,以及从 Server 发出请求。spring-doc.cn

客户端请求者

要在客户端获取 an 就是连接到一个服务器,它涉及 发送带有连接设置的 RSocket 帧。 提供 构建器,帮助准备 Include 连接 框架的设置。RSocketRequesterSETUPRSocketRequesterio.rsocket.core.RSocketConnectorSETUPspring-doc.cn

这是使用默认设置进行连接的最基本方法:spring-doc.cn

RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);

URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
val requester = RSocketRequester.builder().tcp("localhost", 7000)

URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)

以上不会立即连接。发出请求时,共享连接为 透明地建立并使用。spring-doc.cn

连接设置

RSocketRequester.Builder提供以下内容以自定义初始帧:SETUPspring-doc.cn

  • dataMimeType(MimeType)— 设置连接上数据的 MIME 类型。spring-doc.cn

  • metadataMimeType(MimeType)— 设置连接上元数据的 MIME 类型。spring-doc.cn

  • setupData(Object)— 要包含在 .SETUPspring-doc.cn

  • setupRoute(String, Object…​)— 路由到包含在元数据中。SETUPspring-doc.cn

  • setupMetadata(Object, MimeType)— 要包含在 .SETUPspring-doc.cn

对于数据,默认 mime 类型派生自第一个配置的 .为 metadata,则默认的 MIME 类型是 composite metadata,它允许多个 每个请求的元数据值和 MIME 类型对。通常,两者都不需要更改。Decoderspring-doc.cn

框架中的数据和元数据是可选的。在服务器端,可以使用@ConnectMapping方法处理 connection 和 frame 的内容。元数据可用于连接 级别安全性。SETUPSETUPspring-doc.cn

策略

RSocketRequester.Builderaccepts 来配置请求者。 您需要使用它来提供编码器和解码器,用于数据的 (de) 序列化和 metadata 值。默认情况下,仅注册 for 、 和 的基本编解码器。添加 (Added) 可以访问更多 可以按如下方式注册:RSocketStrategiesspring-coreStringbyte[]ByteBufferspring-webspring-doc.cn

RSocketStrategies strategies = RSocketStrategies.builder()
	.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
	.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
	.build();

RSocketRequester requester = RSocketRequester.builder()
	.rsocketStrategies(strategies)
	.tcp("localhost", 7000);
val strategies = RSocketStrategies.builder()
		.encoders { it.add(Jackson2CborEncoder()) }
		.decoders { it.add(Jackson2CborDecoder()) }
		.build()

val requester = RSocketRequester.builder()
		.rsocketStrategies(strategies)
		.tcp("localhost", 7000)

RSocketStrategies专为重复使用而设计。在某些情况下,例如客户端和服务器在 相同的应用程序,最好在 Spring 配置中声明它。spring-doc.cn

客户端响应方

RSocketRequester.Builder可用于配置对来自 服务器。spring-doc.cn

您可以使用带注释的处理程序进行基于相同的客户端响应 在服务器上使用但以编程方式注册的基础结构,如下所示:spring-doc.cn

RSocketStrategies strategies = RSocketStrategies.builder()
	.routeMatcher(new PathPatternRouteMatcher())  (1)
	.build();

SocketAcceptor responder =
	RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> connector.acceptor(responder)) (3)
	.tcp("localhost", 7000);
1 如果存在,请使用 ,以提高效率 路由匹配。PathPatternRouteMatcherspring-web
2 使用 and/or 方法从类创建响应方。@MessageMapping@ConnectMapping
3 注册响应方。
val strategies = RSocketStrategies.builder()
		.routeMatcher(PathPatternRouteMatcher())  (1)
		.build()

val responder =
	RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)

val requester = RSocketRequester.builder()
		.rsocketConnector { it.acceptor(responder) } (3)
		.tcp("localhost", 7000)
1 如果存在,请使用 ,以提高效率 路由匹配。PathPatternRouteMatcherspring-web
2 使用 and/or 方法从类创建响应方。@MessageMapping@ConnectMapping
3 注册响应方。

请注意,以上只是专为 client 的编程注册而设计的快捷方式 反应。对于客户端响应者处于 Spring 配置中的替代场景, 你仍然可以声明为 Spring bean,然后按如下方式应用:RSocketMessageHandlerspring-doc.cn

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> connector.acceptor(handler.responder()))
	.tcp("localhost", 7000);
import org.springframework.beans.factory.getBean

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val requester = RSocketRequester.builder()
		.rsocketConnector { it.acceptor(handler.responder()) }
		.tcp("localhost", 7000)

对于上述内容,您可能还需要在 中使用 切换到不同的策略来检测客户端响应者,例如基于自定义 注释(例如 VS 默认的 .这 在客户端和服务器或多个客户端位于同一 应用。setHandlerPredicateRSocketMessageHandler@RSocketClientResponder@Controllerspring-doc.cn

另请参阅 Annotated Responders ,以了解有关编程模型的更多信息。spring-doc.cn

高深

RSocketRequesterBuilder提供回调以公开 keepalive 的进一步配置选项的基础 intervals、session resumption、interceptor 等。您可以配置选项 在该级别,如下所示:io.rsocket.core.RSocketConnectorspring-doc.cn

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> {
		// ...
	})
	.tcp("localhost", 7000);
val requester = RSocketRequester.builder()
		.rsocketConnector {
			//...
		}
		.tcp("localhost", 7000)

服务器请求者

要从服务器向连接的客户端发出请求,只需获取 来自服务器的已连接客户端的 requester。spring-doc.cn

Annotated Responders 中,方法支持参数。使用它来访问连接的请求者。保留 请注意,方法本质上是 Frame 的处理程序,它 必须在请求开始之前处理。因此,请求在一开始就必须是 与处理分离。例如:@ConnectMapping@MessageMappingRSocketRequester@ConnectMappingSETUPspring-doc.cn

@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
	requester.route("status").data("5")
		.retrieveFlux(StatusReport.class)
		.subscribe(bar -> { (1)
			// ...
		});
	return ... (2)
}
1 异步启动请求,独立于处理。
2 执行处理并返回完成 。Mono<Void>
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
	GlobalScope.launch {
		requester.route("status").data("5").retrieveFlow<StatusReport>().collect { (1)
			// ...
		}
	}
	/// ... (2)
}
1 异步启动请求,独立于处理。
2 在 suspending 函数中执行处理。

请求

拥有客户端服务器请求者后,您可以按如下方式发出请求:spring-doc.cn

ViewBox viewBox = ... ;

Flux<AirportLocation> locations = requester.route("locate.radars.within") (1)
		.data(viewBox) (2)
		.retrieveFlux(AirportLocation.class); (3)
1 指定要包含在请求消息元数据中的路由。
2 为请求消息提供数据。
3 声明预期的响应。
val viewBox: ViewBox = ...

val locations = requester.route("locate.radars.within") (1)
		.data(viewBox) (2)
		.retrieveFlow<AirportLocation>() (3)
1 指定要包含在请求消息元数据中的路由。
2 为请求消息提供数据。
3 声明预期的响应。

交互类型由输入的基数隐式确定,而 输出。上面的示例是一个 because 一个 value 被发送和一个 stream of values 的值。在大多数情况下,您不需要考虑这个问题,只要 输入和输出的选择与 RSocket 交互类型以及 input 和 响应方所需的输出。无效组合的唯一示例是多对一。Request-Streamspring-doc.cn

该方法还接受任何 Reactive Streams ,包括 和 ,以及在 .对于多值(例如 which 会生成 相同类型的值,请考虑使用重载方法之一来避免 类型检查并查找每个元素:data(Object)PublisherFluxMonoReactiveAdapterRegistryPublisherFluxdataEncoderspring-doc.cn

data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);

该步骤是可选的。对于不发送数据的请求,请跳过它:data(Object)spring-doc.cn

Mono<AirportLocation> location = requester.route("find.radar.EWR"))
	.retrieveMono(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveAndAwait

val location = requester.route("find.radar.EWR")
	.retrieveAndAwait<AirportLocation>()

如果使用复合元数据(默认),并且可以添加额外的元数据值 值由已注册的 .例如:Encoderspring-doc.cn

String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");

Flux<AirportLocation> locations = requester.route("locate.radars.within")
		.metadata(securityToken, mimeType)
		.data(viewBox)
		.retrieveFlux(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveFlow

val requester: RSocketRequester = ...

val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")

val locations = requester.route("locate.radars.within")
		.metadata(securityToken, mimeType)
		.data(viewBox)
		.retrieveFlow<AirportLocation>()

对于使用返回 .请注意,这仅表示消息已成功发送,而不表示消息已处理。Fire-and-Forgetsend()Mono<Void>Monospring-doc.cn

对于使用带有返回值的方法。Metadata-PushsendMetadata()Mono<Void>spring-doc.cn

1 如果存在,请使用 ,以提高效率 路由匹配。PathPatternRouteMatcherspring-web
2 使用 and/or 方法从类创建响应方。@MessageMapping@ConnectMapping
3 注册响应方。
1 如果存在,请使用 ,以提高效率 路由匹配。PathPatternRouteMatcherspring-web
2 使用 and/or 方法从类创建响应方。@MessageMapping@ConnectMapping
3 注册响应方。
1 异步启动请求,独立于处理。
2 执行处理并返回完成 。Mono<Void>
1 异步启动请求,独立于处理。
2 在 suspending 函数中执行处理。
1 指定要包含在请求消息元数据中的路由。
2 为请求消息提供数据。
3 声明预期的响应。
1 指定要包含在请求消息元数据中的路由。
2 为请求消息提供数据。
3 声明预期的响应。

带注释的响应者

RSocket 响应程序可以作为 and 方法实现。 方法处理单个请求,而方法处理 连接级事件 (Setup 和 Metadata Push)。支持带注释的响应者 对称地,用于从服务器端响应和从客户端响应。@MessageMapping@ConnectMapping@MessageMapping@ConnectMappingspring-doc.cn

服务器响应程序

要在服务器端使用带注释的响应者,请将 Spring 添加到您的 Spring 配置来检测带有 AND 方法的 bean:RSocketMessageHandler@Controller@MessageMapping@ConnectMappingspring-doc.cn

@Configuration
static class ServerConfig {

	@Bean
	public RSocketMessageHandler rsocketMessageHandler() {
		RSocketMessageHandler handler = new RSocketMessageHandler();
		handler.routeMatcher(new PathPatternRouteMatcher());
		return handler;
	}
}
@Configuration
class ServerConfig {

	@Bean
	fun rsocketMessageHandler() = RSocketMessageHandler().apply {
		routeMatcher = PathPatternRouteMatcher()
	}
}

然后通过 Java RSocket API 启动 RSocket 服务器,并为响应方插入 ,如下所示:RSocketMessageHandlerspring-doc.cn

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

CloseableChannel server =
	RSocketServer.create(handler.responder())
		.bind(TcpServerTransport.create("localhost", 7000))
		.block();
import org.springframework.beans.factory.getBean

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val server = RSocketServer.create(handler.responder())
		.bind(TcpServerTransport.create("localhost", 7000))
		.awaitSingle()

RSocketMessageHandler默认情况下,支持复合元数据和路由元数据。如果需要切换到 不同的 MIME 类型或注册其他元数据 MIME 类型。spring-doc.cn

您需要设置元数据和数据所需的 和 实例 格式。您可能需要该模块来实现编解码器。EncoderDecoderspring-webspring-doc.cn

默认情况下,用于通过 匹配路由。 我们建议将 from for 高效的路由匹配。RSocket 路由可以是分层的,但不是 URL 路径。 默认情况下,两个路由匹配器都配置为使用 “.” 作为分隔符,并且没有 URL 解码方式与 HTTP URL 一样。SimpleRouteMatcherAntPathMatcherPathPatternRouteMatcherspring-webspring-doc.cn

RSocketMessageHandler可以通过以下方式进行配置,如果满足以下条件,则可能很有用 您需要在同一进程中在 Client 端和 Server 之间共享配置:RSocketStrategiesspring-doc.cn

@Configuration
static class ServerConfig {

	@Bean
	public RSocketMessageHandler rsocketMessageHandler() {
		RSocketMessageHandler handler = new RSocketMessageHandler();
		handler.setRSocketStrategies(rsocketStrategies());
		return handler;
	}

	@Bean
	public RSocketStrategies rsocketStrategies() {
		return RSocketStrategies.builder()
			.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
			.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
			.routeMatcher(new PathPatternRouteMatcher())
			.build();
	}
}
@Configuration
class ServerConfig {

	@Bean
	fun rsocketMessageHandler() = RSocketMessageHandler().apply {
		rSocketStrategies = rsocketStrategies()
	}

	@Bean
	fun rsocketStrategies() = RSocketStrategies.builder()
			.encoders { it.add(Jackson2CborEncoder()) }
			.decoders { it.add(Jackson2CborDecoder()) }
			.routeMatcher(PathPatternRouteMatcher())
			.build()
}

客户端响应方

客户端上带注释的响应程序需要在 .有关详细信息,请参阅 客户端响应程序RSocketRequester.Builderspring-doc.cn

@MessageMapping

服务器客户端响应方配置就位后,可以按如下方式使用方法:@MessageMappingspring-doc.cn

@Controller
public class RadarsController {

	@MessageMapping("locate.radars.within")
	public Flux<AirportLocation> radars(MapRequest request) {
		// ...
	}
}
@Controller
class RadarsController {

	@MessageMapping("locate.radars.within")
	fun radars(request: MapRequest): Flow<AirportLocation> {
		// ...
	}
}

上述方法响应具有 路由 “locate.radars.within”。它支持灵活的方法签名,并可选择 使用以下方法参数:@MessageMappingspring-doc.cn

方法参数 描述

@Payloadspring-doc.cn

请求的有效负载。这可以是异步类型的具体值,如 或 。MonoFluxspring-doc.cn

注意:使用注释是可选的。不是简单类型的方法参数 并且不是任何其他受支持的参数,则假定为预期的有效负载。spring-doc.cn

RSocketRequesterspring-doc.cn

请求者,用于向远程端发出请求。spring-doc.cn

@DestinationVariablespring-doc.cn

根据映射模式中的变量从路由中提取的值,例如 .@MessageMapping("find.radar.{id}")spring-doc.cn

@Headerspring-doc.cn

注册用于提取的元数据值,如 MetadataExtractor 中所述。spring-doc.cn

@Headers Map<String, Object>spring-doc.cn

注册用于提取的所有元数据值,如 MetadataExtractor 中所述。spring-doc.cn

返回值应为一个或多个要序列化为响应的 Object 负载。这可以是异步类型,如 or、具体值或 或者一个无值异步类型,例如 .MonoFluxvoidMono<Void>spring-doc.cn

方法支持的 RSocket 交互类型是根据 input 的基数(即 参数)和输出,其中 cardinality 的含义如下:@MessageMapping@Payloadspring-doc.cn

基数 描述

1spring-doc.cn

显式值或单值异步类型,如 .Mono<T>spring-doc.cn

spring-doc.cn

多值异步类型,如 .Flux<T>spring-doc.cn

0spring-doc.cn

对于 input,这意味着该方法没有参数。@Payloadspring-doc.cn

对于输出,这是 or 无值异步类型,例如 .voidMono<Void>spring-doc.cn

下表显示了所有输入和输出基数组合以及相应的 交互类型:spring-doc.cn

输入基数 输出基数 交互类型

0, 1spring-doc.cn

0spring-doc.cn

Fire-and-Forget, Request-Responsespring-doc.cn

0, 1spring-doc.cn

1spring-doc.cn

请求-响应spring-doc.cn

0, 1spring-doc.cn

spring-doc.cn

请求流spring-doc.cn

spring-doc.cn

0、1、多spring-doc.cn

请求通道spring-doc.cn

@RSocketExchange

作为 的替代方法,您还可以使用方法处理请求。此类方法在 RSocket 接口上声明,可以通过响应者作为请求者使用或由响应者实现。@MessageMapping@RSocketExchangeRSocketServiceProxyFactoryspring-doc.cn

例如,要以响应者身份处理请求:spring-doc.cn

public interface RadarsService {

	@RSocketExchange("locate.radars.within")
	Flux<AirportLocation> radars(MapRequest request);
}

@Controller
public class RadarsController implements RadarsService {

	public Flux<AirportLocation> radars(MapRequest request) {
		// ...
	}
}
interface RadarsService {

	@RSocketExchange("locate.radars.within")
	fun radars(request: MapRequest): Flow<AirportLocation>
}

@Controller
class RadarsController : RadarsService {

	override fun radars(request: MapRequest): Flow<AirportLocation> {
		// ...
	}
}

和 之间存在一些差异,因为 前者需要保持适合请求者和响应者使用。例如,while 可以声明为处理任意数量的路由,并且每个路由都可以 be 一个模式,必须使用单个具体路由来声明。有 与元数据相关的支持方法参数也存在细微差异,有关支持的参数列表,请参阅 @MessageMappingRSocket 接口@RSocketExhange@MessageMapping@MessageMapping@RSocketExchangespring-doc.cn

@RSocketExchange可以在类型级别使用,为所有路由指定通用前缀 对于给定的 RSocket 服务接口。spring-doc.cn

@ConnectMapping

@ConnectMapping在 RSocket 连接开始时处理帧,并且 通过框架的任何后续元数据推送通知,即 在。SETUPMETADATA_PUSHmetadataPush(Payload)io.rsocket.RSocketspring-doc.cn

@ConnectMapping方法支持与 @MessageMapping 相同的参数,但基于来自 AND 帧的元数据和数据。 可以有一个模式来缩小处理范围 在元数据中具有路由的特定连接,或者如果未声明任何模式 则所有连接都匹配。SETUPMETADATA_PUSH@ConnectMappingspring-doc.cn

@ConnectMapping方法不能返回数据,必须用 OR 作为返回值声明。如果处理返回新的 connection,则连接将被拒绝。处理不得搁置以使 请求 。有关详细信息,请参阅 Server RequestervoidMono<Void>RSocketRequesterspring-doc.cn

方法参数 描述

@Payloadspring-doc.cn

请求的有效负载。这可以是异步类型的具体值,如 或 。MonoFluxspring-doc.cn

注意:使用注释是可选的。不是简单类型的方法参数 并且不是任何其他受支持的参数,则假定为预期的有效负载。spring-doc.cn

RSocketRequesterspring-doc.cn

请求者,用于向远程端发出请求。spring-doc.cn

@DestinationVariablespring-doc.cn

根据映射模式中的变量从路由中提取的值,例如 .@MessageMapping("find.radar.{id}")spring-doc.cn

@Headerspring-doc.cn

注册用于提取的元数据值,如 MetadataExtractor 中所述。spring-doc.cn

@Headers Map<String, Object>spring-doc.cn

注册用于提取的所有元数据值,如 MetadataExtractor 中所述。spring-doc.cn

基数 描述

1spring-doc.cn

显式值或单值异步类型,如 .Mono<T>spring-doc.cn

spring-doc.cn

多值异步类型,如 .Flux<T>spring-doc.cn

0spring-doc.cn

对于 input,这意味着该方法没有参数。@Payloadspring-doc.cn

对于输出,这是 or 无值异步类型,例如 .voidMono<Void>spring-doc.cn

输入基数 输出基数 交互类型

0, 1spring-doc.cn

0spring-doc.cn

Fire-and-Forget, Request-Responsespring-doc.cn

0, 1spring-doc.cn

1spring-doc.cn

请求-响应spring-doc.cn

0, 1spring-doc.cn

spring-doc.cn

请求流spring-doc.cn

spring-doc.cn

0、1、多spring-doc.cn

请求通道spring-doc.cn

元数据提取器

响应方必须解释元数据。复合元数据允许独立 格式化的元数据值(例如,用于路由、安全、跟踪),每个值都有自己的 MIME 类型。应用程序需要一种方法来配置元数据 MIME 类型以支持,以及一种方法 以访问提取的值。spring-doc.cn

MetadataExtractor是一个合约,用于获取序列化元数据并返回解码 名称-值对,然后可以像 Headers 一样按名称访问,例如 via 在带注释的处理程序方法中。@Headerspring-doc.cn

DefaultMetadataExtractor可以为其提供实例来解码元数据。出 该盒子内置了对 “message/x.rsocket.routing.v0” 的支持,它将其解码并保存在 “route” 键下。对于任何其他 mime 类型,您需要提供 a 并注册 MIME 类型,如下所示:DecoderStringDecoderspring-doc.cn

DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");
import org.springframework.messaging.rsocket.metadataToExtract

val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Foo>(fooMimeType, "foo")

复合元数据可以很好地组合独立的元数据值。但是, 请求者可能不支持复合元数据,或者可能选择不使用它。为此,可能需要自定义逻辑将解码值映射到输出 地图。以下是将 JSON 用于元数据的示例:DefaultMetadataExtractorspring-doc.cn

DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(
	MimeType.valueOf("application/vnd.myapp.metadata+json"),
	new ParameterizedTypeReference<Map<String,String>>() {},
	(jsonMap, outputMap) -> {
		outputMap.putAll(jsonMap);
	});
import org.springframework.messaging.rsocket.metadataToExtract

val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap ->
	outputMap.putAll(jsonMap)
}

通过 进行配置时,您可以让 使用配置的解码器创建提取器,并且 只需使用回调即可自定义注册,如下所示:MetadataExtractorRSocketStrategiesRSocketStrategies.Builderspring-doc.cn

RSocketStrategies strategies = RSocketStrategies.builder()
	.metadataExtractorRegistry(registry -> {
		registry.metadataToExtract(fooMimeType, Foo.class, "foo");
		// ...
	})
	.build();
import org.springframework.messaging.rsocket.metadataToExtract

val strategies = RSocketStrategies.builder()
		.metadataExtractorRegistry { registry: MetadataExtractorRegistry ->
			registry.metadataToExtract<Foo>(fooMimeType, "foo")
			// ...
		}
		.build()

RSocket 接口

Spring Framework 允许您将 RSocket 服务定义为带有方法的 Java 接口。你可以将这样的接口传递给 以创建一个通过 RSocketRequester 执行请求的代理。您还可以实现 interface 作为处理请求的响应方。@RSocketExchangeRSocketServiceProxyFactoryspring-doc.cn

首先使用 methods 创建接口:@RSocketExchangespring-doc.cn

interface RadarService {

	@RSocketExchange("radars")
	Flux<AirportLocation> getRadars(@Payload MapRequest request);

	// more RSocket exchange methods...

}

现在,您可以创建一个代理,在调用方法时执行请求:spring-doc.cn

RSocketRequester requester = ... ;
RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester).build();

RadarService service = factory.createClient(RadarService.class);

您还可以实现该接口以作为响应方处理请求。 请参阅 带注释的响应程序spring-doc.cn

方法参数

带注释的 RSocket 交换方法支持灵活的方法签名,如下所示 方法参数:spring-doc.cn

Method 参数 描述

@DestinationVariablespring-doc.cn

添加一个 route 变量,以便与 annotation 中的路由一起传递给,以扩展路由中的模板占位符。 此变量可以是 String 或任何 Object,然后通过 .RSocketRequester@RSocketExchangetoString()spring-doc.cn

@Payloadspring-doc.cn

设置请求的输入负载。这可以是具体值,也可以是任何 producer 可以通过以下方式适应反应式流的值PublisherReactiveAdapterRegistryspring-doc.cn

Object,如果后跟MimeTypespring-doc.cn

输入负载中元数据条目的值。这可以是任意长度 因为下一个参数是 metadata entry 。该值可以是具体的 value 或单个值的任何生产者,该 producer 可以通过 .ObjectMimeTypePublisherReactiveAdapterRegistryspring-doc.cn

MimeTypespring-doc.cn

用于元数据条目。前面的 method 参数应为 metadata 值。MimeTypespring-doc.cn

返回值

带注释的 RSocket 交换方法支持作为具体值的返回值,或者 任何可以通过 .PublisherReactiveAdapterRegistryspring-doc.cn

默认情况下,具有同步(阻塞)方法的 RSocket 服务方法的行为 签名取决于底层 RSocket 的响应超时设置以及 RSocket keep-alive 设置。 确实公开了一个选项,该选项还允许您配置阻止响应的最长时间, 但我们建议在 RSocket 级别配置超时值以获得更多控制。ClientTransportRSocketServiceProxyFactory.BuilderblockTimeoutspring-doc.cn

Method 参数 描述

@DestinationVariablespring-doc.cn

添加一个 route 变量,以便与 annotation 中的路由一起传递给,以扩展路由中的模板占位符。 此变量可以是 String 或任何 Object,然后通过 .RSocketRequester@RSocketExchangetoString()spring-doc.cn

@Payloadspring-doc.cn

设置请求的输入负载。这可以是具体值,也可以是任何 producer 可以通过以下方式适应反应式流的值PublisherReactiveAdapterRegistryspring-doc.cn

Object,如果后跟MimeTypespring-doc.cn

输入负载中元数据条目的值。这可以是任意长度 因为下一个参数是 metadata entry 。该值可以是具体的 value 或单个值的任何生产者,该 producer 可以通过 .ObjectMimeTypePublisherReactiveAdapterRegistryspring-doc.cn

MimeTypespring-doc.cn

用于元数据条目。前面的 method 参数应为 metadata 值。MimeTypespring-doc.cn