参考文档的这一部分介绍了对 reactive-stack WebSocket 的支持 消息。

WebSocket简介

WebSocket 协议 RFC 6455 提供了一个标准化的 在客户端和服务器之间建立全双工、双向通信通道的方法 通过单个 TCP 连接。它是一种与 HTTP 不同的 TCP 协议,但旨在 通过 HTTP 工作,使用端口 80 和 443,并允许重用现有防火墙规则。

WebSocket 交互从使用 HTTP 标头的 HTTP 请求开始 升级,或者在本例中切换到 WebSocket 协议。以下示例 显示这样的交互:Upgrade

GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket (1)
Connection: Upgrade (2)
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080
1 标头。Upgrade
2 使用连接。Upgrade

支持 WebSocket 的服务器返回输出,而不是通常的 200 状态代码 类似于以下内容:

HTTP/1.1 101 Switching Protocols (1)
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
1 协议交换机

握手成功后,HTTP 升级请求的基础 TCP 套接字将保留 为客户端和服务器打开以继续发送和接收消息。

WebSocket 工作原理的完整介绍超出了本文档的范围。 请参阅 RFC 6455、HTML5 的 WebSocket 章节或许多介绍和 Web 上的教程。

请注意,如果 WebSocket 服务器在 Web 服务器(例如 nginx)后面运行,则 可能需要将其配置为将 WebSocket 升级请求传递到 WebSocket 服务器。同样,如果应用程序在云环境中运行,请选中 云提供商与 WebSocket 支持相关的说明。

HTTP 与 WebSocket

即使 WebSocket 被设计为与 HTTP 兼容并且以 HTTP 请求开头, 重要的是要了解这两种协议导致非常不同 体系结构和应用程序编程模型。

在 HTTP 和 REST 中,应用程序被建模为多个 URL。要与应用程序交互, 客户端访问这些 URL,请求-响应样式。服务器将请求路由 基于 HTTP URL、方法和标头的适当处理程序。

相比之下,在 WebSocket 中,初始连接通常只有一个 URL。 随后,所有应用程序消息都在同一 TCP 连接上流动。这指向 一个完全不同的异步、事件驱动的消息传递体系结构。

WebSocket 也是一种低级传输协议,与 HTTP 不同,它没有规定 消息内容的任何语义。这意味着没有办法路由或处理 除非客户端和服务器在消息语义上达成一致,否则消息。

WebSocket 客户端和服务器可以协商使用更高级别的消息传递协议 (例如,STOMP),通过 HTTP 握手请求上的标头。 如果没有这一点,他们需要提出自己的惯例。Sec-WebSocket-Protocol

何时使用 WebSocket

WebSocket 可以使网页具有动态性和交互性。但是,在许多情况下, AJAX 和 HTTP 流式处理或长轮询的组合可以提供简单且 有效的解决方案。

例如,新闻、邮件和社交源需要动态更新,但可能需要动态更新 每隔几分钟就这样做是完全可以的。协作、游戏和金融应用,在 另一方面,需要更接近实时。

延迟本身并不是决定性因素。如果消息量相对较低(例如, 监控网络故障)HTTP 流式处理或轮询可以提供有效的解决方案。 低延迟、高频率和高音量的结合才能发挥最佳效果 使用 WebSocket 的案例。

还要记住,在互联网上,您无法控制的限制性代理 可能会阻止 WebSocket 交互,因为它们未配置为传递标头,或者因为它们关闭了看似空闲的长期连接。这 意味着将 WebSocket 用于防火墙内的内部应用程序是一个 与面向公众的应用程序相比,它更直接。Upgrade

WebSocket 接口

Spring Framework 提供了一个 WebSocket API,您可以使用它来编写客户端和 处理 WebSocket 消息的服务器端应用程序。

服务器

若要创建 WebSocket 服务器,可以先创建一个 . 以下示例演示如何执行此操作:WebSocketHandler

  • Java

  • Kotlin

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;

public class MyWebSocketHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {
		// ...
	}
}
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession

class MyWebSocketHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {
		// ...
	}
}

然后,您可以将其映射到 URL:

  • Java

  • Kotlin

@Configuration
class WebConfig {

	@Bean
	public HandlerMapping handlerMapping() {
		Map<String, WebSocketHandler> map = new HashMap<>();
		map.put("/path", new MyWebSocketHandler());
		int order = -1; // before annotated controllers

		return new SimpleUrlHandlerMapping(map, order);
	}
}
@Configuration
class WebConfig {

	@Bean
	fun handlerMapping(): HandlerMapping {
		val map = mapOf("/path" to MyWebSocketHandler())
		val order = -1 // before annotated controllers

		return SimpleUrlHandlerMapping(map, order)
	}
}

如果使用 WebFlux Config,则什么都没有 如果不使用 WebFlux 配置,则需要声明如下所示:WebSocketHandlerAdapter

  • Java

  • Kotlin

@Configuration
class WebConfig {

	// ...

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter();
	}
}
@Configuration
class WebConfig {

	// ...

	@Bean
	fun handlerAdapter() =  WebSocketHandlerAdapter()
}

WebSocketHandler

用于指示会话的应用程序处理何时完成的 take 和 return 方法。会话已处理 通过两个流,一个用于入站消息,一个用于出站消息。下表 介绍处理流的两种方法:handleWebSocketHandlerWebSocketSessionMono<Void>

WebSocketSession方法 描述

Flux<WebSocketMessage> receive()

提供对入站消息流的访问,并在连接关闭时完成。

Mono<Void> send(Publisher<WebSocketMessage>)

获取传出消息的源,写入消息,并返回 在源代码完成并完成写入时完成。Mono<Void>

A 必须将入站和出站流组合成一个统一的流,并且 返回反映该流完成的 A。取决于应用 要求,统一流在以下情况下完成:WebSocketHandlerMono<Void>

  • 入站或出站消息流完成。

  • 入站流完成(即连接关闭),而出站流是无限的。

  • 在选定的点上,通过 .closeWebSocketSession

当入站和出站消息流组合在一起时,无需 检查连接是否打开,因为 Reactive Streams 会发出结束活动的信号。 入站流接收完成或错误信号,出站流接收到完成或错误信号 接收取消信号。

处理程序的最基本实现是处理入站流的实现。这 以下示例演示了这样的实现:

  • Java

  • Kotlin

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {
		return session.receive()			(1)
				.doOnNext(message -> {
					// ...					(2)
				})
				.concatMap(message -> {
					// ...					(3)
				})
				.then();					(4)
	}
}
1 访问入站消息流。
2 对每条消息执行一些操作。
3 执行使用消息内容的嵌套异步操作。
4 返回 a 在接收完成时完成。Mono<Void>
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {
		return session.receive()            (1)
				.doOnNext {
					// ...					(2)
				}
				.concatMap {
					// ...					(3)
				}
				.then()                     (4)
	}
}
1 访问入站消息流。
2 对每条消息执行一些操作。
3 执行使用消息内容的嵌套异步操作。
4 返回 a 在接收完成时完成。Mono<Void>
对于嵌套的异步操作,可能需要调用基础 使用池数据缓冲区的服务器(例如,Netty)。否则,数据缓冲区可能是 在您有机会阅读数据之前发布。有关更多背景信息,请参阅数据缓冲区和编解码器message.retain()

以下实现将入站流和出站流组合在一起:

  • Java

  • Kotlin

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {

		Flux<WebSocketMessage> output = session.receive()				(1)
				.doOnNext(message -> {
					// ...
				})
				.concatMap(message -> {
					// ...
				})
				.map(value -> session.textMessage("Echo " + value));	(2)

		return session.send(output);									(3)
	}
}
1 处理入站消息流。
2 创建出站消息,生成组合流。
3 返回未完成的 a,而我们继续接收。Mono<Void>
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {

		val output = session.receive()                      (1)
				.doOnNext {
					// ...
				}
				.concatMap {
					// ...
				}
				.map { session.textMessage("Echo $it") }    (2)

		return session.send(output)                         (3)
	}
}
1 处理入站消息流。
2 创建出站消息,生成组合流。
3 返回未完成的 a,而我们继续接收。Mono<Void>

入站和出站流可以是独立的,并且只有在完成时才能加入, 如以下示例所示:

  • Java

  • Kotlin

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {

		Mono<Void> input = session.receive()								(1)
				.doOnNext(message -> {
					// ...
				})
				.concatMap(message -> {
					// ...
				})
				.then();

		Flux<String> source = ... ;
		Mono<Void> output = session.send(source.map(session::textMessage));	(2)

		return Mono.zip(input, output).then();								(3)
	}
}
1 处理入站消息流。
2 发送传出消息。
3 联接流并返回一个在任一流结束时完成的 a。Mono<Void>
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {

		val input = session.receive()									(1)
				.doOnNext {
					// ...
				}
				.concatMap {
					// ...
				}
				.then()

		val source: Flux<String> = ...
		val output = session.send(source.map(session::textMessage))		(2)

		return Mono.zip(input, output).then()							(3)
	}
}
1 处理入站消息流。
2 发送传出消息。
3 联接流并返回一个在任一流结束时完成的 a。Mono<Void>

DataBuffer

DataBuffer是 WebFlux 中字节缓冲区的表示形式。Spring Core 部分 参考资料在数据缓冲区和编解码器部分有更多内容。要了解的关键点是,在某些 像 Netty 这样的服务器,字节缓冲区是池化的,引用计数,必须释放 使用时以避免内存泄漏。

在 Netty 上运行时,应用程序必须使用 希望保留输入数据缓冲区以确保它们不会被释放,并且 随后在缓冲区用完时使用。DataBufferUtils.retain(dataBuffer)DataBufferUtils.release(dataBuffer)

握手

WebSocketHandlerAdapter委托给 .默认情况下,这是一个实例 的,它对 WebSocket 请求和 然后用于正在使用的服务器。目前,有内置的 支持 Reactor Netty、Tomcat、Jetty 和 Undertow。WebSocketServiceHandshakeWebSocketServiceRequestUpgradeStrategy

HandshakeWebSocketService公开一个属性,该属性允许 设置 A 从中提取属性并插入它们 转换为 .sessionAttributePredicatePredicate<String>WebSessionWebSocketSession

服务器配置

对于每个服务器,都公开了特定于 底层 WebSocket 服务器引擎。使用 WebFlux Java 配置时,您可以自定义 WebFlux Config 的相应部分中显示的此类属性,或者如果 不使用 WebFlux 配置,请使用以下命令:RequestUpgradeStrategy

  • Java

  • Kotlin

@Configuration
class WebConfig {

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter(webSocketService());
	}

	@Bean
	public WebSocketService webSocketService() {
		TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
		strategy.setMaxSessionIdleTimeout(0L);
		return new HandshakeWebSocketService(strategy);
	}
}
@Configuration
class WebConfig {

	@Bean
	fun handlerAdapter() =
			WebSocketHandlerAdapter(webSocketService())

	@Bean
	fun webSocketService(): WebSocketService {
		val strategy = TomcatRequestUpgradeStrategy().apply {
			setMaxSessionIdleTimeout(0L)
		}
		return HandshakeWebSocketService(strategy)
	}
}

检查服务器的升级策略,了解有哪些选项可用。现在 只有 Tomcat 和 Jetty 公开了此类选项。

CORS(核心斯酒店)

配置 CORS 并限制对 WebSocket 终结点的访问的最简单方法是 实现并返回包含允许的源、标头和其他详细信息的 A。如果你做不到 那,您还可以将属性设置为 按 URL 模式指定 CORS 设置。如果同时指定了两者,则使用 on 上的方法将它们组合在一起。WebSocketHandlerCorsConfigurationSourceCorsConfigurationcorsConfigurationsSimpleUrlHandlercombineCorsConfiguration

客户

Spring WebFlux 提供了一个抽象,其中包含以下实现 Reactor Netty、Tomcat、Jetty、Undertow 和标准 Java(即 JSR-356)。WebSocketClient

Tomcat 客户端实际上是标准 Java 客户端的扩展,但有一些额外的扩展 处理中的功能,以利用 Tomcat 特定的功能 用于暂停接收背压消息的 API。WebSocketSession

若要启动 WebSocket 会话,可以创建客户端实例并使用其方法:execute

  • Java

  • Kotlin

WebSocketClient client = new ReactorNettyWebSocketClient();

URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
		session.receive()
				.doOnNext(System.out::println)
				.then());
val client = ReactorNettyWebSocketClient()

		val url = URI("ws://localhost:8080/path")
		client.execute(url) { session ->
			session.receive()
					.doOnNext(::println)
			.then()
		}

某些客户端(如 Jetty)实施并需要停止和启动 在您可以使用它们之前。所有客户端都有与配置相关的构造函数选项 的基础 WebSocket 客户端。Lifecycle