对于最新的稳定版本,请使用 Spring Integration 6.4.3! |
STOMP 支持
Spring 集成版本 4.2 引入了 STOMP(简单文本定向消息传递协议)客户端支持。
它基于 Spring Framework 的消息模块、stomp 包的架构、基础设施和 API。
Spring 集成使用许多 Spring STOMP 组件(例如StompSession
和StompClientSupport
).
有关更多信息,请参阅 Spring Framework 参考手册中的 Spring Framework STOMP Support 一章。
您需要将此依赖项包含在您的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stomp</artifactId>
<version>6.3.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-stomp:6.3.9"
对于服务器端组件,您需要添加一个org.springframework:spring-websocket
和/或io.projectreactor.netty:reactor-netty
依赖。
概述
要配置 STOMP,您应该从 STOMP 客户端对象开始。 Spring Framework 提供了以下实现:
-
WebSocketStompClient
:基于 Spring WebSocket API 构建,支持标准 JSR-356 WebSocket、Jetty 9 和 Sockjs,用于使用 Sockjs 客户端进行基于 HTTP 的 WebSocket 仿真。 -
ReactorNettyTcpStompClient
:基于ReactorNettyTcpClient
从reactor-netty
项目。
您可以提供任何其他StompClientSupport
实现。
请参阅这些类的 Javadoc。
这StompClientSupport
class 被设计为工厂来生成StompSession
对于提供的StompSessionHandler
其余所有工作都是通过回调完成的StompSessionHandler
和StompSession
抽象化。
使用 Spring 集成适配器抽象,我们需要提供一些托管的共享对象,以将我们的应用程序表示为具有唯一会话的 STOMP 客户端。
为此, Spring 集成提供了StompSessionManager
abstraction 来管理单个 StompSession
之间提供的任何StompSessionHandler
.
这允许对特定的 STOMP Broker 使用入站或出站通道适配器(或两者)。
看StompSessionManager
(及其实现)JavaDocs 了解更多信息。
STOMP 入站通道适配器
这StompInboundChannelAdapter
是一站式的MessageProducer
组件将 Spring 集成应用程序订阅到提供的 STOMP 目标,并从中接收消息(使用提供的MessageConverter
在互联StompSession
).
您可以在运行时使用适当的@ManagedOperation
annotations 上的StompInboundChannelAdapter
.
有关更多配置选项,请参阅 STOMP 命名空间支持和StompInboundChannelAdapter
Javadoc 的 Java 文档。
STOMP 出站通道适配器
这StompMessageHandler
是MessageHandler
对于<int-stomp:outbound-channel-adapter>
,用于发送传出Message<?>
实例添加到 STOMPdestination
(预先配置或在运行时使用 SPEL 表达式确定)通过StompSession
(由共享的StompSessionManager
).
有关更多配置选项,请参阅 STOMP 命名空间支持和StompMessageHandler
Javadoc 的 Java 文档。
STOMP 标头映射
STOMP 协议提供标头作为其帧的一部分。 STOMP 帧的整个结构具有以下格式:
....
COMMAND
header1:value1
header2:value2
Body^@
....
Spring Framework 提供StompHeaders
来表示这些标头。
有关更多详细信息,请参阅 Javadoc。
STOMP 帧相互转换Message<?>
实例,这些标头映射到MessageHeaders
实例。
Spring 集成提供了一个默认的HeaderMapper
STOMP 适配器的实现。
实现方式是StompHeaderMapper
.
它提供fromHeaders()
和toHeaders()
作。
与许多其他 Spring 集成模块一样,IntegrationStompHeaders
类已引入以将标准 STOMP 标头映射到MessageHeaders
跟stomp_
作为标头名称前缀。
此外,所有MessageHeaders
具有该前缀的实例将映射到StompHeaders
发送到接收方时。
STOMP 集成事件
许多 STOMP作是异步的,包括错误处理。
例如,STOMP 有一个RECEIPT
server 帧,当客户端帧请求一个 server 帧时,它通过添加RECEIPT
页眉。
为了提供对这些异步事件的访问, Spring 集成会发出StompIntegrationEvent
实例,您可以通过实现ApplicationListener
或使用<int-event:inbound-channel-adapter>
(参见接收 Spring 应用程序事件)。
具体来说,StompExceptionEvent
从AbstractStompSessionManager
当stompSessionListenableFuture
接收onFailure()
由于无法连接到 STOMP 代理。
另一个示例是StompMessageHandler
.
它处理ERROR
STOMP 帧,这是服务器对此发送的不当(未接受)消息的响应StompMessageHandler
.
这StompMessageHandler
发出StompReceiptEvent
作为StompSession.Receiptable
异步应答中的回调,用于发送到StompSession
.
这StompReceiptEvent
可以是正数还是负数,具体取决于RECEIPT
帧是从receiptTimeLimit
期间,您可以在StompClientSupport
实例。
它默认为15 * 1000
(以毫秒为单位,即 15 秒)。
这StompSession.Receiptable 仅当RECEIPT 要发送的消息的 STOMP 标头不是null .
您可以启用自动RECEIPT 标头生成StompSession 通过其autoReceipt 选项,并在StompSessionManager 分别。 |
有关如何配置 Spring 集成以接受这些的更多信息,请参见STOMP Adapters Java ConfigurationApplicationEvent
实例。
STOMP 适配器 Java 配置
以下示例显示了 STOMP 适配器的全面 Java 配置:
@Configuration
@EnableIntegration
public class StompConfiguration {
@Bean
public ReactorNettyTcpStompClient stompClient() {
ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("127.0.0.1", 61613);
stompClient.setMessageConverter(new PassThruMessageConverter());
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
stompClient.setTaskScheduler(taskScheduler);
stompClient.setReceiptTimeLimit(5000);
return stompClient;
}
@Bean
public StompSessionManager stompSessionManager() {
ReactorNettyTcpStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient());
stompSessionManager.setAutoReceipt(true);
return stompSessionManager;
}
@Bean
public PollableChannel stompInputChannel() {
return new QueueChannel();
}
@Bean
public StompInboundChannelAdapter stompInboundChannelAdapter() {
StompInboundChannelAdapter adapter =
new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
adapter.setOutputChannel(stompInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "stompOutputChannel")
public MessageHandler stompMessageHandler() {
StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
handler.setDestination("/topic/myTopic");
return handler;
}
@Bean
public PollableChannel stompEvents() {
return new QueueChannel();
}
@Bean
public ApplicationListener<ApplicationEvent> stompEventListener() {
ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
producer.setEventTypes(StompIntegrationEvent.class);
producer.setOutputChannel(stompEvents());
return producer;
}
}
STOMP 命名空间支持
Spring 集成 STOMP 名称空间实现了入站和出站通道适配器组件。 要将其包含在您的配置中,请在您的应用程序上下文配置文件中提供以下命名空间声明:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/stomp
https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
...
</beans>
了解<int-stomp:outbound-channel-adapter>
元素
下面的清单显示了 STOMP 出站通道适配器的可用属性:
<int-stomp:outbound-channel-adapter
id="" (1)
channel="" (2)
stomp-session-manager="" (3)
header-mapper="" (4)
mapped-headers="" (5)
destination="" (6)
destination-expression="" (7)
auto-startup="" (8)
phase=""/> (9)
1 | 组件 Bean 名称。
这MessageHandler 使用 Bean 别名id 加.handler .
如果未设置channel 属性、DirectChannel 在应用程序上下文中创建并注册,其值为id 属性作为 bean 名称。
在这种情况下,端点使用 Bean 名称注册id 加.adapter . |
2 | 标识连接到此适配器的通道,如果id 存在。
看id .
自选。 |
3 | 对StompSessionManager bean,它封装了低级连接和StompSession 处理作。
必填。 |
4 | 对实现HeaderMapper<StompHeaders> 映射 Spring IntegrationMessageHeaders 往返
STOMP 帧标头。
它与mapped-headers .
它默认为StompHeaderMapper . |
5 | 要映射到 STOMP 帧标头的 STOMP 标头名称的逗号分隔列表。
仅当header-mapper reference 未设置。
此列表中的值也可以是与 Headers 名称匹配的简单模式(例如myheader* 或*myheader ).
特殊令牌 (STOMP_OUTBOUND_HEADERS ) 表示所有标准 STOMP 标头(content-length、receipt、heart-beat 等)。
默认情况下,它们被包括在内。
如果要添加自己的标头并希望同时映射标准标头,则必须包含此令牌或提供自己的令牌HeaderMapper 使用header-mapper . |
6 | STOMP 消息发送到的目标的名称。
它与destination-expression . |
7 | 在运行时针对每个 Spring 集成进行评估的 SPEL 表达式Message 作为根对象。
它与destination . |
8 | 指示此端点是否应自动启动的布尔值。
它默认为true . |
9 | 此终端节点应在其中启动和停止的生命周期阶段。
值越低,此终端节点开始得越早,停止得越晚。
默认值为Integer.MIN_VALUE .
值可以是负数。
看SmartLifeCycle . |
了解<int-stomp:inbound-channel-adapter>
元素
下面的清单显示了 STOMP 入站通道适配器的可用属性:
<int-stomp:inbound-channel-adapter
id="" (1)
channel="" (2)
error-channel="" (3)
stomp-session-manager="" (4)
header-mapper="" (5)
mapped-headers="" (6)
destinations="" (7)
send-timeout="" (8)
payload-type="" (9)
auto-startup="" (10)
phase=""/> (11)
1 | 组件 Bean 名称。
如果未设置channel 属性、DirectChannel 在应用程序上下文中创建并注册,其值为id 属性作为 bean 名称。
在这种情况下,端点使用 Bean 名称注册id 加.adapter . |
2 | 标识连接到此适配器的通道。 |
3 | 这MessageChannel bean 引用ErrorMessage 实例。 |
4 | 在<int-stomp:outbound-channel-adapter> . |
5 | 要从 STOMP 帧 Headers 映射的 STOMP Headers 名称的逗号分隔列表。
只有在header-mapper reference 未设置。
此列表中的值也可以是与 Headers 名称匹配的简单模式(例如myheader* 或*myheader ).
特殊令牌 (STOMP_INBOUND_HEADERS ) 表示所有标准 STOMP 标头(content-length、receipt、heart-beat 等)。
默认情况下,它们被包括在内。
如果要添加自己的标头并希望也映射标准标头,则还必须包含此令牌或提供自己的令牌HeaderMapper implementation usingheader-mapper . |
6 | 在<int-stomp:outbound-channel-adapter> . |
7 | 要订阅的 STOMP 目标名称的逗号分隔列表。
目标列表(以及订阅)可以在运行时通过addDestination() 和removeDestination() @ManagedOperation 附注。 |
8 | 如果通道可以阻塞,则向通道发送消息时要等待的最长时间(以毫秒为单位)。
例如,QueueChannel 如果已达到其最大容量,则可以阻止,直到空间可用。 |
9 | 目标的 Java 类型的完全限定名称payload 从传入的 STOMP 帧转换。
它默认为String.class . |
10 | 在<int-stomp:outbound-channel-adapter> . |
11 | 在<int-stomp:outbound-channel-adapter> . |