Spring Integration 提供入站和出站通道适配器,以支持消息队列遥测传输 (MQTT) 协议。Spring中文文档

您需要将此依赖项包含在项目中:Spring中文文档

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>6.3.1</version>
</dependency>
compile "org.springframework.integration:spring-integration-mqtt:6.3.1"
XML 配置和本章的大部分内容都是关于 MQTT v3.1 协议支持和相应的 Paho 客户端的。 请参阅 MQTT v5 支持段落,了解相应的协议支持。

两个适配器的配置都是使用 . 有关配置选项的更多信息,请参阅 Paho 文档。DefaultMqttPahoClientFactorySpring中文文档

我们建议配置一个对象并将其注入工厂,而不是在工厂本身上设置(已弃用的)选项。MqttConnectOptions
XML 配置和本章的大部分内容都是关于 MQTT v3.1 协议支持和相应的 Paho 客户端的。 请参阅 MQTT v5 支持段落,了解相应的协议支持。
我们建议配置一个对象并将其注入工厂,而不是在工厂本身上设置(已弃用的)选项。MqttConnectOptions

入站(消息驱动)通道适配器

入站通道适配器由 . 为方便起见,可以使用命名空间对其进行配置。 最低配置可能如下所示:MqttPahoMessageDrivenChannelAdapterSpring中文文档

<bean id="clientFactory"
        class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
    <property name="connectionOptions">
        <bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
            <property name="userName" value="${mqtt.username}"/>
            <property name="password" value="${mqtt.password}"/>
        </bean>
    </property>
</bean>

<int-mqtt:message-driven-channel-adapter id="mqttInbound"
    client-id="${mqtt.default.client.id}.src"
    url="${mqtt.url}"
    topics="sometopic"
    client-factory="clientFactory"
    channel="output"/>

以下列表显示了可用的属性:Spring中文文档

<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
    client-id="foo"  (1)
    url="tcp://localhost:1883"  (2)
    topics="bar,baz"  (3)
    qos="1,2"  (4)
    converter="myConverter"  (5)
    client-factory="clientFactory"  (6)
    send-timeout="123"  (7)
    error-channel="errors"  (8)
    recovery-interval="10000"  (9)
    manual-acks="false" (10)
    channel="out" />
1 客户端 ID。
2 代理 URL。
3 此适配器从中接收消息的主题的逗号分隔列表。
4 以逗号分隔的 QoS 值列表。 它可以是应用于所有主题的单个值,也可以是每个主题的值(在这种情况下,列表的长度必须相同)。
5 一个(可选)。 默认情况下,默认情况下会生成一条消息,其中包含包含以下标头的有效负载:MqttMessageConverterDefaultPahoMessageConverterString
  • mqtt_topic:接收消息的主题Spring中文文档

  • mqtt_duplicate:如果消息是重复的trueSpring中文文档

  • mqtt_qos:服务质量 您可以通过将 定义为 并将属性设置为 来配置为返回有效负载中的原始负载。DefaultPahoMessageConverterbyte[]<bean/>payloadAsBytestrueSpring中文文档

6 客户端工厂。
7 超时。 仅当通道可能阻塞(例如当前已满的有界通道)时,它才适用。send()QueueChannel
8 错误通道。 下游异常将发送到此通道(如果提供),则在 . 有效负载是包含失败消息和原因的负载。ErrorMessageMessagingException
9 恢复间隔。 它控制适配器在发生故障后尝试重新连接的时间间隔。 默认为(十秒)。10000ms
10 确认模式;设置为 true 以进行手动确认。
从版本 4.1 开始,您可以省略 URL。 相反,您可以在 的属性中提供服务器 URI。 例如,这样做可以连接到高可用性 (HA) 群集。serverURIsDefaultMqttPahoClientFactory

从版本 4.2.2 开始,当适配器成功订阅主题时,将发布 an。 当连接或订阅失败时,将发布事件。 这些事件可以由实现 的 Bean 接收。MqttSubscribedEventMqttConnectionFailedEventApplicationListenerSpring中文文档

此外,一个名为 “的新属性用于控制适配器在发生故障后尝试重新连接的时间间隔。 默认为(十秒)。recoveryInterval10000msSpring中文文档

在版本 4.2.3 之前,当适配器停止时,客户机始终取消订阅。 这是不正确的,因为如果客户端 QOS 大于 0,我们需要保持订阅处于活动状态,以便消息到达 当适配器停止时,将在下次启动时交付。 这还需要将客户端工厂上的属性设置为 。 它默认为 。cleanSessionfalsetrueSpring中文文档

从版本 4.2.3 开始,如果属性为 。cleanSessionfalseSpring中文文档

可以通过在工厂设置属性来重写此行为。 它可以有值:、 和 。 后者(默认值)仅在属性为 时取消订阅。consumerCloseActionUNSUBSCRIBE_ALWAYSUNSUBSCRIBE_NEVERUNSUBSCRIBE_CLEANcleanSessiontrueSpring中文文档

要恢复到 4.2.3 之前的行为,请使用 .UNSUBSCRIBE_ALWAYSSpring中文文档

从 V5.0 开始,将 、 和 属性映射到标头 (、 和 ),以避免无意中传播到(默认情况下)使用 、 和 标头的出站消息。topicqosretained.RECEIVED_…​MqttHeaders.RECEIVED_TOPICMqttHeaders.RECEIVED_QOSMqttHeaders.RECEIVED_RETAINEDMqttHeaders.TOPICMqttHeaders.QOSMqttHeaders.RETAINEDSpring中文文档

在运行时添加和删除主题

从 V4.1 开始,可以通过编程方式更改适配器订阅的主题。 Spring Integration 提供了 and 方法。 添加主题时,可以选择指定(默认值:1)。 您还可以通过向具有适当有效负载的适当消息发送适当的消息来修改主题,例如:.addTopic()removeTopic()QoS<control-bus/>"myMqttAdapter.addTopic('foo', 1)"Spring中文文档

停止和启动适配器对主题列表没有影响(它不会恢复到配置中的原始设置)。 这些更改不会保留在应用程序上下文的生命周期之外。 新的应用程序上下文将恢复为配置的设置。Spring中文文档

在适配器停止(或与代理断开连接)时更改主题将在下次建立连接时生效。Spring中文文档

手动确认

从版本 5.3 开始,可以将该属性设置为 true。 通常用于异步确认传递。 当设置为 时,header () 将添加到消息中,其值为 。 您必须调用该方法才能完成交付。 有关更多信息,请参阅 Javadocs。 为方便起见,提供了标头访问器:manualAckstrueIntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACKSimpleAcknowledgmentacknowledge()IMqttClientsetManualAcks()messageArrivedComplete()Spring中文文档

StaticMessageHeaderAccessor.acknowledgment(someMessage).acknowledge();

从 version 开始,当消息转换器抛出异常或从转换中返回时,会向 发送 ,如果提供。 否则将此转换错误重新抛入 MQTT 客户端回调。5.2.11nullMqttMessageMqttPahoMessageDrivenChannelAdapterErrorMessageerrorChannelSpring中文文档

使用 Java 配置进行配置

以下 Spring Boot 应用程序显示了如何使用 Java 配置配置入站适配器的示例:Spring中文文档

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
                .web(false)
                .run(args);
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient",
                                                 "topic1", "topic2");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

使用 Java DSL 进行配置

以下 Spring Boot 应用程序提供了使用 Java DSL 配置入站适配器的示例:Spring中文文档

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow mqttInbound() {
        return IntegrationFlow.from(
                         new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883",
                                        "testClient", "topic1", "topic2"))
                .handle(m -> System.out.println(m.getPayload()))
                .get();
    }

}
1 客户端 ID。
2 代理 URL。
3 此适配器从中接收消息的主题的逗号分隔列表。
4 以逗号分隔的 QoS 值列表。 它可以是应用于所有主题的单个值,也可以是每个主题的值(在这种情况下,列表的长度必须相同)。
5 一个(可选)。 默认情况下,默认情况下会生成一条消息,其中包含包含以下标头的有效负载:MqttMessageConverterDefaultPahoMessageConverterString
  • mqtt_topic:接收消息的主题Spring中文文档

  • mqtt_duplicate:如果消息是重复的trueSpring中文文档

  • mqtt_qos:服务质量 您可以通过将 定义为 并将属性设置为 来配置为返回有效负载中的原始负载。DefaultPahoMessageConverterbyte[]<bean/>payloadAsBytestrueSpring中文文档

6 客户端工厂。
7 超时。 仅当通道可能阻塞(例如当前已满的有界通道)时,它才适用。send()QueueChannel
8 错误通道。 下游异常将发送到此通道(如果提供),则在 . 有效负载是包含失败消息和原因的负载。ErrorMessageMessagingException
9 恢复间隔。 它控制适配器在发生故障后尝试重新连接的时间间隔。 默认为(十秒)。10000ms
10 确认模式;设置为 true 以进行手动确认。
从版本 4.1 开始,您可以省略 URL。 相反,您可以在 的属性中提供服务器 URI。 例如,这样做可以连接到高可用性 (HA) 群集。serverURIsDefaultMqttPahoClientFactory

在版本 4.2.3 之前,当适配器停止时,客户机始终取消订阅。 这是不正确的,因为如果客户端 QOS 大于 0,我们需要保持订阅处于活动状态,以便消息到达 当适配器停止时,将在下次启动时交付。 这还需要将客户端工厂上的属性设置为 。 它默认为 。cleanSessionfalsetrueSpring中文文档

从版本 4.2.3 开始,如果属性为 。cleanSessionfalseSpring中文文档

可以通过在工厂设置属性来重写此行为。 它可以有值:、 和 。 后者(默认值)仅在属性为 时取消订阅。consumerCloseActionUNSUBSCRIBE_ALWAYSUNSUBSCRIBE_NEVERUNSUBSCRIBE_CLEANcleanSessiontrueSpring中文文档

要恢复到 4.2.3 之前的行为,请使用 .UNSUBSCRIBE_ALWAYSSpring中文文档

从 V5.0 开始,将 、 和 属性映射到标头 (、 和 ),以避免无意中传播到(默认情况下)使用 、 和 标头的出站消息。topicqosretained.RECEIVED_…​MqttHeaders.RECEIVED_TOPICMqttHeaders.RECEIVED_QOSMqttHeaders.RECEIVED_RETAINEDMqttHeaders.TOPICMqttHeaders.QOSMqttHeaders.RETAINEDSpring中文文档

出站通道适配器

出站通道适配器由 实现,该适配器包装在 . 为方便起见,可以使用命名空间对其进行配置。MqttPahoMessageHandlerConsumerEndpointSpring中文文档

从版本 4.1 开始,适配器支持异步发送操作,避免在确认传递之前阻塞。 如果需要,您可以发出应用程序事件,以使应用程序能够确认传递。Spring中文文档

以下列表显示了可用于出站通道适配器的属性:Spring中文文档

<int-mqtt:outbound-channel-adapter id="withConverter"
    client-id="foo"  (1)
    url="tcp://localhost:1883"  (2)
    converter="myConverter"  (3)
    client-factory="clientFactory"  (4)
    default-qos="1"  (5)
    qos-expression="" (6)
    default-retained="true"  (7)
    retained-expression="" (8)
    default-topic="bar"  (9)
    topic-expression="" (10)
    async="false"  (11)
    async-events="false"  (12)
    channel="target" />
1 客户端 ID。
2 代理 URL。
3 一个(可选)。 默认值可识别以下标头:MqttMessageConverterDefaultPahoMessageConverter
4 客户端工厂。
5 默认服务质量。 如果未找到标头或返回 . 如果您提供自定义 .mqtt_qosqos-expressionnullconverter
6 用于计算以确定 qos 的表达式。 默认值为 。headers[mqtt_qos]
7 保留标志的缺省值。 如果未找到标头,则使用它。 如果提供了自定义,则不使用它。mqtt_retainedconverter
8 要计算以确定保留的布尔值的表达式。 默认值为 。headers[mqtt_retained]
9 将消息发送到的默认主题(如果未找到标头,则使用)。mqtt_topic
10 要计算以确定目标主题的表达式。 默认值为 。headers['mqtt_topic']
11 当 时,调用方不会阻止。 相反,它会在发送消息时等待传递确认。 默认值为 (发送块,直到确认传递)。truefalse
12 当 和 同时是 时,将发出 an (参见事件)。 它包含消息、主题、客户端库生成的消息、 和 (每次连接客户端时递增)。 当客户端库确认传递时,将发出 an。 它包含 、 和 ,使投放能够与 相关联。 任何事件入站通道适配器都可以接收这些事件。 请注意,可以在 . 默认值为 。asyncasync-eventstrueMqttMessageSentEventmessageIdclientIdclientInstanceMqttMessageDeliveredEventmessageIdclientIdclientInstancesend()ApplicationListenerMqttMessageDeliveredEventMqttMessageSentEventfalse
从版本 4.1 开始,可以省略 URL。 相反,可以在 的属性中提供服务器 URI。 例如,这样可以连接到高可用性 (HA) 群集。serverURIsDefaultMqttPahoClientFactory

使用 Java 配置进行配置

以下 Spring Boot 应用程序显示了如何使用 Java 配置配置出站适配器的示例:Spring中文文档

@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(MqttJavaApplication.class)
                        .web(false)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToMqtt("foo");
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
        options.setUserName("username");
        options.setPassword("password".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                       new MqttPahoMessageHandler("testClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("testTopic");
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        void sendToMqtt(String data);

    }

}

使用 Java DSL 进行配置

以下 Spring Boot 应用程序提供了使用 Java DSL 配置出站适配器的示例:Spring中文文档

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

   	@Bean
   	public IntegrationFlow mqttOutboundFlow() {
   	    return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
    }

}
1 客户端 ID。
2 代理 URL。
3 一个(可选)。 默认值可识别以下标头:MqttMessageConverterDefaultPahoMessageConverter
4 客户端工厂。
5 默认服务质量。 如果未找到标头或返回 . 如果您提供自定义 .mqtt_qosqos-expressionnullconverter
6 用于计算以确定 qos 的表达式。 默认值为 。headers[mqtt_qos]
7 保留标志的缺省值。 如果未找到标头,则使用它。 如果提供了自定义,则不使用它。mqtt_retainedconverter
8 要计算以确定保留的布尔值的表达式。 默认值为 。headers[mqtt_retained]
9 将消息发送到的默认主题(如果未找到标头,则使用)。mqtt_topic
10 要计算以确定目标主题的表达式。 默认值为 。headers['mqtt_topic']
11 当 时,调用方不会阻止。 相反,它会在发送消息时等待传递确认。 默认值为 (发送块,直到确认传递)。truefalse
12 当 和 同时是 时,将发出 an (参见事件)。 它包含消息、主题、客户端库生成的消息、 和 (每次连接客户端时递增)。 当客户端库确认传递时,将发出 an。 它包含 、 和 ,使投放能够与 相关联。 任何事件入站通道适配器都可以接收这些事件。 请注意,可以在 . 默认值为 。asyncasync-eventstrueMqttMessageSentEventmessageIdclientIdclientInstanceMqttMessageDeliveredEventmessageIdclientIdclientInstancesend()ApplicationListenerMqttMessageDeliveredEventMqttMessageSentEventfalse
从版本 4.1 开始,可以省略 URL。 相反,可以在 的属性中提供服务器 URI。 例如,这样可以连接到高可用性 (HA) 群集。serverURIsDefaultMqttPahoClientFactory

事件

某些应用程序事件由适配器发布。Spring中文文档

  • MqttConnectionFailedEvent- 如果我们连接失败或随后连接丢失,则由两个适配器发布。 对于 MQTT v5 Paho 客户端,当服务器执行正常断开连接时,也会发出此事件,在这种情况下,丢失的连接为 。causenullSpring中文文档

  • MqttMessageSentEvent- 如果在异步模式下运行,则在发送消息时由出站适配器发布。Spring中文文档

  • MqttMessageDeliveredEvent- 如果在异步模式下运行,则在客户端指示消息已传递时由出站适配器发布。Spring中文文档

  • MqttSubscribedEvent- 由入站适配器在订阅主题后发布。Spring中文文档

这些事件可以由方法接收,也可以由方法接收。ApplicationListener<MqttIntegrationEvent>@EventListenerSpring中文文档

要确定事件的来源,请使用以下命令;您可以检查 Bean 名称和/或连接选项(以访问服务器 URI 等)。Spring中文文档

MqttPahoComponent source = event.getSourceAsType();
String beanName = source.getBeanName();
MqttConnectOptions options = source.getConnectionInfo();

MQTT v5 支持

从版本 5.5.5 开始,该模块为 MQTT v5 协议提供通道适配器实现。 是一个依赖项,因此必须显式包含在目标项目中。spring-integration-mqttorg.eclipse.paho:org.eclipse.paho.mqttv5.clientoptionalSpring中文文档

由于 MQTT v5 协议支持 MQTT 消息中的额外任意属性,因此引入了该实现,以便在发布和接收操作时映射到/从标头映射。 默认情况下,(通过模式)它映射所有接收到的帧属性(包括用户属性)。 在出站端,它映射了帧的标头子集:、。MqttHeaderMapper*PUBLISHPUBLISHcontentTypemqtt_messageExpiryIntervalmqtt_responseTopicmqtt_correlationDataSpring中文文档

MQTT v5 协议的出站通道适配器以 . 它需要一个 MQTT 代理 URL 或引用。 它支持一个选项,在这种情况下可以并且可以发出对象(请参阅选项)。 如果请求消息有效负载是 ,则它将通过内部 按原样发布。 如果有效负载是,则按原样使用目标有效负载以发布。 如果有效负载为 a,则将其转换为发布。 其余的用例被委托给 provided,它是应用程序上下文中的 bean。 注意:当请求的消息有效负载已经是 时,不会使用 provided。 以下 Java DSL 配置示例演示了如何在集成流中使用此通道适配器:Mqttv5PahoMessageHandlerclientIdMqttConnectionOptionsMqttClientPersistenceasyncMqttIntegrationEventasyncEventsorg.eclipse.paho.mqttv5.common.MqttMessageIMqttAsyncClientbyte[]MqttMessageStringbyte[]MessageConverterIntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAMEConfigurableCompositeMessageConverterHeaderMapper<MqttProperties>MqttMessageSpring中文文档

@Bean
public IntegrationFlow mqttOutFlow() {
    Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(MQTT_URL, "mqttv5SIout");
    MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper();
    mqttHeaderMapper.setOutboundHeaderNames("some_user_header", MessageHeaders.CONTENT_TYPE);
    messageHandler.setHeaderMapper(mqttHeaderMapper);
    messageHandler.setAsync(true);
    messageHandler.setAsyncEvents(true);
    messageHandler.setConverter(mqttStringToBytesConverter());

    return f -> f.handle(messageHandler);
}
不能与 一起使用,因为它的合约仅针对 MQTT v3 协议。org.springframework.integration.mqtt.support.MqttMessageConverterMqttv5PahoMessageHandler

如果连接在启动或运行时失败,则会尝试在生成到此处理程序的下一条消息时重新连接。 如果此手动重新连接失败,则连接异常将返回给调用方。 在这种情况下,将应用标准的Spring Integration错误处理过程,包括请求处理程序建议,例如重试或断路器。Mqttv5PahoMessageHandlerSpring中文文档

在 javadocs 及其超类中查看更多信息。Mqttv5PahoMessageHandlerSpring中文文档

MQTT v5 协议的入站通道适配器以 . 它需要一个 MQTT 代理 URL 或引用,以及要订阅和使用的主题。 它支持一个选项,该选项默认位于内存中。 可以配置预期的(默认情况下),并将其传播到提供的转换中。 如果设置了该选项,则会将标头添加到消息中以作为 的实例生成。 用于将帧属性(包括用户属性)映射到目标消息标头中。 标准属性(如 、 、 、 、 和 接收主题)始终映射到标头。 有关详细信息,请参阅。Mqttv5PahoMessageDrivenChannelAdapterclientIdMqttConnectionOptionsMqttClientPersistencepayloadTypebyte[]SmartMessageConverterbyte[]MqttMessagemanualAckIntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACKSimpleAcknowledgmentHeaderMapper<MqttProperties>PUBLISHMqttMessageqosiddupretainedMqttHeadersSpring中文文档

从版本 6.3 开始,提供基于 for 细粒度配置的构造函数,而不是纯主题名称。 提供这些订阅时,不能使用通道适配器的选项,因为这种模式是 API 的一部分。Mqttv5PahoMessageDrivenChannelAdapterMqttSubscriptionqosqosMqttSubscriptionSpring中文文档

以下 Java DSL 配置示例演示了如何在集成流中使用此通道适配器:Spring中文文档

@Bean
public IntegrationFlow mqttInFlow() {
    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(MQTT_URL, "mqttv5SIin", "siTest");
    messageProducer.setPayloadType(String.class);
    messageProducer.setMessageConverter(mqttStringToBytesConverter());
    messageProducer.setManualAcks(true);

    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}
不能与 一起使用,因为它的合约仅针对 MQTT v3 协议。org.springframework.integration.mqtt.support.MqttMessageConverterMqttv5PahoMessageDrivenChannelAdapter

在 javadocs 及其超类中查看更多信息。Mqttv5PahoMessageDrivenChannelAdapterSpring中文文档

建议将 set 设置为 true,以允许内部实例处理重新连接。 否则,只有手动重启才能处理重新连接,例如通过断开连接处理。MqttConnectionOptions#setAutomaticReconnect(boolean)IMqttAsyncClientMqttv5PahoMessageDrivenChannelAdapterMqttConnectionFailedEvent
不能与 一起使用,因为它的合约仅针对 MQTT v3 协议。org.springframework.integration.mqtt.support.MqttMessageConverterMqttv5PahoMessageHandler
不能与 一起使用,因为它的合约仅针对 MQTT v3 协议。org.springframework.integration.mqtt.support.MqttMessageConverterMqttv5PahoMessageDrivenChannelAdapter
建议将 set 设置为 true,以允许内部实例处理重新连接。 否则,只有手动重启才能处理重新连接,例如通过断开连接处理。MqttConnectionOptions#setAutomaticReconnect(boolean)IMqttAsyncClientMqttv5PahoMessageDrivenChannelAdapterMqttConnectionFailedEvent

共享 MQTT 客户端支持

如果多个集成需要单个 MQTT ClientID,则无法使用多个 MQTT 客户端实例,因为 MQTT 代理可能对每个 ClientID 的连接数有限制(通常允许单个连接)。 为了将单个客户端重用于不同的通道适配器,可以使用一个组件并将其传递给所需的任何通道适配器。 它将管理 MQTT 连接生命周期,并在需要时自动重新连接。 此外,还可以向客户端管理器提供自定义连接选项,就像当前可以为通道适配器组件所做的那样。org.springframework.integration.mqtt.core.ClientManagerMqttClientPersistenceSpring中文文档

请注意,MQTT v5 和 v3 通道适配器都受支持。Spring中文文档

以下 Java DSL 配置样本演示了如何在集成流中使用此客户机管理器:Spring中文文档

@Bean
public ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager() {
    MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
    connectionOptions.setServerURIs(new String[]{ "tcp://localhost:1883" });
    connectionOptions.setConnectionTimeout(30000);
    connectionOptions.setMaxReconnectDelay(1000);
    connectionOptions.setAutomaticReconnect(true);
    Mqttv5ClientManager clientManager = new Mqttv5ClientManager(connectionOptions, "client-manager-client-id-v5");
    clientManager.setPersistence(new MqttDefaultFilePersistence());
    return clientManager;
}

@Bean
public IntegrationFlow mqttInFlowTopic1(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic1");
    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}

@Bean
public IntegrationFlow mqttInFlowTopic2(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic2");
    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}

@Bean
public IntegrationFlow mqttOutFlow(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    return f -> f.handle(new Mqttv5PahoMessageHandler(clientManager));
}