此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.4.0! |
WebSockets 支持
从版本 4.1 开始, Spring 集成具有 WebSocket 支持。
它基于 Spring Framework 模块中的架构、基础设施和 API。
因此, Spring WebSocket 的许多组件(例如 or )和配置选项(例如 )可以在 Spring Integration 中重用。
有关更多信息,请参阅 Spring Framework 参考手册中的 Spring Framework WebSocket Support 一章。web-socket
SubProtocolHandler
WebSocketClient
@EnableWebSocketMessageBroker
您需要将此依赖项包含在您的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-websocket</artifactId>
<version>6.4.1-SNAPSHOT</version>
</dependency>
compile "org.springframework.integration:spring-integration-websocket:6.4.1-SNAPSHOT"
对于服务器端,必须显式包含依赖项。org.springframework:spring-webmvc
Spring 框架 WebSocket 基础结构基于 Spring 消息传递基础,并提供了一个基本的消息传递框架,该框架基于 Spring 集成使用的相同实现和实现(以及一些 POJO 方法注释映射)。
因此, Spring 集成可以直接参与 WebSocket 流,即使没有 WebSocket 适配器也是如此。
为此,你可以使用适当的 Comments 配置 Spring 集成,如下例所示:MessageChannel
MessageHandler
@MessagingGateway
@MessagingGateway
@Controller
public interface WebSocketGateway {
@MessageMapping("/greeting")
@SendToUser("/queue/answer")
@Gateway(requestChannel = "greetingChannel")
String greeting(String payload);
}
概述
由于 WebSocket 协议根据定义是流式协议,并且我们可以同时向 WebSocket 发送和接收消息,因此我们可以处理适当的 ,无论是在客户端还是服务器端。
为了封装连接管理和注册表,提供了 和 implementations。
由于 WebSocket API 及其在 Spring Framework 中的实现(具有许多扩展),在服务器端和客户端(当然,从 Java 的角度来看)使用相同的类。
因此,大多数连接和注册表选项在两端都是相同的。
这允许我们重用许多配置项和基础设施钩子,以在服务器端和客户端构建 WebSocket 应用程序。
以下示例显示了组件如何同时实现这两个目的:WebSocketSession
WebSocketSession
IntegrationWebSocketContainer
ClientWebSocketContainer
ServerWebSocketContainer
WebSocketSession
//Client side
@Bean
public WebSocketClient webSocketClient() {
return new SockJsClient(Collections.singletonList(new WebSocketTransport(new JettyWebSocketClient())));
}
@Bean
public IntegrationWebSocketContainer clientWebSocketContainer() {
return new ClientWebSocketContainer(webSocketClient(), "ws://my.server.com/endpoint");
}
//Server side
@Bean
public IntegrationWebSocketContainer serverWebSocketContainer() {
return new ServerWebSocketContainer("/endpoint").withSockJs();
}
它旨在实现双向消息传递,并且可以在入站和出站通道适配器之间共享(见下文),在使用单向(发送或接收)WebSocket 消息传递时,只能从其中一个适配器引用。
它可以在没有任何通道适配器的情况下使用,但在这种情况下,它只充当注册表。IntegrationWebSocketContainer
IntegrationWebSocketContainer
WebSocketSession
将 internal 注册为 .
它在目标供应商 WebSocket 容器中提供的其他服务器 WebSocket 选项(如 或 )下执行此操作。
此注册是通过 infrastructural 组件实现的,该组件的作用与 annotation 相同。
这意味着,通过使用(或应用程序上下文中的任何 Spring 集成名称空间),你可以省略声明,因为 Spring 集成基础结构会检测所有 WebSocket 端点。ServerWebSocketContainer WebSocketConfigurer IntegrationWebSocketContainer.IntegrationWebSocketHandler Endpoint paths HandshakeHandler SockJS fallback ServletWebSocketHandlerRegistry WebSocketIntegrationConfigurationInitializer @EnableWebSocket @EnableIntegration @EnableWebSocket |
从版本 6.1 开始,可以使用 provided 而不是 和 组合进行配置。
这在 URI 的某些部分需要自定义编码的情况下非常有用。
为方便起见,请参阅 API。ClientWebSocketContainer
URI
uriTemplate
uriVariables
UriComponentsBuilder
WebSocket 入站通道适配器
实现 interaction 的接收部分。
您必须为其提供 ,并且适配器会将自身注册为 a 以处理传入的消息和事件。WebSocketInboundChannelAdapter
WebSocketSession
IntegrationWebSocketContainer
WebSocketListener
WebSocketSession
只能在 .WebSocketListener IntegrationWebSocketContainer |
对于 WebSocket 子协议,可以将其配置为第二个 constructor 参数。
适配器委托 来确定 接受的适当值,并根据子协议实现将 a 转换为 a。WebSocketInboundChannelAdapter
SubProtocolHandlerRegistry
SubProtocolHandlerRegistry
SubProtocolHandler
WebSocketSession
WebSocketMessage
Message
默认情况下,the 仅依赖于 raw 实现,该实现将 转换为 .WebSocketInboundChannelAdapter PassThruSubProtocolHandler WebSocketMessage Message |
它仅接受具有或空标头的实例并将其发送到底层集成流。
所有其他类型都通过从实现发出的实例(例如 )进行处理。WebSocketInboundChannelAdapter
Message
SimpMessageType.MESSAGE
simpMessageType
Message
ApplicationEvent
SubProtocolHandler
StompSubProtocolHandler
在服务器端,如果存在配置,则可以使用 option 进行配置。
在这种情况下,所有类型都委托给提供的 .
此外,如果 Broker 中继配置了目标前缀,则与 Broker 目标匹配的消息将路由到 ,而不是 。@EnableWebSocketMessageBroker
WebSocketInboundChannelAdapter
useBroker = true
non-MESSAGE
Message
AbstractBrokerMessageHandler
AbstractBrokerMessageHandler
outputChannel
WebSocketInboundChannelAdapter
如果 和 收到的消息是 类型,则立即向 发送消息,而不将其发送到通道。useBroker = false
SimpMessageType.CONNECT
WebSocketInboundChannelAdapter
SimpMessageType.CONNECT_ACK
WebSocketSession
Spring 的 WebSocket 支持只允许配置一个代理中继。
因此,我们不需要参考资料。
在 Application Context 中检测到它。AbstractBrokerMessageHandler |
有关更多配置选项,请参阅 WebSockets 命名空间支持。
WebSocket 出站通道适配器
这:WebSocketOutboundChannelAdapter
-
接受来自其 Spring Integration 的消息
MessageChannel
-
确定从
WebSocketSession
id
MessageHeaders
-
从提供的
WebSocketSession
IntegrationWebSocketContainer
-
将工作的转换和发送委托给提供的相应 。
WebSocketMessage
SubProtocolHandler
SubProtocolHandlerRegistry
在客户端,消息头不是必需的,因为只处理单个连接及其分别。WebSocketSession
id
ClientWebSocketContainer
WebSocketSession
要使用 STOMP 子协议,您应该为此适配器配置一个 .
然后,你可以使用 and a 将任何 STOMP 消息类型发送到此适配器,或者只使用 a (参见Header Enricher)。StompSubProtocolHandler
StompHeaderAccessor.create(StompCommand…)
MessageBuilder
HeaderEnricher
本章的其余部分主要介绍其他配置选项。
WebSockets 命名空间支持
Spring 集成 WebSocket 名称空间包括本章其余部分中描述的几个组件。 要将其包含在配置中,请在应用程序上下文配置文件中使用以下命名空间声明:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-websocket="http://www.springframework.org/schema/integration/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/websocket
https://www.springframework.org/schema/integration/websocket/spring-integration-websocket.xsd">
...
</beans>
<int-websocket:client-container>
属性
下面的清单显示了该元素可用的属性:<int-websocket:client-container>
<int-websocket:client-container
id="" (1)
client="" (2)
uri="" (3)
uri-variables="" (4)
origin="" (5)
send-time-limit="" (6)
send-buffer-size-limit="" (7)
send-buffer-overflow-strategy="" (8)
auto-startup="" (9)
phase=""> (10)
<int-websocket:http-headers>
<entry key="" value=""/>
</int-websocket:http-headers> (11)
</int-websocket:client-container>
1 | 组件 Bean 名称。 |
2 | Bean 引用。WebSocketClient |
3 | 或到目标 WebSocket 服务。
如果将其用作 with URI 变量占位符,则该属性是必需的。uri uriTemplate uriTemplate uri-variables |
4 | 属性值中 URI 变量占位符的逗号分隔值。
这些值将根据它们在 .
请参阅 UriComponents.expand(Object...uriVariableValues) 的 Values。 uri uri |
5 | Handshake HTTP 标头值。Origin |
6 | WebSocket 会话 'send' 超时限制。
默认为 。10000 |
7 | WebSocket 会话 'send' 消息大小限制。
默认为 。524288 |
8 | WebSocket 会话发送缓冲区溢出策略
确定会话的出站消息缓冲区到达 .
有关可能的值和更多详细信息,请参阅 。send-buffer-size-limit ConcurrentWebSocketSessionDecorator.OverflowStrategy |
9 | 指示此端点是否应自动启动的布尔值。
默认为 ,假设此容器是从 WebSocket 入站适配器启动的。false |
10 | 此终端节点应在其中启动和停止的生命周期阶段。
值越低,此终端节点开始得越早,停止得越晚。
默认值为 .
值可以是负数。
请参阅 SmartLifeCycle 。Integer.MAX_VALUE |
11 | A of 用于握手请求。Map HttpHeaders |
<int-websocket:server-container>
属性
下面的清单显示了该元素可用的属性:<int-websocket:server-container>
<int-websocket:server-container
id="" (1)
path="" (2)
handshake-handler="" (3)
handshake-interceptors="" (4)
decorator-factories="" (5)
send-time-limit="" (6)
send-buffer-size-limit="" (7)
send-buffer-overflow-strategy="" (8)
allowed-origins=""> (9)
<int-websocket:sockjs
client-library-url="" (10)
stream-bytes-limit="" (11)
session-cookie-needed="" (12)
heartbeat-time="" (13)
disconnect-delay="" (14)
message-cache-size="" (15)
websocket-enabled="" (16)
scheduler="" (17)
message-codec="" (18)
transport-handlers="" (19)
suppress-cors="true" /> (20)
</int-websocket:server-container>
1 | 组件 Bean 名称。 |
2 | 将特定请求映射到 .
支持精确的路径映射 URI(例如 )和 ant 样式的路径模式(如 )。WebSocketHandler /myPath /myPath/** |
3 | Bean 引用。
默认为 。HandshakeHandler DefaultHandshakeHandler |
4 | Bean 引用列表。HandshakeInterceptor |
5 | 一个或多个工厂 () 的列表,用于修饰用于处理 WebSocket 消息的处理程序。
这对于某些高级用例可能很有用(例如,允许 Spring Security 强制关闭
WebSocket 会话(当相应的 HTTP 会话过期时)。
有关更多信息,请参见 Spring Session 项目。WebSocketHandlerDecoratorFactory |
6 | 在 <int-websocket:client-container> 上查看相同的选项。 |
7 | 在 <int-websocket:client-container> 上查看相同的选项。 |
8 | WebSocket 会话发送缓冲区溢出策略
确定会话的出站消息缓冲区到达 .
有关可能的值和更多详细信息,请参阅 。send-buffer-size-limit ConcurrentWebSocketSessionDecorator.OverflowStrategy |
9 | 允许的源标头值。
您可以将多个源指定为逗号分隔列表。
此检查主要针对浏览器客户端设计。
没有什么可以阻止其他类型的 Client 端修改 origin 标头值。
启用 Sockjs 并限制允许的源时,将禁用不对跨源请求(、、 和 )使用源标头的传输类型。
因此,IE6 和 IE7 不受支持,IE8 和 IE9 仅支持没有 Cookie。
默认情况下,允许所有源。jsonp-polling iframe-xhr-polling iframe-eventsource iframe-htmlfile |
10 | 没有本机跨域通信的传输(例如 和 )必须在不可见的 iframe 中从“外部”域获取一个简单的页面,以便 iframe 中的代码可以从 Sockjs 服务器的本地域运行。
由于 iframe 需要加载 Sockjs javascript 客户端库,因此此属性允许您指定加载它的位置。
默认情况下,它指向 .
但是,您也可以将其设置为指向应用程序提供的 URL。
请注意,可以指定相对 URL,在这种情况下,URL 必须相对于 iframe URL。
例如,假设映射到 Sockjs 端点并且生成的 iframe URL 为 ,则相对 URL 必须以 “../../“ 遍历到 Sockjs 映射上方的位置。
对于基于前缀的 servlet 映射,您可能需要再进行一次遍历。eventsource htmlfile d1fxtkz8shb9d2.cloudfront.net/sockjs-0.3.4.min.js /sockjs /sockjs/iframe.html |
11 | 在关闭单个 HTTP 流请求之前可以通过该请求发送的最小字节数。
默认为 (即 128*1024 或 131072 字节)。128K |
12 | 来自 SockJs 端点的响应中的值。
此属性指示应用程序是否需要 Cookie 才能正常运行(例如,用于负载平衡或在 Java Servlet 容器中使用 HTTP 会话)。cookie_needed /info JSESSIONID |
13 | 服务器未发送任何消息的时间量(以毫秒为单位),超过此时间后服务器应发送
向客户端发送检测信号帧,以防止连接中断。
默认值为 (25 秒)。25,000 |
14 | 在没有接收连接(即服务器可以通过该连接向客户端发送数据的活动连接)后,客户端被视为断开连接之前的时间(以毫秒为单位)。
默认值为 .5000 |
15 | 会话在等待来自客户端的下一个 HTTP 轮询请求时可以缓存的服务器到客户端消息的数量。
默认大小为 。100 |
16 | 某些负载均衡器不支持 WebSockets。
将此选项设置为在服务器端禁用 WebSocket 传输。
默认值为 .false true |
17 | Bean 引用。
如果未提供任何值,则会创建一个新实例。
此计划程序实例用于调度心率消息。TaskScheduler ThreadPoolTaskScheduler |
18 | 用于编码和解码 SockJS 消息的 bean 引用。
默认情况下,它要求 Jackson 库存在于 Classpath 中。SockJsMessageCodec Jackson2SockJsMessageCodec |
19 | Bean 引用列表。TransportHandler |
20 | 是否禁用 Sockjs 请求的自动添加 CORS 标头。
默认值为 .false |
<int-websocket:outbound-channel-adapter>
属性
下面的清单显示了该元素可用的属性:<int-websocket:outbound-channel-adapter>
<int-websocket:outbound-channel-adapter
id="" (1)
channel="" (2)
container="" (3)
default-protocol-handler="" (4)
protocol-handlers="" (5)
message-converters="" (6)
merge-with-default-converters="" (7)
auto-startup="" (8)
phase=""/> (9)
1 | 组件 Bean 名称。
如果未提供该属性,则会在应用程序上下文中创建并注册 a,并将此属性作为 Bean 名称。
在这种情况下,端点使用 bean name plus 注册。
并且 is registered with bean alias plus .channel DirectChannel id id .adapter MessageHandler id .handler |
2 | 标识连接到此适配器的通道。 |
3 | 对 bean 的引用,它封装了低级连接和处理操作。
必填。IntegrationWebSocketContainer WebSocketSession |
4 | 对实例的可选引用。
当 Client 端没有请求子协议或它是单个协议处理程序时,使用它。
如果未提供此引用或列表,则默认使用 。SubProtocolHandler protocol-handlers PassThruSubProtocolHandler |
5 | 此通道适配器的 Bean 引用列表。
如果只提供单个 Bean 引用而未提供 ,则该 single 将用作 .
如果未设置此属性或 ,则默认使用 。SubProtocolHandler default-protocol-handler SubProtocolHandler default-protocol-handler default-protocol-handler PassThruSubProtocolHandler |
6 | 此通道适配器的 Bean 引用列表。MessageConverter |
7 | 布尔值,指示是否应在任何自定义转换器之后注册默认转换器。
仅当提供此标志时,才使用此标志。
否则,将注册所有默认转换器。
默认为 。
默认转换器是(按顺序):、 、 和 (如果 Classpath 中存在 Jackson 库)。message-converters false StringMessageConverter ByteArrayMessageConverter MappingJackson2MessageConverter |
8 | 指示此端点是否应自动启动的布尔值。
默认为 。true |
9 | 此终端节点应在其中启动和停止的生命周期阶段。
值越低,此终端节点开始得越早,停止得越晚。
默认值为 .
值可以是负数。
请参阅 SmartLifeCycle 。Integer.MIN_VALUE |
<int-websocket:inbound-channel-adapter>
属性
下面的清单显示了该元素可用的属性:<int-websocket:outbound-channel-adapter>
<int-websocket:inbound-channel-adapter
id="" (1)
channel="" (2)
error-channel="" (3)
container="" (4)
default-protocol-handler="" (5)
protocol-handlers="" (6)
message-converters="" (7)
merge-with-default-converters="" (8)
send-timeout="" (9)
payload-type="" (10)
use-broker="" (11)
auto-startup="" (12)
phase=""/> (13)
1 | 组件 Bean 名称。
如果不设置该属性,则会在应用程序上下文中创建并注册 a,并将此属性作为 Bean 名称。
在这种情况下,端点使用 bean name plus 注册。channel DirectChannel id id .adapter |
2 | 标识连接到此适配器的通道。 |
3 | 实例应发送到的 Bean 引用。MessageChannel ErrorMessage |
4 | 在 <int-websocket:outbound-channel-adapter> 上查看相同的选项。 |
5 | 在 <int-websocket:outbound-channel-adapter> 上查看相同的选项。 |
6 | 在 <int-websocket:outbound-channel-adapter> 上查看相同的选项。 |
7 | 在 <int-websocket:outbound-channel-adapter> 上查看相同的选项。 |
8 | 在 <int-websocket:outbound-channel-adapter> 上查看相同的选项。 |
9 | 如果通道可以阻塞,则向通道发送消息时要等待的最长时间(以毫秒为单位)。
例如,如果已达到最大容量,则 can 会阻塞,直到有可用空间。QueueChannel |
10 | 要从传入 .
默认为 。payload WebSocketMessage java.lang.String |
11 | 指示此适配器是否从应用程序上下文向 发送带有代理目标的实例和消息。
当此属性为 时,需要配置。
此属性仅在服务器端使用。
在客户端,它被忽略。
默认为 。non-MESSAGE WebSocketMessage AbstractBrokerMessageHandler true Broker Relay false |
12 | 在 <int-websocket:outbound-channel-adapter> 上查看相同的选项。 |
13 | 在 <int-websocket:outbound-channel-adapter> 上查看相同的选项。 |
用ClientStompEncoder
从版本4.3.13开始, Spring 集成提供(作为标准的扩展)用于 WebSocket 通道适配器的客户端。
为了正确地准备客户端消息,您必须将 的实例注入到 .
default 的一个问题是它是为服务器端设计的,因此它将 Headers 更新为(根据服务器端的 STOMP 协议的要求)。
如果 Client 端没有在正确的 Web 套接字框架中发送其消息,则某些 STOMP 代理不接受它们。
在本例中,的目的是覆盖报头并将其设置为在将消息编码为 之前的值。ClientStompEncoder
StompEncoder
ClientStompEncoder
StompSubProtocolHandler
StompSubProtocolHandler
SEND
stompCommand
MESSAGE
SEND
ClientStompEncoder
stompCommand
SEND
byte[]
动态 WebSocket 终端节点注册
从版本 5.5 开始,现在可以在运行时注册(和删除)WebSocket 服务器端点(基于 a 的通道适配器)- 映射的 a 通过 a 公开到 a 中,并可供 WebSocket 客户端访问。
动态和运行时集成流支持有助于以透明的方式注册这些端点:ServerWebSocketContainer
paths
ServerWebSocketContainer
HandlerMapping
DispatcherServlet
@Autowired
IntegrationFlowContext integrationFlowContext;
@Autowired
HandshakeHandler handshakeHandler;
...
ServerWebSocketContainer serverWebSocketContainer =
new ServerWebSocketContainer("/dynamic")
.setHandshakeHandler(this.handshakeHandler);
WebSocketInboundChannelAdapter webSocketInboundChannelAdapter =
new WebSocketInboundChannelAdapter(serverWebSocketContainer);
QueueChannel dynamicRequestsChannel = new QueueChannel();
IntegrationFlow serverFlow =
IntegrationFlow.from(webSocketInboundChannelAdapter)
.channel(dynamicRequestsChannel)
.get();
IntegrationFlowContext.IntegrationFlowRegistration dynamicServerFlow =
this.integrationFlowContext.registration(serverFlow)
.addBean(serverWebSocketContainer)
.register();
...
dynamicServerFlow.destroy();
请务必调用动态流注册以将 的实例添加到端点注册中。
销毁动态流注册时,关联的实例以及相应的终端节点注册(包括 URL 路径映射)也会被销毁。.addBean(serverWebSocketContainer) ServerWebSocketContainer ApplicationContext ServerWebSocketContainer |
动态 Websocket 端点只能通过 Spring 集成机制进行注册:当使用常规 Spring 时, Spring 集成配置会退缩,并且不会注册动态端点的基础设施。@EnableWebsocket |