此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Framework 6.1.10Spring中文文档

此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Framework 6.1.10Spring中文文档

本节介绍 Spring Framework 对 RSocket 协议的支持。Spring中文文档

概述

RSocket 是一种通过 TCP 进行多路复用双工通信的应用程序协议, WebSocket 和其他字节流传输,使用以下交互之一 模型:Spring中文文档

建立初始连接后,“客户端”与“服务器”的区别将丢失,因为 两边变得对称,每边都可以发起上述相互作用之一。 这就是为什么在协议中称参与方为“请求者”和“响应者”的原因 而上述交互称为“请求流”或简称为“请求”。Spring中文文档

以下是 RSocket 协议的主要功能和优势:Spring中文文档

  • 响应式流语义跨网络边界 — 用于流请求,如 和 、背压信号 在请求者和响应者之间移动,允许请求者在以下位置减慢响应者的速度 源,从而减少对网络层拥塞控制的依赖,以及需求 用于在网络级别或任何级别进行缓冲。Request-StreamChannelSpring中文文档

  • 请求限制 — 此功能以以下帧命名“租赁” 可以从两端发送,以限制另一端允许的请求总数 在给定的时间内。租约定期续签。LEASESpring中文文档

  • 会话恢复 — 这是为失去连接而设计的,需要某种状态 待维护。状态管理对应用程序是透明的,并且运行良好 结合背压,可以在可能的情况下停止生产者并减少 所需的状态量。Spring中文文档

  • 大型消息的碎片化和重新组合。Spring中文文档

  • 保持活力(心跳)。Spring中文文档

RSocket 具有多种语言的实现Java 库建立在 Project Reactor 上, 和 Reactor Netty 用于运输。这意味着 来自应用程序中 Reactive Streams 发布者的信号以透明方式传播 通过 RSocket 通过网络。Spring中文文档

议定书

RSocket 的优点之一是它在电线上具有明确定义的行为,并且 易于阅读的规范以及一些协议扩展。因此,它是 阅读规范是个好主意,独立于语言实现和更高级别 框架 API。本节提供简明扼要的概述,以建立一些上下文。Spring中文文档

最初,客户端通过一些低级流传输连接到服务器,例如 作为 TCP 或 WebSocket,并向服务器发送一个帧以设置参数 连接。SETUPSpring中文文档

服务器可能会拒绝帧,但通常在发送帧后(对于客户端) 和接收(对于服务器),双方都可以开始发出请求,除非指示使用租赁语义来限制请求的数量,在这种情况下 双方都必须等待来自另一端的帧才能允许发出请求。SETUPSETUPLEASESpring中文文档

提出请求Spring中文文档

建立连接后,双方都可以通过以下方式发起请求 帧 、 、 或 .每个 这些帧将一条消息从请求者传递到响应者。REQUEST_RESPONSEREQUEST_STREAMREQUEST_CHANNELREQUEST_FNFSpring中文文档

然后,响应者可以返回带有响应消息的帧,在这种情况下 的请求者还可以发送具有更多请求的帧 消息。PAYLOADREQUEST_CHANNELPAYLOADSpring中文文档

当请求涉及消息流(如 和 、 响应者必须尊重请求者发出的需求信号。需求表示为 消息数。初始需求在 和 帧中指定。后续需求通过帧发出信号。Request-StreamChannelREQUEST_STREAMREQUEST_CHANNELREQUEST_NSpring中文文档

每一方还可以通过框架发送元数据通知,但不会 与任何单个请求有关,而是与整个连接有关。METADATA_PUSHSpring中文文档

消息格式Spring中文文档

RSocket 消息包含数据和元数据。元数据可用于发送路由 安全令牌等数据和元数据的格式可以不同。每个 MIME 类型 在帧中声明并应用于给定连接上的所有请求。SETUPSpring中文文档

虽然所有消息都可以具有元数据,但通常元数据(如路由)是每个请求的 因此仅包含在请求的第一条消息中,即具有 、 、 或 的帧之一。REQUEST_RESPONSEREQUEST_STREAMREQUEST_CHANNELREQUEST_FNFSpring中文文档

协议扩展定义了用于应用程序的常见元数据格式:Spring中文文档

Java 实现

RSocket 的 Java 实现基于 Project Reactor 构建。TCP 和 WebSocket 的传输是 建立在 Reactor Netty 上。作为反应式流 library, Reactor 简化了协议的实现工作。对于应用程序,它是 自然适合使用,并带有声明式运算符和透明背面 压力支持。FluxMonoSpring中文文档

RSocket Java 中的 API 有意做到最小且基本。它侧重于协议 功能并将应用程序编程模型(例如 RPC 代码生成与其他)作为 层次更高,独立关注。Spring中文文档

主合约 io.rsocket.RSocket 对四种请求交互类型进行建模,表示对 单条消息、消息流和实际 具有访问数据和元数据作为字节缓冲区的消息。使用合同 对称。对于请求,应用程序被赋予执行 请求。为了响应,应用程序实现以处理请求。MonoFluxio.rsocket.PayloadRSocketRSocketRSocketSpring中文文档

这并不是一个彻底的介绍。在大多数情况下,Spring 应用程序 不必直接使用其 API。但是,观察或实验可能很重要 与独立于 Spring 的 RSocket 一起使用。RSocket Java 存储库包含许多示例应用,这些应用 演示其 API 和协议功能。Spring中文文档

Spring 支持

该模块包含以下内容:spring-messagingSpring中文文档

该模块包含和实现,例如 Jackson RSocket 应用程序可能需要的 CBOR/JSON 和 Protobuf。它还包含可插入的 可进行有效的路由匹配。spring-webEncoderDecoderPathPatternParserSpring中文文档

Spring Boot 2.2 支持通过 TCP 或 WebSocket 建立 RSocket 服务器,包括 在 WebFlux 服务器中通过 WebSocket 公开 RSocket 的选项。还有客户 支持和自动配置 和 . 有关详细信息,请参阅 Spring Boot 参考中的 RSocket 部分RSocketRequester.BuilderRSocketStrategiesSpring中文文档

Spring Security 5.2 提供 RSocket 支持。Spring中文文档

Spring Integration 5.2 提供了与 RSocket 交互的入站和出站网关 客户端和服务器。有关更多详细信息,请参阅 Spring 集成参考手册。Spring中文文档

Spring Cloud Gateway 支持 RSocket 连接。Spring中文文档

RSocketRequester

RSocketRequester提供流畅的 API 来执行 RSocket 请求,接受和 返回数据和元数据的对象,而不是低级数据缓冲区。它可以使用 对称地,从客户端发出请求,从服务器发出请求。Spring中文文档

客户端请求者

在客户端获取一个是连接到一个服务器,它涉及 发送具有连接设置的 RSocket 帧。 提供 有助于准备包含连接的构建器 框架的设置。RSocketRequesterSETUPRSocketRequesterio.rsocket.core.RSocketConnectorSETUPSpring中文文档

这是使用默认设置进行连接的最基本方法:Spring中文文档

RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);

URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
val requester = RSocketRequester.builder().tcp("localhost", 7000)

URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)

以上不会立即连接。发出请求时,共享连接是 透明地建立和使用。Spring中文文档

连接设置

RSocketRequester.Builder提供以下内容来自定义初始帧:SETUPSpring中文文档

对于数据,默认 MIME 类型派生自第一个配置的 。为 元数据,默认的 MIME 类型是复合元数据,它允许多个 每个请求的元数据值和 MIME 类型对。通常,两者都不需要更改。DecoderSpring中文文档

框架中的数据和元数据是可选的。在服务器端,@ConnectMapping方法可用于处理 连接和框架的内容。元数据可用于连接 级别安全。SETUPSETUPSpring中文文档

策略

RSocketRequester.Builder接受以配置请求者。 您需要使用它来提供编码器和解码器,以便对数据进行(反)序列化和 元数据值。默认情况下,仅注册 for 、 和 中的基本编解码器。添加提供了对更多内容的访问 可以按如下方式注册:RSocketStrategiesspring-coreStringbyte[]ByteBufferspring-webSpring中文文档

RSocketStrategies strategies = RSocketStrategies.builder()
	.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
	.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
	.build();

RSocketRequester requester = RSocketRequester.builder()
	.rsocketStrategies(strategies)
	.tcp("localhost", 7000);
val strategies = RSocketStrategies.builder()
		.encoders { it.add(Jackson2CborEncoder()) }
		.decoders { it.add(Jackson2CborDecoder()) }
		.build()

val requester = RSocketRequester.builder()
		.rsocketStrategies(strategies)
		.tcp("localhost", 7000)

RSocketStrategies专为重复使用而设计。在某些情况下,例如客户端和服务器 相同的应用程序,最好在 Spring 配置中声明它。Spring中文文档

客户端响应者

RSocketRequester.Builder可用于配置响应器,以响应来自 服务器。Spring中文文档

您可以使用带注释的处理程序进行基于相同的客户端响应 服务器上使用的基础结构,但以编程方式注册,如下所示:Spring中文文档

RSocketStrategies strategies = RSocketStrategies.builder()
	.routeMatcher(new PathPatternRouteMatcher())  (1)
	.build();

SocketAcceptor responder =
	RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> connector.acceptor(responder)) (3)
	.tcp("localhost", 7000);
1 如果存在,请使用,以提高效率 路由匹配。PathPatternRouteMatcherspring-web
2 从具有 and/or 方法的类创建响应器。@MessageMapping@ConnectMapping
3 注册响应程序。
val strategies = RSocketStrategies.builder()
		.routeMatcher(PathPatternRouteMatcher())  (1)
		.build()

val responder =
	RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)

val requester = RSocketRequester.builder()
		.rsocketConnector { it.acceptor(responder) } (3)
		.tcp("localhost", 7000)
1 如果存在,请使用,以提高效率 路由匹配。PathPatternRouteMatcherspring-web
2 从具有 and/or 方法的类创建响应器。@MessageMapping@ConnectMapping
3 注册响应程序。

请注意,以上只是为客户端的编程注册而设计的快捷方式 反应。对于客户端响应程序处于 Spring 配置中的替代方案, 您仍然可以声明为 Spring Bean,然后按如下方式应用:RSocketMessageHandlerSpring中文文档

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> connector.acceptor(handler.responder()))
	.tcp("localhost", 7000);
import org.springframework.beans.factory.getBean

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val requester = RSocketRequester.builder()
		.rsocketConnector { it.acceptor(handler.responder()) }
		.tcp("localhost", 7000)

对于上述内容,您可能还需要使用 切换到不同的策略来检测客户端响应者,例如基于自定义 注解,例如 vs 默认 .这 在客户端和服务器或多个客户端同时使用的情况下是必需的 应用。setHandlerPredicateRSocketMessageHandler@RSocketClientResponder@ControllerSpring中文文档

另请参阅 Annotated Responders,了解有关编程模型的更多信息。Spring中文文档

高深

RSocketRequesterBuilder提供回调以公开基础,以便为 keepalive 提供进一步的配置选项 间隔、会话恢复、拦截器等。您可以配置选项 在该级别如下:io.rsocket.core.RSocketConnectorSpring中文文档

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> {
		// ...
	})
	.tcp("localhost", 7000);
val requester = RSocketRequester.builder()
		.rsocketConnector {
			//...
		}
		.tcp("localhost", 7000)

服务器请求程序

要从服务器向连接的客户端发出请求,只需获取 来自服务器的已连接客户端的请求者。Spring中文文档

Annotated Responders 中,方法支持参数。使用它来访问连接的请求者。保持 请注意,方法本质上是框架的处理程序,它 必须先处理,然后才能开始请求。因此,一开始的请求必须是 与处理脱钩。例如:@ConnectMapping@MessageMappingRSocketRequester@ConnectMappingSETUPSpring中文文档

@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
	requester.route("status").data("5")
		.retrieveFlux(StatusReport.class)
		.subscribe(bar -> { (1)
			// ...
		});
	return ... (2)
}
1 异步启动请求,独立于处理。
2 执行处理和返回完成。Mono<Void>
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
	GlobalScope.launch {
		requester.route("status").data("5").retrieveFlow<StatusReport>().collect { (1)
			// ...
		}
	}
	/// ... (2)
}
1 异步启动请求,独立于处理。
2 在挂起功能中执行处理。

请求

拥有客户端服务器请求者后,可以按如下方式发出请求:Spring中文文档

ViewBox viewBox = ... ;

Flux<AirportLocation> locations = requester.route("locate.radars.within") (1)
		.data(viewBox) (2)
		.retrieveFlux(AirportLocation.class); (3)
1 指定要包含在请求消息元数据中的路由。
2 为请求消息提供数据。
3 声明预期的响应。
val viewBox: ViewBox = ...

val locations = requester.route("locate.radars.within") (1)
		.data(viewBox) (2)
		.retrieveFlow<AirportLocation>() (3)
1 指定要包含在请求消息元数据中的路由。
2 为请求消息提供数据。
3 声明预期的响应。

交互类型由输入的基数隐式确定,并且 输出。上面的示例是一个 因为发送了一个值和一个流 的值。在大多数情况下,您不需要考虑这一点,只要 输入和输出的选择与 RSocket 交互类型以及输入和 响应者预期的输出。无效组合的唯一示例是多对一。Request-StreamSpring中文文档

该方法还接受任何反应式流,包括 和 ,以及在 中注册的任何其他值生产者。对于多值,例如产生 相同类型的值,请考虑使用重载方法之一以避免出现 对每个元素进行类型检查和查找:data(Object)PublisherFluxMonoReactiveAdapterRegistryPublisherFluxdataEncoderSpring中文文档

data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);

该步骤是可选的。对于不发送数据的请求,请跳过它:data(Object)Spring中文文档

Mono<AirportLocation> location = requester.route("find.radar.EWR"))
	.retrieveMono(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveAndAwait

val location = requester.route("find.radar.EWR")
	.retrieveAndAwait<AirportLocation>()

如果使用复合元数据(默认值),并且如果 值由已注册的 .例如:EncoderSpring中文文档

String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");

Flux<AirportLocation> locations = requester.route("locate.radars.within")
		.metadata(securityToken, mimeType)
		.data(viewBox)
		.retrieveFlux(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveFlow

val requester: RSocketRequester = ...

val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")

val locations = requester.route("locate.radars.within")
		.metadata(securityToken, mimeType)
		.data(viewBox)
		.retrieveFlow<AirportLocation>()

对于使用返回 .请注意,仅表示消息已成功发送,而不表示已处理。Fire-and-Forgetsend()Mono<Void>MonoSpring中文文档

用于使用具有返回值的方法。Metadata-PushsendMetadata()Mono<Void>Spring中文文档

1 如果存在,请使用,以提高效率 路由匹配。PathPatternRouteMatcherspring-web
2 从具有 and/or 方法的类创建响应器。@MessageMapping@ConnectMapping
3 注册响应程序。
1 如果存在,请使用,以提高效率 路由匹配。PathPatternRouteMatcherspring-web
2 从具有 and/or 方法的类创建响应器。@MessageMapping@ConnectMapping
3 注册响应程序。
1 异步启动请求,独立于处理。
2 执行处理和返回完成。Mono<Void>
1 异步启动请求,独立于处理。
2 在挂起功能中执行处理。
1 指定要包含在请求消息元数据中的路由。
2 为请求消息提供数据。
3 声明预期的响应。
1 指定要包含在请求消息元数据中的路由。
2 为请求消息提供数据。
3 声明预期的响应。

带注释的响应者

RSocket 响应程序可以实现为 和 方法。 方法处理单个请求,而方法处理 连接级事件(设置和元数据推送)。支持带注释的响应程序 对称地,用于从服务器端响应和从客户端响应。@MessageMapping@ConnectMapping@MessageMapping@ConnectMappingSpring中文文档

服务器响应程序

要在服务器端使用带注释的响应器,请添加到您的 Spring 中 用于检测 Bean 的配置和方法:RSocketMessageHandler@Controller@MessageMapping@ConnectMappingSpring中文文档

@Configuration
static class ServerConfig {

	@Bean
	public RSocketMessageHandler rsocketMessageHandler() {
		RSocketMessageHandler handler = new RSocketMessageHandler();
		handler.routeMatcher(new PathPatternRouteMatcher());
		return handler;
	}
}
@Configuration
class ServerConfig {

	@Bean
	fun rsocketMessageHandler() = RSocketMessageHandler().apply {
		routeMatcher = PathPatternRouteMatcher()
	}
}

然后通过 Java RSocket API 启动 RSocket 服务器,并插入响应程序,如下所示:RSocketMessageHandlerSpring中文文档

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

CloseableChannel server =
	RSocketServer.create(handler.responder())
		.bind(TcpServerTransport.create("localhost", 7000))
		.block();
import org.springframework.beans.factory.getBean

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val server = RSocketServer.create(handler.responder())
		.bind(TcpServerTransport.create("localhost", 7000))
		.awaitSingle()

RSocketMessageHandler默认情况下支持复合元数据和路由元数据。如果需要切换到 不同的 MIME 类型或寄存器其他元数据 MIME 类型。Spring中文文档

您需要设置元数据和数据所需的 和 实例 要支持的格式。您可能需要该模块进行编解码器实现。EncoderDecoderspring-webSpring中文文档

默认情况下,用于通过 匹配路由。 我们建议插入 from for 高效的路线匹配。RSocket 路由可以是分层的,但不是 URL 路径。 默认情况下,两个路由匹配器都配置为使用“.”作为分隔符,并且没有 URL 像使用 HTTP URL 一样解码。SimpleRouteMatcherAntPathMatcherPathPatternRouteMatcherspring-webSpring中文文档

RSocketMessageHandler可以通过以下方式进行配置,如果 您需要在同一进程中在客户端和服务器之间共享配置:RSocketStrategiesSpring中文文档

@Configuration
static class ServerConfig {

	@Bean
	public RSocketMessageHandler rsocketMessageHandler() {
		RSocketMessageHandler handler = new RSocketMessageHandler();
		handler.setRSocketStrategies(rsocketStrategies());
		return handler;
	}

	@Bean
	public RSocketStrategies rsocketStrategies() {
		return RSocketStrategies.builder()
			.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
			.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
			.routeMatcher(new PathPatternRouteMatcher())
			.build();
	}
}
@Configuration
class ServerConfig {

	@Bean
	fun rsocketMessageHandler() = RSocketMessageHandler().apply {
		rSocketStrategies = rsocketStrategies()
	}

	@Bean
	fun rsocketStrategies() = RSocketStrategies.builder()
			.encoders { it.add(Jackson2CborEncoder()) }
			.decoders { it.add(Jackson2CborDecoder()) }
			.routeMatcher(PathPatternRouteMatcher())
			.build()
}

客户端响应者

客户端的注释响应程序需要在 .有关详细信息,请参阅客户端响应程序RSocketRequester.BuilderSpring中文文档

@MessageMapping

服务器客户端响应程序配置到位后,可以使用如下方法:@MessageMappingSpring中文文档

@Controller
public class RadarsController {

	@MessageMapping("locate.radars.within")
	public Flux<AirportLocation> radars(MapRequest request) {
		// ...
	}
}
@Controller
class RadarsController {

	@MessageMapping("locate.radars.within")
	fun radars(request: MapRequest): Flow<AirportLocation> {
		// ...
	}
}

上述方法响应具有 路由“Locate.radars.within”。它支持灵活的方法签名,并可选择 使用以下方法参数:@MessageMappingSpring中文文档

Method 参数 描述

@PayloadSpring中文文档

请求的有效负载。这可以是异步类型的具体值,如 或 。MonoFluxSpring中文文档

注意:注释的使用是可选的。不是简单类型的方法参数 并且不是任何其他支持的参数,被假定为预期的有效负载。Spring中文文档

RSocketRequesterSpring中文文档

向远程端发出请求的请求者。Spring中文文档

@DestinationVariableSpring中文文档

根据映射模式中的变量从路由中提取的值,例如 .@MessageMapping("find.radar.{id}")Spring中文文档

@HeaderSpring中文文档

注册用于提取的元数据值,如 MetadataExtractor 中所述。Spring中文文档

@Headers Map<String, Object>Spring中文文档

注册用于提取的所有元数据值,如 MetadataExtractor 中所述。Spring中文文档

返回值应为一个或多个要序列化为响应的对象 负载。这可以是异步类型,如 或 、具体值或 或者是无值异步类型,例如 。MonoFluxvoidMono<Void>Spring中文文档

方法支持的 RSocket 交互类型由以下条件确定 输入的基数(即 参数)和输出,其中 基数表示以下内容:@MessageMapping@PayloadSpring中文文档

基数 描述

1Spring中文文档

显式值或单值异步类型,例如 。Mono<T>Spring中文文档

Spring中文文档

多值异步类型,例如 。Flux<T>Spring中文文档

0Spring中文文档

对于输入,这意味着该方法没有参数。@PayloadSpring中文文档

对于输出,这是 或无值异步类型,例如 。voidMono<Void>Spring中文文档

下表显示了所有输入和输出基数组合以及相应的 交互类型:Spring中文文档

输入基数 输出基数 交互类型

0, 1Spring中文文档

0Spring中文文档

即发即弃,请求-响应Spring中文文档

0, 1Spring中文文档

1Spring中文文档

请求-响应Spring中文文档

0, 1Spring中文文档

Spring中文文档

请求流Spring中文文档

Spring中文文档

0, 1, 许多Spring中文文档

请求通道Spring中文文档

@RSocketExchange

作为 的替代方法,您还可以使用方法处理请求。此类方法在 RSocket 接口上声明,可通过响应方用作请求方或由响应方实现。@MessageMapping@RSocketExchangeRSocketServiceProxyFactorySpring中文文档

例如,要以响应方身份处理请求,请执行以下操作:Spring中文文档

public interface RadarsService {

	@RSocketExchange("locate.radars.within")
	Flux<AirportLocation> radars(MapRequest request);
}

@Controller
public class RadarsController implements RadarsService {

	public Flux<AirportLocation> radars(MapRequest request) {
		// ...
	}
}
interface RadarsService {

	@RSocketExchange("locate.radars.within")
	fun radars(request: MapRequest): Flow<AirportLocation>
}

@Controller
class RadarsController : RadarsService {

	override fun radars(request: MapRequest): Flow<AirportLocation> {
		// ...
	}
}

和 自 前者需要保持适合请求者和响应者使用。例如,虽然可以声明处理任意数量的路由,并且每个路由可以 是一种模式,必须用单一的、具体的路线来声明。有 与元数据相关的支持方法参数也存在细微差异,有关支持的参数列表,请参阅 @MessageMappingRSocket 接口@RSocketExhange@MessageMapping@MessageMapping@RSocketExchangeSpring中文文档

@RSocketExchange可以在类型级别用于指定所有路由的通用前缀 对于给定的 RSocket 服务接口。Spring中文文档

@ConnectMapping

@ConnectMapping在 RSocket 连接开始时处理帧,以及 通过框架的任何后续元数据推送通知,即 在。SETUPMETADATA_PUSHmetadataPush(Payload)io.rsocket.RSocketSpring中文文档

@ConnectMapping方法支持与 @MessageMapping 相同的参数,但基于元数据和来自 AND 帧的数据。 可以有一个模式来缩小处理范围 在元数据中具有路由的特定连接,或者未声明任何模式 然后所有连接都匹配。SETUPMETADATA_PUSH@ConnectMappingSpring中文文档

@ConnectMapping方法不能返回数据,必须使用 OR 作为返回值声明。如果处理返回新的错误 连接,则连接被拒绝。处理不得拖延 请求连接。有关详细信息,请参阅服务器请求程序voidMono<Void>RSocketRequesterSpring中文文档

Method 参数 描述

@PayloadSpring中文文档

请求的有效负载。这可以是异步类型的具体值,如 或 。MonoFluxSpring中文文档

注意:注释的使用是可选的。不是简单类型的方法参数 并且不是任何其他支持的参数,被假定为预期的有效负载。Spring中文文档

RSocketRequesterSpring中文文档

向远程端发出请求的请求者。Spring中文文档

@DestinationVariableSpring中文文档

根据映射模式中的变量从路由中提取的值,例如 .@MessageMapping("find.radar.{id}")Spring中文文档

@HeaderSpring中文文档

注册用于提取的元数据值,如 MetadataExtractor 中所述。Spring中文文档

@Headers Map<String, Object>Spring中文文档

注册用于提取的所有元数据值,如 MetadataExtractor 中所述。Spring中文文档

基数 描述

1Spring中文文档

显式值或单值异步类型,例如 。Mono<T>Spring中文文档

Spring中文文档

多值异步类型,例如 。Flux<T>Spring中文文档

0Spring中文文档

对于输入,这意味着该方法没有参数。@PayloadSpring中文文档

对于输出,这是 或无值异步类型,例如 。voidMono<Void>Spring中文文档

输入基数 输出基数 交互类型

0, 1Spring中文文档

0Spring中文文档

即发即弃,请求-响应Spring中文文档

0, 1Spring中文文档

1Spring中文文档

请求-响应Spring中文文档

0, 1Spring中文文档

Spring中文文档

请求流Spring中文文档

Spring中文文档

0, 1, 许多Spring中文文档

请求通道Spring中文文档

MetadataExtractor

响应者必须解释元数据。复合元数据允许独立 格式化的元数据值(例如,用于路由、安全性、跟踪),每个值都有自己的 MIME 类型。应用程序需要一种方法来配置要支持的元数据 MIME 类型,以及一种方法 访问提取的值。Spring中文文档

MetadataExtractor是获取序列化元数据并返回解码的协定 然后可以按名称访问的名称-值对,例如通过带注释的处理程序方法中的标头。@HeaderSpring中文文档

DefaultMetadataExtractor可以给出实例来解码元数据。出 该框内置了对“message/x.rsocket.routing.v0”的支持,它解码并保存在“route”键下。对于您需要提供的任何其他 MIME 类型 a 并注册 MIME 类型,如下所示:DecoderStringDecoderSpring中文文档

DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");
import org.springframework.messaging.rsocket.metadataToExtract

val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Foo>(fooMimeType, "foo")

复合元数据非常适合组合独立的元数据值。然而, 请求者可能不支持复合元数据,或者可能选择不使用它。为此,可能需要自定义逻辑将解码值映射到输出 地图。下面是将 JSON 用于元数据的示例:DefaultMetadataExtractorSpring中文文档

DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(
	MimeType.valueOf("application/vnd.myapp.metadata+json"),
	new ParameterizedTypeReference<Map<String,String>>() {},
	(jsonMap, outputMap) -> {
		outputMap.putAll(jsonMap);
	});
import org.springframework.messaging.rsocket.metadataToExtract

val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap ->
	outputMap.putAll(jsonMap)
}

配置时,您可以让使用配置的解码器创建提取器,并且 只需使用回调即可自定义注册,如下所示:MetadataExtractorRSocketStrategiesRSocketStrategies.BuilderSpring中文文档

RSocketStrategies strategies = RSocketStrategies.builder()
	.metadataExtractorRegistry(registry -> {
		registry.metadataToExtract(fooMimeType, Foo.class, "foo");
		// ...
	})
	.build();
import org.springframework.messaging.rsocket.metadataToExtract

val strategies = RSocketStrategies.builder()
		.metadataExtractorRegistry { registry: MetadataExtractorRegistry ->
			registry.metadataToExtract<Foo>(fooMimeType, "foo")
			// ...
		}
		.build()

RSocket 接口

Spring Framework 允许您将 RSocket 服务定义为带有方法的 Java 接口。可以将此类接口传递给 以创建通过 RSocketRequester 执行请求的代理。您还可以实现 接口作为处理请求的响应器。@RSocketExchangeRSocketServiceProxyFactorySpring中文文档

首先使用以下方法创建接口:@RSocketExchangeSpring中文文档

interface RadarService {

	@RSocketExchange("radars")
	Flux<AirportLocation> getRadars(@Payload MapRequest request);

	// more RSocket exchange methods...

}

现在,您可以创建一个代理,该代理在调用方法时执行请求:Spring中文文档

RSocketRequester requester = ... ;
RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester).build();

RadarService service = factory.createClient(RadarService.class);

您还可以实现该接口以作为响应方处理请求。 请参阅带批注的响应程序Spring中文文档

方法参数

带注释的 RSocket 交换方法支持具有以下功能的灵活方法签名 方法参数:Spring中文文档

方法参数 描述

@DestinationVariableSpring中文文档

添加要与注释中的路由一起传递的路由变量,以展开路由中的模板占位符。 此变量可以是 String 或任何 Object,然后通过 进行格式化。RSocketRequester@RSocketExchangetoString()Spring中文文档

@PayloadSpring中文文档

设置请求的输入有效负载。这可以是一个具体的价值,也可以是任何生产者 的值,可以通过以下方式适应反应式流PublisherReactiveAdapterRegistrySpring中文文档

Object,如果后跟MimeTypeSpring中文文档

输入有效负载中元数据条目的值。这可以是任何 因为下一个参数是元数据条目。该值可以是具体的 value 或任何可通过 .ObjectMimeTypePublisherReactiveAdapterRegistrySpring中文文档

MimeTypeSpring中文文档

for 元数据条目。前面的方法参数应为 元数据值。MimeTypeSpring中文文档

返回值

带批注的 RSocket 交换方法支持具体值的返回值,或者 任何可以通过 .PublisherReactiveAdapterRegistrySpring中文文档

默认情况下,RSocket 服务方法与同步(阻塞)方法的行为 签名取决于基础 RSocket 的响应超时设置以及 RSocket 保持活动状态设置。 确实公开了一个选项,该选项还允许您配置阻止响应的最长时间, 但我们建议在 RSocket 级别配置超时值,以便进行更多控制。ClientTransportRSocketServiceProxyFactory.BuilderblockTimeoutSpring中文文档

方法参数 描述

@DestinationVariableSpring中文文档

添加要与注释中的路由一起传递的路由变量,以展开路由中的模板占位符。 此变量可以是 String 或任何 Object,然后通过 进行格式化。RSocketRequester@RSocketExchangetoString()Spring中文文档

@PayloadSpring中文文档

设置请求的输入有效负载。这可以是一个具体的价值,也可以是任何生产者 的值,可以通过以下方式适应反应式流PublisherReactiveAdapterRegistrySpring中文文档

Object,如果后跟MimeTypeSpring中文文档

输入有效负载中元数据条目的值。这可以是任何 因为下一个参数是元数据条目。该值可以是具体的 value 或任何可通过 .ObjectMimeTypePublisherReactiveAdapterRegistrySpring中文文档

MimeTypeSpring中文文档

for 元数据条目。前面的方法参数应为 元数据值。MimeTypeSpring中文文档