STOMP 支持
STOMP 支持
Spring 集成版本 4.2 引入了 STOMP(简单文本定向消息传递协议)客户端支持。
它基于 Spring Framework 的消息模块、stomp 包的架构、基础设施和 API。
Spring 集成使用许多 Spring STOMP 组件(例如StompSession
和StompClientSupport
).
有关更多信息,请参见 Spring Framework 参考手册中的 Spring Framework STOMP Support 一章。
您需要将此依赖项包含在您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stomp</artifactId>
<version>6.0.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-stomp:6.0.9"
对于服务器端组件,您需要添加一个org.springframework:spring-websocket
和/或io.projectreactor.netty:reactor-netty
依赖。
概述
要配置 STOMP,您应该从 STOMP 客户端对象开始。 Spring Framework 提供了以下实现:
-
WebSocketStompClient
:基于 Spring WebSocket API 构建,支持标准 JSR-356 WebSocket、Jetty 9 和 Sockjs,用于使用 Sockjs 客户端进行基于 HTTP 的 WebSocket 仿真。 -
ReactorNettyTcpStompClient
:基于ReactorNettyTcpClient
从reactor-netty
项目。
您可以提供任何其他StompClientSupport
实现。
请参阅这些类的 Javadoc。
这StompClientSupport
class 被设计为工厂来生成StompSession
对于提供的StompSessionHandler
其余所有工作都是通过回调完成的StompSessionHandler
和StompSession
抽象化。
使用 Spring 集成适配器抽象,我们需要提供一些托管的共享对象,以将我们的应用程序表示为具有唯一会话的 STOMP 客户端。
为此, Spring 集成提供了StompSessionManager
abstraction 来管理单个 StompSession
之间提供的任何StompSessionHandler
.
这允许对特定的 STOMP Broker 使用入站或出站通道适配器(或两者)。
看StompSessionManager
(及其实现)JavaDocs 了解更多信息。
STOMP 入站通道适配器
这StompInboundChannelAdapter
是一站式的MessageProducer
组件将 Spring 集成应用程序订阅到提供的 STOMP 目标,并从中接收消息(使用提供的MessageConverter
在互联StompSession
).
您可以在运行时使用适当的@ManagedOperation
annotations 上的StompInboundChannelAdapter
.
有关更多配置选项,请参阅 STOMP 命名空间支持和StompInboundChannelAdapter
Javadoc 的 Java 文档。
STOMP 出站通道适配器
这StompMessageHandler
是MessageHandler
对于<int-stomp:outbound-channel-adapter>
,用于发送传出Message<?>
实例添加到 STOMPdestination
(预先配置或在运行时使用 SPEL 表达式确定)通过StompSession
(由共享的StompSessionManager
).
有关更多配置选项,请参阅 STOMP 命名空间支持和StompMessageHandler
Javadoc 的 Java 文档。
STOMP 标头映射
STOMP 协议提供标头作为其帧的一部分。 STOMP 帧的整个结构具有以下格式:
....
COMMAND
header1:value1
header2:value2
Body^@
....
Spring Framework 提供StompHeaders
来表示这些标头。
有关更多详细信息,请参阅 Javadoc。
STOMP 帧相互转换Message<?>
实例,这些标头映射到MessageHeaders
实例。
Spring 集成提供了一个默认的HeaderMapper
STOMP 适配器的实现。
实现方式是StompHeaderMapper
.
它提供fromHeaders()
和toHeaders()
作。
与许多其他 Spring 集成模块一样,IntegrationStompHeaders
类已引入以将标准 STOMP 标头映射到MessageHeaders
跟stomp_
作为标头名称前缀。
此外,所有MessageHeaders
具有该前缀的实例将映射到StompHeaders
发送到接收方时。
STOMP 集成事件
许多 STOMP作是异步的,包括错误处理。
例如,STOMP 有一个RECEIPT
server 帧,当客户端帧请求一个 server 帧时,它通过添加RECEIPT
页眉。
为了提供对这些异步事件的访问, Spring 集成会发出StompIntegrationEvent
实例,您可以通过实现ApplicationListener
或使用<int-event:inbound-channel-adapter>
(参见接收 Spring 应用程序事件)。
具体来说,StompExceptionEvent
从AbstractStompSessionManager
当stompSessionListenableFuture
接收onFailure()
由于无法连接到 STOMP 代理。
另一个示例是StompMessageHandler
.
它处理ERROR
STOMP 帧,这是服务器对此发送的不当(未接受)消息的响应StompMessageHandler
.
这StompMessageHandler
发出StompReceiptEvent
作为StompSession.Receiptable
异步应答中的回调,用于发送到StompSession
.
这StompReceiptEvent
可以是正数或负数,具体取决于RECEIPT
帧是从receiptTimeLimit
期间,您可以在StompClientSupport
实例。
它默认为15 * 1000
(以毫秒为单位,即 15 秒)。
这StompSession.Receiptable 仅当RECEIPT 要发送的消息的 STOMP 标头不是null .
您可以启用自动RECEIPT 标头生成StompSession 通过其autoReceipt 选项,并在StompSessionManager 分别。 |
有关如何配置 Spring 集成以接受这些的更多信息,请参见STOMP Adapters Java ConfigurationApplicationEvent
实例。
STOMP 适配器 Java 配置
以下示例显示了 STOMP 适配器的全面 Java 配置:
@Configuration
@EnableIntegration
public class StompConfiguration {
@Bean
public ReactorNettyTcpStompClient stompClient() {
ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("127.0.0.1", 61613);
stompClient.setMessageConverter(new PassThruMessageConverter());
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
stompClient.setTaskScheduler(taskScheduler);
stompClient.setReceiptTimeLimit(5000);
return stompClient;
}
@Bean
public StompSessionManager stompSessionManager() {
ReactorNettyTcpStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient());
stompSessionManager.setAutoReceipt(true);
return stompSessionManager;
}
@Bean
public PollableChannel stompInputChannel() {
return new QueueChannel();
}
@Bean
public StompInboundChannelAdapter stompInboundChannelAdapter() {
StompInboundChannelAdapter adapter =
new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
adapter.setOutputChannel(stompInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "stompOutputChannel")
public MessageHandler stompMessageHandler() {
StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
handler.setDestination("/topic/myTopic");
return handler;
}
@Bean
public PollableChannel stompEvents() {
return new QueueChannel();
}
@Bean
public ApplicationListener<ApplicationEvent> stompEventListener() {
ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
producer.setEventTypes(StompIntegrationEvent.class);
producer.setOutputChannel(stompEvents());
return producer;
}
}
STOMP 命名空间支持
Spring 集成 STOMP 名称空间实现了入站和出站通道适配器组件。 要将其包含在您的配置中,请在您的应用程序上下文配置文件中提供以下命名空间声明:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/stomp
https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
...
</beans>
了解<int-stomp:outbound-channel-adapter>
元素
下面的清单显示了 STOMP 出站通道适配器的可用属性:
<int-stomp:outbound-channel-adapter
id="" (1)
channel="" (2)
stomp-session-manager="" (3)
header-mapper="" (4)
mapped-headers="" (5)
destination="" (6)
destination-expression="" (7)
auto-startup="" (8)
phase=""/> (9)
1 | 组件 Bean 名称。
这MessageHandler 使用 Bean 别名id 加.handler .
如果未设置channel 属性、DirectChannel 在应用程序上下文中创建并注册,其值为id 属性作为 bean 名称。
在这种情况下,端点是使用 Bean 名称注册的id 加.adapter . |
2 | 标识连接到此适配器的通道,如果id 存在。
看id .
自选。 |
3 | 对StompSessionManager bean,它封装了低级连接和StompSession 处理作。
必填。 |
4 | 对实现HeaderMapper<StompHeaders> 映射 Spring IntegrationMessageHeaders 往返
STOMP 帧标头。
它与mapped-headers .
它默认为StompHeaderMapper . |
5 | 要映射到 STOMP 帧标头的 STOMP 标头名称的逗号分隔列表。
仅当header-mapper reference 未设置。
此列表中的值也可以是与 Headers 名称匹配的简单模式(例如myheader* 或*myheader ).
特殊令牌 (STOMP_OUTBOUND_HEADERS ) 表示所有标准 STOMP 标头(content-length、receipt、heart-beat 等)。
默认情况下,它们被包括在内。
如果要添加自己的标头并希望同时映射标准标头,则必须包含此令牌或提供自己的令牌HeaderMapper 使用header-mapper . |
6 | STOMP 消息发送到的目标的名称。
它与destination-expression . |
7 | 在运行时针对每个 Spring 集成进行评估的 SPEL 表达式Message 作为根对象。
它与destination . |
8 | 指示此端点是否应自动启动的布尔值。
它默认为true . |
9 | 此终端节点应在其中启动和停止的生命周期阶段。
值越低,此终端节点开始得越早,停止得越晚。
默认值为Integer.MIN_VALUE .
值可以是负数。
看SmartLifeCycle . |
了解<int-stomp:inbound-channel-adapter>
元素
下面的清单显示了 STOMP 入站通道适配器的可用属性:
<int-stomp:inbound-channel-adapter
id="" (1)
channel="" (2)
error-channel="" (3)
stomp-session-manager="" (4)
header-mapper="" (5)
mapped-headers="" (6)
destinations="" (7)
send-timeout="" (8)
payload-type="" (9)
auto-startup="" (10)
phase=""/> (11)
1 | 组件 Bean 名称。
如果未设置channel 属性、DirectChannel 在应用程序上下文中创建并注册,其值为id 属性作为 bean 名称。
在这种情况下,端点使用 Bean 名称注册id 加.adapter . |
2 | 标识连接到此适配器的通道。 |
3 | 这MessageChannel bean 引用ErrorMessage 实例。 |
4 | 在<int-stomp:outbound-channel-adapter> . |
5 | 要从 STOMP 帧 Headers 映射的 STOMP Headers 名称的逗号分隔列表。
只有在header-mapper reference 未设置。
此列表中的值也可以是与 Headers 名称匹配的简单模式(例如myheader* 或*myheader ).
特殊令牌 (STOMP_INBOUND_HEADERS ) 表示所有标准 STOMP 标头(content-length、receipt、heart-beat 等)。
默认情况下,它们被包括在内。
如果要添加自己的标头并希望也映射标准标头,则还必须包含此令牌或提供自己的令牌HeaderMapper implementation usingheader-mapper . |
6 | 在<int-stomp:outbound-channel-adapter> . |
7 | 要订阅的 STOMP 目标名称的逗号分隔列表。
目标列表(以及订阅)可以在运行时通过addDestination() 和removeDestination() @ManagedOperation 附注。 |
8 | 如果通道可以阻塞,则向通道发送消息时要等待的最长时间(以毫秒为单位)。
例如,QueueChannel 如果已达到其最大容量,则可以阻止,直到空间可用。 |
9 | 目标的 Java 类型的完全限定名称payload 从传入的 STOMP 帧转换。
它默认为String.class . |
10 | 在<int-stomp:outbound-channel-adapter> . |
11 | 在<int-stomp:outbound-channel-adapter> . |