参考文档的这一部分涵盖了对反应式堆栈 WebSocket 的支持 消息。spring-doc.cn

WebSocket 简介

WebSocket 协议 RFC 6455 提供了标准化的 在 Client 端和 Server 之间建立全双工双向通信通道的方法 通过单个 TCP 连接。它是与 HTTP 不同的 TCP 协议,但旨在 通过 HTTP 工作,使用端口 80 和 443,并允许重复使用现有防火墙规则。spring-doc.cn

WebSocket 交互以使用 HTTP 标头的 HTTP 请求开始 进行升级,或者在本例中切换到 WebSocket 协议。以下示例 显示了这样的交互:Upgradespring-doc.cn

GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket (1)
Connection: Upgrade (2)
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080
1 标头。Upgrade
2 使用连接。Upgrade

支持 WebSocket 的服务器返回输出,而不是通常的 200 状态代码 类似于以下内容:spring-doc.cn

HTTP/1.1 101 Switching Protocols (1)
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
1 协议切换

握手成功后,HTTP 升级请求的基础 TCP 套接字将保留 open 以继续发送和接收消息。spring-doc.cn

有关 WebSockets 工作原理的完整介绍超出了本文档的范围。 请参阅 RFC 6455、HTML5 的 WebSocket 章节或许多介绍中的任何一个 Web 上的教程。spring-doc.cn

请注意,如果 WebSocket 服务器在 Web 服务器(例如 nginx)后面运行,则 可能需要对其进行配置,以便将 WebSocket 升级请求传递给 WebSocket 服务器。同样,如果应用程序在云环境中运行,请检查 云提供商与 WebSocket 支持相关的说明。spring-doc.cn

HTTP 与 WebSocket

尽管 WebSocket 设计为与 HTTP 兼容并以 HTTP 请求开头, 重要的是要了解这两种协议导致非常不同的结果 体系结构和应用程序编程模型。spring-doc.cn

在 HTTP 和 REST 中,应用程序被建模为多个 URL。要与应用程序交互, 客户端以请求-响应样式访问这些 URL。服务器将请求路由到 基于 HTTP URL、方法和标头的适当处理程序。spring-doc.cn

相比之下,在 WebSockets 中,初始连接通常只有一个 URL。 随后,所有应用程序消息都在同一 TCP 连接上流动。这指向 一种完全不同的异步、事件驱动的消息传递架构。spring-doc.cn

WebSocket 也是一种低级传输协议,与 HTTP 不同,它没有规定 消息内容的任何语义。这意味着无法路由或处理 一条消息,除非客户端和服务器在消息语义上达成一致。spring-doc.cn

WebSocket 客户端和服务器可以协商使用更高级别的消息传递协议 (例如,STOMP),通过 HTTP 握手请求上的标头。 如果没有这些,他们需要提出自己的惯例。Sec-WebSocket-Protocolspring-doc.cn

何时使用 WebSockets

WebSockets 可以使网页具有动态和交互性。但是,在许多情况下, AJAX 和 HTTP 流式处理或长轮询的组合可以提供简单且 有效的解决方案。spring-doc.cn

例如,新闻、邮件和社交源需要动态更新,但可能需要 每隔几分钟这样做一次完全可以。协作、游戏和金融应用程序 另一方面,需要更接近实时。spring-doc.cn

延迟本身并不是决定因素。如果消息量相对较低(例如, 监控网络故障)HTTP 流或轮询可以提供有效的解决方案。 低延迟、高频率和高容量的组合造就了最好的 case 来使用 WebSocket。spring-doc.cn

另请记住,在 Internet 上,您无法控制的限制性代理 可能会排除 WebSocket 交互,因为它们未配置为传递 Headers,或者因为它们关闭了看起来空闲的长期连接。这 意味着将 WebSocket 用于防火墙内的内部应用程序是 比面向公众的应用程序更直接的决定。Upgradespring-doc.cn

1 标头。Upgrade
2 使用连接。Upgrade
1 协议切换

WebSocket API

Spring Framework 提供了一个 WebSocket API,您可以使用它来编写 client- 和 处理 WebSocket 消息的服务器端应用程序。spring-doc.cn

服务器

要创建 WebSocket 服务器,您可以先创建一个 . 以下示例显示了如何执行此操作:WebSocketHandlerspring-doc.cn

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;

public class MyWebSocketHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {
		// ...
	}
}
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession

class MyWebSocketHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {
		// ...
	}
}

然后,您可以将其映射到 URL:spring-doc.cn

@Configuration
class WebConfig {

	@Bean
	public HandlerMapping handlerMapping() {
		Map<String, WebSocketHandler> map = new HashMap<>();
		map.put("/path", new MyWebSocketHandler());
		int order = -1; // before annotated controllers

		return new SimpleUrlHandlerMapping(map, order);
	}
}
@Configuration
class WebConfig {

	@Bean
	fun handlerMapping(): HandlerMapping {
		val map = mapOf("/path" to MyWebSocketHandler())
		val order = -1 // before annotated controllers

		return SimpleUrlHandlerMapping(map, order)
	}
}

如果使用 WebFlux 配置,则没有任何内容 进一步操作,或者如果不使用 WebFlux 配置,则需要声明 a,如下所示:WebSocketHandlerAdapterspring-doc.cn

@Configuration
class WebConfig {

	// ...

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter();
	}
}
@Configuration
class WebConfig {

	// ...

	@Bean
	fun handlerAdapter() =  WebSocketHandlerAdapter()
}

WebSocketHandler

方法 takes 和 returns 指示应用程序对会话的处理何时完成。会话已处理 通过两个流,一个用于入站消息,一个用于出站消息。下表 介绍处理流的两种方法:handleWebSocketHandlerWebSocketSessionMono<Void>spring-doc.cn

WebSocketSession方法 描述

Flux<WebSocketMessage> receive()spring-doc.cn

提供对入站消息流的访问,并在连接关闭时完成。spring-doc.cn

Mono<Void> send(Publisher<WebSocketMessage>)spring-doc.cn

获取传出消息的源,写入消息,并返回一个 在源完成和写入完成时完成。Mono<Void>spring-doc.cn

A 必须将入站和出站流组合成一个统一的流,并且 返回反映该流完成情况的 A。取决于应用 要求,则统一流将在以下情况下完成:WebSocketHandlerMono<Void>spring-doc.cn

  • 入站或出站消息流完成。spring-doc.cn

  • 入站流完成(即连接关闭),而出站流是无限的。spring-doc.cn

  • 在选定的点,通过 的方法。closeWebSocketSessionspring-doc.cn

当入站和出站消息流组合在一起时,无需 检查连接是否打开,因为 Reactive Streams 向 end activity 发出信号。 入站流接收到完成或错误信号,出站流 接收取消信号。spring-doc.cn

处理程序的最基本实现是处理入站流的处理程序。这 以下示例显示了此类实现:spring-doc.cn

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {
		return session.receive()			(1)
				.doOnNext(message -> {
					// ...					(2)
				})
				.concatMap(message -> {
					// ...					(3)
				})
				.then();					(4)
	}
}
1 访问入站消息流。
2 对每条消息执行一些操作。
3 执行使用消息内容的嵌套异步操作。
4 返回 a ,该 a 在接收完成时完成。Mono<Void>
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {
		return session.receive()            (1)
				.doOnNext {
					// ...					(2)
				}
				.concatMap {
					// ...					(3)
				}
				.then()                     (4)
	}
}
1 访问入站消息流。
2 对每条消息执行一些操作。
3 执行使用消息内容的嵌套异步操作。
4 返回 a ,该 a 在接收完成时完成。Mono<Void>
对于嵌套的异步操作,您可能需要调用底层 使用池化数据缓冲区的服务器(例如 Netty)。否则,数据缓冲区可能是 在您有机会读取数据之前释放。有关更多背景信息,请参阅 数据缓冲区和编解码器message.retain()

以下实现结合了入站流和出站流:spring-doc.cn

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {

		Flux<WebSocketMessage> output = session.receive()				(1)
				.doOnNext(message -> {
					// ...
				})
				.concatMap(message -> {
					// ...
				})
				.map(value -> session.textMessage("Echo " + value));	(2)

		return session.send(output);									(3)
	}
}
1 处理入站消息流。
2 创建出站消息,生成组合流。
3 返回未完成的 a,而我们继续接收。Mono<Void>
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {

		val output = session.receive()                      (1)
				.doOnNext {
					// ...
				}
				.concatMap {
					// ...
				}
				.map { session.textMessage("Echo $it") }    (2)

		return session.send(output)                         (3)
	}
}
1 处理入站消息流。
2 创建出站消息,生成组合流。
3 返回未完成的 a,而我们继续接收。Mono<Void>

入站流和出站流可以是独立的,并且仅在完成时加入, 如下例所示:spring-doc.cn

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {

		Mono<Void> input = session.receive()								(1)
				.doOnNext(message -> {
					// ...
				})
				.concatMap(message -> {
					// ...
				})
				.then();

		Flux<String> source = ... ;
		Mono<Void> output = session.send(source.map(session::textMessage));	(2)

		return Mono.zip(input, output).then();								(3)
	}
}
1 处理入站消息流。
2 发送传出消息。
3 加入流并返回 a ,该流在任一流结束时完成。Mono<Void>
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {

		val input = session.receive()									(1)
				.doOnNext {
					// ...
				}
				.concatMap {
					// ...
				}
				.then()

		val source: Flux<String> = ...
		val output = session.send(source.map(session::textMessage))		(2)

		return Mono.zip(input, output).then()							(3)
	}
}
1 处理入站消息流。
2 发送传出消息。
3 加入流并返回 a ,该流在任一流结束时完成。Mono<Void>

DataBuffer

DataBuffer是 WebFlux 中字节缓冲区的表示形式。Spring Core 的 该参考在 Data Buffers and Codecs 一节中有更多关于这方面的内容。要理解的关键点是,在某些 像 Netty 这样的服务器,字节缓冲区是池化的和引用计数的,并且必须释放 当使用以避免内存泄漏时。spring-doc.cn

在 Netty 上运行时,应用程序必须使用 if 希望保留 input 数据缓冲区以确保它们不会被释放,并且 随后在消耗缓冲区时使用。DataBufferUtils.retain(dataBuffer)DataBufferUtils.release(dataBuffer)spring-doc.cn

握手

WebSocketHandlerAdapter委托给 .默认情况下,这是一个实例 of 执行基本检查,它对 WebSocket 请求执行基本检查,而 然后用于正在使用的服务器。目前,有内置的 支持 Reactor Netty、Tomcat、Jetty 和 Undertow。WebSocketServiceHandshakeWebSocketServiceRequestUpgradeStrategyspring-doc.cn

HandshakeWebSocketService公开一个属性,该属性允许 设置 a 以从 中提取属性并插入它们 转换为 的属性中。sessionAttributePredicatePredicate<String>WebSessionWebSocketSessionspring-doc.cn

服务器配置

的 for each server 公开特定于 底层 WebSocket 服务器引擎。使用 WebFlux Java 配置时,您可以自定义 如 WebFlux Config 的相应部分所示的属性,或者如果 不使用 WebFlux 配置,请使用以下命令:RequestUpgradeStrategyspring-doc.cn

@Configuration
class WebConfig {

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter(webSocketService());
	}

	@Bean
	public WebSocketService webSocketService() {
		TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
		strategy.setMaxSessionIdleTimeout(0L);
		return new HandshakeWebSocketService(strategy);
	}
}
@Configuration
class WebConfig {

	@Bean
	fun handlerAdapter() =
			WebSocketHandlerAdapter(webSocketService())

	@Bean
	fun webSocketService(): WebSocketService {
		val strategy = TomcatRequestUpgradeStrategy().apply {
			setMaxSessionIdleTimeout(0L)
		}
		return HandshakeWebSocketService(strategy)
	}
}

检查服务器的升级策略以查看可用的选项。现在 只有 Tomcat 和 Jetty 公开了此类选项。spring-doc.cn

CORS

配置 CORS 并限制对 WebSocket 终端节点的访问的最简单方法是 实现并返回包含允许的来源、标头和其他详细信息的 a。如果你做不到 ,您还可以将 通过 URL 模式指定 CORS 设置。如果同时指定了这两者,则使用 on 上的方法将它们组合在一起。WebSocketHandlerCorsConfigurationSourceCorsConfigurationcorsConfigurationsSimpleUrlHandlercombineCorsConfigurationspring-doc.cn

客户端

Spring WebFlux 提供了一个抽象,其中包含 Reactor Netty、Tomcat、Jetty、Undertow 和标准 Java(即 JSR-356)。WebSocketClientspring-doc.cn

Tomcat 客户端实际上是标准 Java 客户端的扩展,带有一些额外的 功能,以利用特定于 Tomcat 的 用于暂停接收背压消息的 API。WebSocketSession

要启动 WebSocket 会话,您可以创建客户端的实例并使用其方法:executespring-doc.cn

WebSocketClient client = new ReactorNettyWebSocketClient();

URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
		session.receive()
				.doOnNext(System.out::println)
				.then());
val client = ReactorNettyWebSocketClient()

		val url = URI("ws://localhost:8080/path")
		client.execute(url) { session ->
			session.receive()
					.doOnNext(::println)
			.then()
		}

某些客户端(如 Jetty)实施并需要停止和启动 在您可以使用它们之前。所有客户端都有与配置相关的构造函数选项 底层 WebSocket 客户端的 Web 节点。Lifecyclespring-doc.cn

WebSocketSession方法 描述

Flux<WebSocketMessage> receive()spring-doc.cn

提供对入站消息流的访问,并在连接关闭时完成。spring-doc.cn

Mono<Void> send(Publisher<WebSocketMessage>)spring-doc.cn

获取传出消息的源,写入消息,并返回一个 在源完成和写入完成时完成。Mono<Void>spring-doc.cn

1 访问入站消息流。
2 对每条消息执行一些操作。
3 执行使用消息内容的嵌套异步操作。
4 返回 a ,该 a 在接收完成时完成。Mono<Void>
1 访问入站消息流。
2 对每条消息执行一些操作。
3 执行使用消息内容的嵌套异步操作。
4 返回 a ,该 a 在接收完成时完成。Mono<Void>
对于嵌套的异步操作,您可能需要调用底层 使用池化数据缓冲区的服务器(例如 Netty)。否则,数据缓冲区可能是 在您有机会读取数据之前释放。有关更多背景信息,请参阅 数据缓冲区和编解码器message.retain()
1 处理入站消息流。
2 创建出站消息,生成组合流。
3 返回未完成的 a,而我们继续接收。Mono<Void>
1 处理入站消息流。
2 创建出站消息,生成组合流。
3 返回未完成的 a,而我们继续接收。Mono<Void>
1 处理入站消息流。
2 发送传出消息。
3 加入流并返回 a ,该流在任一流结束时完成。Mono<Void>
1 处理入站消息流。
2 发送传出消息。
3 加入流并返回 a ,该流在任一流结束时完成。Mono<Void>
Tomcat 客户端实际上是标准 Java 客户端的扩展,带有一些额外的 功能,以利用特定于 Tomcat 的 用于暂停接收背压消息的 API。WebSocketSession