Spring Integration 版本 4.2 引入了 STOMP (Simple Text Oriented Messaging Protocol) 客户端支持。 它基于 Spring Framework 的消息传递模块 stomp 包中的架构、基础结构和 API。 Spring Integration 使用许多 Spring STOMP 组件(如 和 )。 有关更多信息,请参见 Spring Framework 参考手册中的 Spring Framework STOMP 支持一章。StompSessionStompClientSupportSpring中文文档

您需要将此依赖项包含在项目中:Spring中文文档

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stomp</artifactId>
    <version>6.3.1</version>
</dependency>
compile "org.springframework.integration:spring-integration-stomp:6.3.1"

对于服务器端组件,您需要添加和/或依赖项。org.springframework:spring-websocketio.projectreactor.netty:reactor-nettySpring中文文档

概述

要配置 STOMP,应从 STOMP 客户端对象开始。 Spring Framework 提供以下实现:Spring中文文档

  • WebSocketStompClient:基于 Spring WebSocket API 构建,支持标准 JSR-356 WebSocket、Jetty 9 和 SockJS,用于使用 SockJS 客户端进行基于 HTTP 的 WebSocket 仿真。Spring中文文档

  • ReactorNettyTcpStompClient:基于项目构建。ReactorNettyTcpClientreactor-nettySpring中文文档

您可以提供任何其他实现。 请参阅这些类的 JavadocStompClientSupportSpring中文文档

该类被设计为一个工厂,为提供的对象生成一个,所有剩余的工作都是通过对该类的回调和抽象来完成的。 使用 Spring Integration 适配器抽象,我们需要提供一些托管共享对象来表示我们的应用程序作为具有唯一会话的 STOMP 客户端。 为此,Spring Integration 提供了抽象来管理任何提供的单个 . 这允许对特定 STOMP 代理使用入站出站通道适配器(或两者兼而有之)。 有关更多信息,请参阅(及其实现)JavaDocs。StompClientSupportStompSessionStompSessionHandlerStompSessionHandlerStompSessionStompSessionManagerStompSessionStompSessionHandlerStompSessionManagerSpring中文文档

STOMP 入站通道适配器

这是一个一站式组件,它将您的 Spring Integration 应用程序订阅到提供的 STOMP 目标并从它们接收消息(通过使用连接上提供的 从 STOMP 帧转换)。 您可以在运行时更改目标(从而更改 STOMP 订阅),方法是在 .StompInboundChannelAdapterMessageProducerMessageConverterStompSession@ManagedOperationStompInboundChannelAdapterSpring中文文档

有关更多配置选项,请参阅 STOMP 命名空间支持JavadocStompInboundChannelAdapterSpring中文文档

STOMP 出站通道适配器

的 是 和 用于通过 (由共享提供)将传出实例发送到 STOMP(在运行时使用 SpEL 表达式预先配置或确定)。StompMessageHandlerMessageHandler<int-stomp:outbound-channel-adapter>Message<?>destinationStompSessionStompSessionManagerSpring中文文档

有关更多配置选项,请参阅 STOMP 命名空间支持JavadocStompMessageHandlerSpring中文文档

STOMP 标头映射

STOMP 协议提供标头作为其框架的一部分。 STOMP 框架的整个结构具有以下格式:Spring中文文档

....
COMMAND
header1:value1
header2:value2

Body^@
....

Spring Framework 提供表示这些标头。 有关更多详细信息,请参阅 Javadoc。 STOMP 帧与实例之间相互转换,这些标头与实例之间相互映射。 Spring Integration 为 STOMP 适配器提供了默认实现。 实现为 。 它分别为入站适配器和出站适配器提供操作。StompHeadersMessage<?>MessageHeadersHeaderMapperStompHeaderMapperfromHeaders()toHeaders()Spring中文文档

与许多其他 Spring Integration 模块一样,引入了该类以将标准 STOMP 标头映射到 ,并作为标头名称前缀。 此外,所有具有该前缀的实例都映射到 发送到目标时。IntegrationStompHeadersMessageHeadersstomp_MessageHeadersStompHeadersSpring中文文档

有关更多信息,请参阅这些类的 JavadocSTOMP 命名空间支持中的属性描述。mapped-headersSpring中文文档

STOMP 集成事件

许多 STOMP 操作都是异步的,包括错误处理。 例如,STOMP 有一个服务器帧,当客户端帧通过添加标头请求服务器帧时,它会返回该服务器帧。 为了提供对这些异步事件的访问,Spring Integration 会发出实例,您可以通过实现 或 使用 来获取这些实例(请参阅接收 Spring 应用程序事件)。RECEIPTRECEIPTStompIntegrationEventApplicationListener<int-event:inbound-channel-adapter>Spring中文文档

具体来说,当 a 由于无法连接到 STOMP 代理而接收时,a 是从发出的。 另一个例子是 . 它处理 STOMP 帧,这些帧是服务器对此发送的不正确(未接受)消息的响应。StompExceptionEventAbstractStompSessionManagerstompSessionListenableFutureonFailure()StompMessageHandlerERRORStompMessageHandlerSpring中文文档

在发送到 的邮件的异步应答中作为回调的一部分发出。 可以是正数或负数,具体取决于帧是否在该时间段内从服务器接收,您可以在实例上配置该时间段。 默认为(以毫秒为单位,因此为 15 秒)。StompMessageHandlerStompReceiptEventStompSession.ReceiptableStompSessionStompReceiptEventRECEIPTreceiptTimeLimitStompClientSupport15 * 1000Spring中文文档

仅当要发送的消息的 STOMP 标头不是 时,才会添加回调。 您可以在 through its 选项和 上分别启用自动标头生成。StompSession.ReceiptableRECEIPTnullRECEIPTStompSessionautoReceiptStompSessionManager

有关如何配置 Spring Integration 以接受这些实例的更多信息,请参阅 STOMP 适配器 Java 配置ApplicationEventSpring中文文档

仅当要发送的消息的 STOMP 标头不是 时,才会添加回调。 您可以在 through its 选项和 上分别启用自动标头生成。StompSession.ReceiptableRECEIPTnullRECEIPTStompSessionautoReceiptStompSessionManager

STOMP 适配器 Java 配置

以下示例显示了 STOMP 适配器的综合 Java 配置:Spring中文文档

@Configuration
@EnableIntegration
public class StompConfiguration {

    @Bean
    public ReactorNettyTcpStompClient stompClient() {
        ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("127.0.0.1", 61613);
        stompClient.setMessageConverter(new PassThruMessageConverter());
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.afterPropertiesSet();
        stompClient.setTaskScheduler(taskScheduler);
        stompClient.setReceiptTimeLimit(5000);
        return stompClient;
    }

    @Bean
    public StompSessionManager stompSessionManager() {
        ReactorNettyTcpStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient());
        stompSessionManager.setAutoReceipt(true);
        return stompSessionManager;
    }

    @Bean
    public PollableChannel stompInputChannel() {
        return new QueueChannel();
    }

    @Bean
    public StompInboundChannelAdapter stompInboundChannelAdapter() {
        StompInboundChannelAdapter adapter =
        		new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
        adapter.setOutputChannel(stompInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "stompOutputChannel")
    public MessageHandler stompMessageHandler() {
        StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
        handler.setDestination("/topic/myTopic");
        return handler;
    }

    @Bean
    public PollableChannel stompEvents() {
        return new QueueChannel();
    }

    @Bean
    public ApplicationListener<ApplicationEvent> stompEventListener() {
        ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
        producer.setEventTypes(StompIntegrationEvent.class);
        producer.setOutputChannel(stompEvents());
        return producer;
    }

}

STOMP 命名空间支持

Spring Integration STOMP 命名空间实现入站和出站通道适配器组件。 若要将其包含在配置中,请在应用程序上下文配置文件中提供以下命名空间声明:Spring中文文档

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/stomp
    https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
    ...
</beans>

了解元素<int-stomp:outbound-channel-adapter>

以下清单显示了 STOMP 出站通道适配器的可用属性:Spring中文文档

<int-stomp:outbound-channel-adapter
                           id=""                      (1)
                           channel=""                 (2)
                           stomp-session-manager=""   (3)
                           header-mapper=""           (4)
                           mapped-headers=""          (5)
                           destination=""             (6)
                           destination-expression=""  (7)
                           auto-startup=""            (8)
                           phase=""/>                 (9)
1 组件 Bean 名称。 注册的 Bean 别名为 plus。 如果未设置该属性,那么将在应用程序上下文中创建并注册 a,并将此属性的值作为 Bean 名称。 在本例中,端点是使用 Bean 名称 plus 注册的。MessageHandlerid.handlerchannelDirectChannelidid.adapter
2 标识连接到此适配器的通道(如果存在)。 看。 自选。idid
3 对 Bean 的引用,它封装了低级连接和处理操作。 必填。StompSessionManagerStompSession
4 对实现 的 bean 的引用,该 bean 将 Spring Integration 映射到和从 STOMP 帧标头。 它与 . 它默认为 。HeaderMapper<StompHeaders>MessageHeadersmapped-headersStompHeaderMapper
5 要映射到 STOMP 帧标头的 STOMP 标头的名称名称的逗号分隔列表。 仅当未设置引用时,才能提供它。 此列表中的值也可以是与标头名称(如 或 )匹配的简单模式。 特殊标记 () 表示所有标准 STOMP 标头(内容长度、收据、心跳等)。 默认情况下,它们包含在内。 如果要添加自己的标头并希望同时映射标准标头,则必须包含此标记或使用 提供自己的实现。header-mappermyheader**myheaderSTOMP_OUTBOUND_HEADERSHeaderMapperheader-mapper
6 将 STOMP 消息发送到的目标的名称。 它与 .destination-expression
7 一个 SpEL 表达式,要在运行时针对每个 Spring Integration 作为根对象进行评估。 它与 .Messagedestination
8 指示此终结点是否应自动启动的布尔值。 它默认为 。true
9 此终结点应启动和停止的生命周期阶段。 该值越低,此终结点开始得越早,停止得越晚。 默认值为 。 值可以是负数。 请参阅 SmartLifeCycleInteger.MIN_VALUE

了解元素<int-stomp:inbound-channel-adapter>

以下列表显示了 STOMP 入站通道适配器的可用属性:Spring中文文档

<int-stomp:inbound-channel-adapter
                           id=""                     (1)
                           channel=""                (2)
                           error-channel=""          (3)
                           stomp-session-manager=""  (4)
                           header-mapper=""          (5)
                           mapped-headers=""         (6)
                           destinations=""           (7)
                           send-timeout=""           (8)
                           payload-type=""           (9)
                           auto-startup=""           (10)
                           phase=""/>                (11)
1 组件 Bean 名称。 如果未设置该属性,那么将在应用程序上下文中创建并注册 a,并将此属性的值作为 Bean 名称。 在本例中,终端节点是使用 Bean name plus 注册的。channelDirectChannelidid.adapter
2 标识连接到此适配器的通道。
3 应发送实例的 Bean 引用。MessageChannelErrorMessage
4 请参阅 <int-stomp:outbound-channel-adapter> 上的相同选项。
5 要从 STOMP 帧标头映射的 STOMP 标头的名称列表以逗号分隔。 只有在未设置引用时,才能提供此信息。 此列表中的值也可以是与标头名称匹配的简单模式(例如,或 )。 特殊标记 () 表示所有标准 STOMP 标头(内容长度、收据、心跳等)。 默认情况下,它们包含在内。 如果要添加自己的标头并希望也映射标准标头,则还必须包含此令牌或使用 提供自己的实现。header-mappermyheader**myheaderSTOMP_INBOUND_HEADERSHeaderMapperheader-mapper
6 请参阅 <int-stomp:outbound-channel-adapter> 上的相同选项。
7 要订阅的 STOMP 目标名称的逗号分隔列表。 可以在运行时通过 和 注释修改目标列表(以及订阅)。addDestination()removeDestination()@ManagedOperation
8 如果通道可以阻塞,则向通道发送消息时等待的最长时间(以毫秒为单位)。 例如,如果已达到最大容量,则可以阻止,直到空间可用。QueueChannel
9 要从传入 STOMP 帧转换的目标的 Java 类型的完全限定名称。 它默认为 。payloadString.class
10 请参阅 <int-stomp:outbound-channel-adapter> 上的相同选项。
11 请参阅 <int-stomp:outbound-channel-adapter> 上的相同选项。
1 组件 Bean 名称。 注册的 Bean 别名为 plus。 如果未设置该属性,那么将在应用程序上下文中创建并注册 a,并将此属性的值作为 Bean 名称。 在本例中,端点是使用 Bean 名称 plus 注册的。MessageHandlerid.handlerchannelDirectChannelidid.adapter
2 标识连接到此适配器的通道(如果存在)。 看。 自选。idid
3 对 Bean 的引用,它封装了低级连接和处理操作。 必填。StompSessionManagerStompSession
4 对实现 的 bean 的引用,该 bean 将 Spring Integration 映射到和从 STOMP 帧标头。 它与 . 它默认为 。HeaderMapper<StompHeaders>MessageHeadersmapped-headersStompHeaderMapper
5 要映射到 STOMP 帧标头的 STOMP 标头的名称名称的逗号分隔列表。 仅当未设置引用时,才能提供它。 此列表中的值也可以是与标头名称(如 或 )匹配的简单模式。 特殊标记 () 表示所有标准 STOMP 标头(内容长度、收据、心跳等)。 默认情况下,它们包含在内。 如果要添加自己的标头并希望同时映射标准标头,则必须包含此标记或使用 提供自己的实现。header-mappermyheader**myheaderSTOMP_OUTBOUND_HEADERSHeaderMapperheader-mapper
6 将 STOMP 消息发送到的目标的名称。 它与 .destination-expression
7 一个 SpEL 表达式,要在运行时针对每个 Spring Integration 作为根对象进行评估。 它与 .Messagedestination
8 指示此终结点是否应自动启动的布尔值。 它默认为 。true
9 此终结点应启动和停止的生命周期阶段。 该值越低,此终结点开始得越早,停止得越晚。 默认值为 。 值可以是负数。 请参阅 SmartLifeCycleInteger.MIN_VALUE
1 组件 Bean 名称。 如果未设置该属性,那么将在应用程序上下文中创建并注册 a,并将此属性的值作为 Bean 名称。 在本例中,终端节点是使用 Bean name plus 注册的。channelDirectChannelidid.adapter
2 标识连接到此适配器的通道。
3 应发送实例的 Bean 引用。MessageChannelErrorMessage
4 请参阅 <int-stomp:outbound-channel-adapter> 上的相同选项。
5 要从 STOMP 帧标头映射的 STOMP 标头的名称列表以逗号分隔。 只有在未设置引用时,才能提供此信息。 此列表中的值也可以是与标头名称匹配的简单模式(例如,或 )。 特殊标记 () 表示所有标准 STOMP 标头(内容长度、收据、心跳等)。 默认情况下,它们包含在内。 如果要添加自己的标头并希望也映射标准标头,则还必须包含此令牌或使用 提供自己的实现。header-mappermyheader**myheaderSTOMP_INBOUND_HEADERSHeaderMapperheader-mapper
6 请参阅 <int-stomp:outbound-channel-adapter> 上的相同选项。
7 要订阅的 STOMP 目标名称的逗号分隔列表。 可以在运行时通过 和 注释修改目标列表(以及订阅)。addDestination()removeDestination()@ManagedOperation
8 如果通道可以阻塞,则向通道发送消息时等待的最长时间(以毫秒为单位)。 例如,如果已达到最大容量,则可以阻止,直到空间可用。QueueChannel
9 要从传入 STOMP 帧转换的目标的 Java 类型的完全限定名称。 它默认为 。payloadString.class
10 请参阅 <int-stomp:outbound-channel-adapter> 上的相同选项。
11 请参阅 <int-stomp:outbound-channel-adapter> 上的相同选项。