参考文档的这一部分涵盖了对反应式堆栈 WebSocket 的支持 消息。
WebSocket 简介
WebSocket 协议 RFC 6455 提供了标准化的 在 Client 端和 Server 之间建立全双工双向通信通道的方法 通过单个 TCP 连接。它是与 HTTP 不同的 TCP 协议,但旨在 通过 HTTP 工作,使用端口 80 和 443,并允许重复使用现有防火墙规则。
WebSocket 交互以使用 HTTP 标头的 HTTP 请求开始
进行升级,或者在本例中切换到 WebSocket 协议。以下示例
显示了这样的交互:Upgrade
GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket (1)
Connection: Upgrade (2)
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080
1 | 标头。Upgrade |
2 | 使用连接。Upgrade |
支持 WebSocket 的服务器返回输出,而不是通常的 200 状态代码 类似于以下内容:
HTTP/1.1 101 Switching Protocols (1)
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
1 | 协议切换 |
握手成功后,HTTP 升级请求的基础 TCP 套接字将保留 open 以继续发送和接收消息。
有关 WebSockets 工作原理的完整介绍超出了本文档的范围。 请参阅 RFC 6455、HTML5 的 WebSocket 章节或许多介绍中的任何一个 Web 上的教程。
请注意,如果 WebSocket 服务器在 Web 服务器(例如 nginx)后面运行,则 可能需要对其进行配置,以便将 WebSocket 升级请求传递给 WebSocket 服务器。同样,如果应用程序在云环境中运行,请检查 云提供商与 WebSocket 支持相关的说明。
HTTP 与 WebSocket
尽管 WebSocket 设计为与 HTTP 兼容并以 HTTP 请求开头, 重要的是要了解这两种协议导致非常不同的结果 体系结构和应用程序编程模型。
在 HTTP 和 REST 中,应用程序被建模为多个 URL。要与应用程序交互, 客户端以请求-响应样式访问这些 URL。服务器将请求路由到 基于 HTTP URL、方法和标头的适当处理程序。
相比之下,在 WebSockets 中,初始连接通常只有一个 URL。 随后,所有应用程序消息都在同一 TCP 连接上流动。这指向 一种完全不同的异步、事件驱动的消息传递架构。
WebSocket 也是一种低级传输协议,与 HTTP 不同,它没有规定 消息内容的任何语义。这意味着无法路由或处理 一条消息,除非客户端和服务器在消息语义上达成一致。
WebSocket 客户端和服务器可以协商使用更高级别的消息传递协议
(例如,STOMP),通过 HTTP 握手请求上的标头。
如果没有这些,他们需要提出自己的惯例。Sec-WebSocket-Protocol
何时使用 WebSockets
WebSockets 可以使网页具有动态和交互性。但是,在许多情况下, AJAX 和 HTTP 流式处理或长轮询的组合可以提供简单且 有效的解决方案。
例如,新闻、邮件和社交源需要动态更新,但可能需要 每隔几分钟这样做一次完全可以。协作、游戏和金融应用程序 另一方面,需要更接近实时。
延迟本身并不是决定因素。如果消息量相对较低(例如, 监控网络故障)HTTP 流或轮询可以提供有效的解决方案。 低延迟、高频率和高容量的组合造就了最好的 case 来使用 WebSocket。
另请记住,在 Internet 上,您无法控制的限制性代理
可能会排除 WebSocket 交互,因为它们未配置为传递 Headers,或者因为它们关闭了看起来空闲的长期连接。这
意味着将 WebSocket 用于防火墙内的内部应用程序是
比面向公众的应用程序更直接的决定。Upgrade
1 | 标头。Upgrade |
2 | 使用连接。Upgrade |
1 | 协议切换 |
WebSocket API
Spring Framework 提供了一个 WebSocket API,您可以使用它来编写 client- 和 处理 WebSocket 消息的服务器端应用程序。
服务器
要创建 WebSocket 服务器,您可以先创建一个 .
以下示例显示了如何执行此操作:WebSocketHandler
-
Java
-
Kotlin
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
public class MyWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
// ...
}
}
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
class MyWebSocketHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
// ...
}
}
然后,您可以将其映射到 URL:
-
Java
-
Kotlin
@Configuration
class WebConfig {
@Bean
public HandlerMapping handlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/path", new MyWebSocketHandler());
int order = -1; // before annotated controllers
return new SimpleUrlHandlerMapping(map, order);
}
}
@Configuration
class WebConfig {
@Bean
fun handlerMapping(): HandlerMapping {
val map = mapOf("/path" to MyWebSocketHandler())
val order = -1 // before annotated controllers
return SimpleUrlHandlerMapping(map, order)
}
}
如果使用 WebFlux 配置,则没有任何内容
进一步操作,或者如果不使用 WebFlux 配置,则需要声明 a,如下所示:WebSocketHandlerAdapter
-
Java
-
Kotlin
@Configuration
class WebConfig {
// ...
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
@Configuration
class WebConfig {
// ...
@Bean
fun handlerAdapter() = WebSocketHandlerAdapter()
}
WebSocketHandler
方法 takes 和 returns 指示应用程序对会话的处理何时完成。会话已处理
通过两个流,一个用于入站消息,一个用于出站消息。下表
介绍处理流的两种方法:handle
WebSocketHandler
WebSocketSession
Mono<Void>
WebSocketSession 方法 |
描述 |
---|---|
|
提供对入站消息流的访问,并在连接关闭时完成。 |
|
获取传出消息的源,写入消息,并返回一个
在源完成和写入完成时完成。 |
A 必须将入站和出站流组合成一个统一的流,并且
返回反映该流完成情况的 A。取决于应用
要求,则统一流将在以下情况下完成:WebSocketHandler
Mono<Void>
-
入站或出站消息流完成。
-
入站流完成(即连接关闭),而出站流是无限的。
-
在选定的点,通过 的方法。
close
WebSocketSession
当入站和出站消息流组合在一起时,无需 检查连接是否打开,因为 Reactive Streams 向 end activity 发出信号。 入站流接收到完成或错误信号,出站流 接收取消信号。
处理程序的最基本实现是处理入站流的处理程序。这 以下示例显示了此类实现:
-
Java
-
Kotlin
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.receive() (1)
.doOnNext(message -> {
// ... (2)
})
.concatMap(message -> {
// ... (3)
})
.then(); (4)
}
}
1 | 访问入站消息流。 |
2 | 对每条消息执行一些操作。 |
3 | 执行使用消息内容的嵌套异步操作。 |
4 | 返回 a ,该 a 在接收完成时完成。Mono<Void> |
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
return session.receive() (1)
.doOnNext {
// ... (2)
}
.concatMap {
// ... (3)
}
.then() (4)
}
}
1 | 访问入站消息流。 |
2 | 对每条消息执行一些操作。 |
3 | 执行使用消息内容的嵌套异步操作。 |
4 | 返回 a ,该 a 在接收完成时完成。Mono<Void> |
对于嵌套的异步操作,您可能需要调用底层
使用池化数据缓冲区的服务器(例如 Netty)。否则,数据缓冲区可能是
在您有机会读取数据之前释放。有关更多背景信息,请参阅 数据缓冲区和编解码器。message.retain() |
以下实现结合了入站流和出站流:
-
Java
-
Kotlin
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> output = session.receive() (1)
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.map(value -> session.textMessage("Echo " + value)); (2)
return session.send(output); (3)
}
}
1 | 处理入站消息流。 |
2 | 创建出站消息,生成组合流。 |
3 | 返回未完成的 a,而我们继续接收。Mono<Void> |
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val output = session.receive() (1)
.doOnNext {
// ...
}
.concatMap {
// ...
}
.map { session.textMessage("Echo $it") } (2)
return session.send(output) (3)
}
}
1 | 处理入站消息流。 |
2 | 创建出站消息,生成组合流。 |
3 | 返回未完成的 a,而我们继续接收。Mono<Void> |
入站流和出站流可以是独立的,并且仅在完成时加入, 如下例所示:
-
Java
-
Kotlin
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Mono<Void> input = session.receive() (1)
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.then();
Flux<String> source = ... ;
Mono<Void> output = session.send(source.map(session::textMessage)); (2)
return Mono.zip(input, output).then(); (3)
}
}
1 | 处理入站消息流。 |
2 | 发送传出消息。 |
3 | 加入流并返回 a ,该流在任一流结束时完成。Mono<Void> |
class ExampleHandler : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive() (1)
.doOnNext {
// ...
}
.concatMap {
// ...
}
.then()
val source: Flux<String> = ...
val output = session.send(source.map(session::textMessage)) (2)
return Mono.zip(input, output).then() (3)
}
}
1 | 处理入站消息流。 |
2 | 发送传出消息。 |
3 | 加入流并返回 a ,该流在任一流结束时完成。Mono<Void> |
DataBuffer
DataBuffer
是 WebFlux 中字节缓冲区的表示形式。Spring Core 的
该参考在 Data Buffers and Codecs 一节中有更多关于这方面的内容。要理解的关键点是,在某些
像 Netty 这样的服务器,字节缓冲区是池化的和引用计数的,并且必须释放
当使用以避免内存泄漏时。
在 Netty 上运行时,应用程序必须使用 if
希望保留 input 数据缓冲区以确保它们不会被释放,并且
随后在消耗缓冲区时使用。DataBufferUtils.retain(dataBuffer)
DataBufferUtils.release(dataBuffer)
握手
WebSocketHandlerAdapter
委托给 .默认情况下,这是一个实例
of 执行基本检查,它对 WebSocket 请求执行基本检查,而
然后用于正在使用的服务器。目前,有内置的
支持 Reactor Netty、Tomcat、Jetty 和 Undertow。WebSocketService
HandshakeWebSocketService
RequestUpgradeStrategy
HandshakeWebSocketService
公开一个属性,该属性允许
设置 a 以从 中提取属性并插入它们
转换为 的属性中。sessionAttributePredicate
Predicate<String>
WebSession
WebSocketSession
服务器配置
的 for each server 公开特定于
底层 WebSocket 服务器引擎。使用 WebFlux Java 配置时,您可以自定义
如 WebFlux Config 的相应部分所示的属性,或者如果
不使用 WebFlux 配置,请使用以下命令:RequestUpgradeStrategy
-
Java
-
Kotlin
@Configuration
class WebConfig {
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter(webSocketService());
}
@Bean
public WebSocketService webSocketService() {
TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
strategy.setMaxSessionIdleTimeout(0L);
return new HandshakeWebSocketService(strategy);
}
}
@Configuration
class WebConfig {
@Bean
fun handlerAdapter() =
WebSocketHandlerAdapter(webSocketService())
@Bean
fun webSocketService(): WebSocketService {
val strategy = TomcatRequestUpgradeStrategy().apply {
setMaxSessionIdleTimeout(0L)
}
return HandshakeWebSocketService(strategy)
}
}
检查服务器的升级策略以查看可用的选项。现在 只有 Tomcat 和 Jetty 公开了此类选项。
CORS
配置 CORS 并限制对 WebSocket 终端节点的访问的最简单方法是
实现并返回包含允许的来源、标头和其他详细信息的 a。如果你做不到
,您还可以将
通过 URL 模式指定 CORS 设置。如果同时指定了这两者,则使用 on 上的方法将它们组合在一起。WebSocketHandler
CorsConfigurationSource
CorsConfiguration
corsConfigurations
SimpleUrlHandler
combine
CorsConfiguration
客户端
Spring WebFlux 提供了一个抽象,其中包含
Reactor Netty、Tomcat、Jetty、Undertow 和标准 Java(即 JSR-356)。WebSocketClient
Tomcat 客户端实际上是标准 Java 客户端的扩展,带有一些额外的
功能,以利用特定于 Tomcat 的
用于暂停接收背压消息的 API。WebSocketSession |
要启动 WebSocket 会话,您可以创建客户端的实例并使用其方法:execute
-
Java
-
Kotlin
WebSocketClient client = new ReactorNettyWebSocketClient();
URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
session.receive()
.doOnNext(System.out::println)
.then());
val client = ReactorNettyWebSocketClient()
val url = URI("ws://localhost:8080/path")
client.execute(url) { session ->
session.receive()
.doOnNext(::println)
.then()
}
某些客户端(如 Jetty)实施并需要停止和启动
在您可以使用它们之前。所有客户端都有与配置相关的构造函数选项
底层 WebSocket 客户端的 Web 节点。Lifecycle
WebSocketSession 方法 |
描述 |
---|---|
|
提供对入站消息流的访问,并在连接关闭时完成。 |
|
获取传出消息的源,写入消息,并返回一个
在源完成和写入完成时完成。 |
1 | 访问入站消息流。 |
2 | 对每条消息执行一些操作。 |
3 | 执行使用消息内容的嵌套异步操作。 |
4 | 返回 a ,该 a 在接收完成时完成。Mono<Void> |
1 | 访问入站消息流。 |
2 | 对每条消息执行一些操作。 |
3 | 执行使用消息内容的嵌套异步操作。 |
4 | 返回 a ,该 a 在接收完成时完成。Mono<Void> |
对于嵌套的异步操作,您可能需要调用底层
使用池化数据缓冲区的服务器(例如 Netty)。否则,数据缓冲区可能是
在您有机会读取数据之前释放。有关更多背景信息,请参阅 数据缓冲区和编解码器。message.retain() |
1 | 处理入站消息流。 |
2 | 创建出站消息,生成组合流。 |
3 | 返回未完成的 a,而我们继续接收。Mono<Void> |
1 | 处理入站消息流。 |
2 | 创建出站消息,生成组合流。 |
3 | 返回未完成的 a,而我们继续接收。Mono<Void> |
1 | 处理入站消息流。 |
2 | 发送传出消息。 |
3 | 加入流并返回 a ,该流在任一流结束时完成。Mono<Void> |
1 | 处理入站消息流。 |
2 | 发送传出消息。 |
3 | 加入流并返回 a ,该流在任一流结束时完成。Mono<Void> |
Tomcat 客户端实际上是标准 Java 客户端的扩展,带有一些额外的
功能,以利用特定于 Tomcat 的
用于暂停接收背压消息的 API。WebSocketSession |