MQTT 支持
MQTT 支持
Spring 集成提供入站和出站通道适配器以支持消息队列遥测传输 (MQTT) 协议。
您需要将此依赖项包含在您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>6.0.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-mqtt:6.0.9"
当前实现使用 Eclipse Paho MQTT 客户端库。
XML 配置和本章的大部分内容都是关于 MQTT v3.1 协议支持和相应的 Paho 客户端的。 有关相应的协议支持,请参阅 MQTT v5 支持段落。 |
两个适配器的配置都是使用 实现的。
有关配置选项的更多信息,请参阅 Paho 文档。DefaultMqttPahoClientFactory
我们建议配置一个对象并将其注入工厂,而不是在工厂本身上设置 (已弃用的) 选项。MqttConnectOptions |
入站 (消息驱动) 通道适配器
入站通道适配器由 实现。
为方便起见,您可以使用 namespace 对其进行配置。
最小配置可能如下所示:MqttPahoMessageDrivenChannelAdapter
<bean id="clientFactory"
class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
<property name="connectionOptions">
<bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
<property name="userName" value="${mqtt.username}"/>
<property name="password" value="${mqtt.password}"/>
</bean>
</property>
</bean>
<int-mqtt:message-driven-channel-adapter id="mqttInbound"
client-id="${mqtt.default.client.id}.src"
url="${mqtt.url}"
topics="sometopic"
client-factory="clientFactory"
channel="output"/>
以下清单显示了可用的属性:
<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
client-id="foo" (1)
url="tcp://localhost:1883" (2)
topics="bar,baz" (3)
qos="1,2" (4)
converter="myConverter" (5)
client-factory="clientFactory" (6)
send-timeout="123" (7)
error-channel="errors" (8)
recovery-interval="10000" (9)
manual-acks="false" (10)
channel="out" />
1 | 客户端 ID。 |
2 | 代理 URL。 |
3 | 此适配器从中接收消息的主题的逗号分隔列表。 |
4 | 以逗号分隔的 QoS 值列表。 它可以是应用于所有主题的单个值,也可以是每个主题的值(在这种情况下,列表的长度必须相同)。 |
5 | 一个 (可选)。
默认情况下,默认生成一条带有以下标头的有效负载的消息:MqttMessageConverter DefaultPahoMessageConverter String
|
6 | 客户端工厂。 |
7 | 超时。
仅当通道可能阻塞(例如当前已满的有界)时,它才适用。send() QueueChannel |
8 | 错误通道。
下游异常将发送到此通道(如果提供),以 .
有效负载是包含失败消息和原因的 a。ErrorMessage MessagingException |
9 | 恢复间隔。
它控制适配器在发生故障后尝试重新连接的时间间隔。
默认为 (10 秒)。10000ms |
10 | 确认模式;设置为 true 以手动确认。 |
从版本 4.1 开始,您可以省略 URL。
相反,您可以在 .
例如,这样做可以连接到高可用性 (HA) 集群。serverURIs DefaultMqttPahoClientFactory |
从版本 4.2.2 开始,当适配器成功订阅主题时,将发布 an。 当连接或订阅失败时,将发布事件。
这些事件可以由实现 .MqttSubscribedEvent
MqttConnectionFailedEvent
ApplicationListener
此外,一个名为 的新属性控制适配器在失败后尝试重新连接的时间间隔。
默认为 (10 秒)。recoveryInterval
10000ms
在版本 4.2.3 之前,当适配器停止时,客户端始终取消订阅。
这是不正确的,因为如果客户端 QOS 大于 0,我们需要保持订阅处于活动状态,以便消息到达
当适配器停止时,将在下次启动时交付。
这还需要将客户端工厂上的属性设置为 。
它默认为 . 从版本 4.2.3 开始,如果属性为 . 可以通过在工厂中设置属性来覆盖此行为。
它可以具有值: , , 和 .
后者(默认)仅在属性为 时取消订阅。 要恢复到 4.2.3 之前的行为,请使用 . |
从版本 5.0 开始, , 和 属性映射到 headers (, , 和 ),以避免无意中传播到(默认情况下)使用 , 和 headers 的出站消息。 |
在运行时添加和删除主题
从版本 4.1 开始,您可以通过编程方式更改适配器订阅的主题。
Spring Integration 提供了 and 方法。
添加主题时,您可以选择指定 (default: 1)。
您还可以通过向具有适当负载的 发送适当的消息来修改主题 - 例如:。addTopic()
removeTopic()
QoS
<control-bus/>
"myMqttAdapter.addTopic('foo', 1)"
停止和启动适配器对主题列表没有影响(它不会恢复到配置中的原始设置)。 这些更改不会在应用程序上下文的生命周期之外保留。 新的应用程序上下文将恢复为配置的设置。
在适配器停止(或与代理断开连接)时更改主题将在下次建立连接时生效。
手动 Acks
从版本 5.3 开始,您可以将该属性设置为 true。
通常用于异步确认投放。
设置为 时,header () 将添加到消息中,其值为 .
您必须调用该方法才能完成投放。
有关更多信息,请参阅 Javadocs。
为方便起见,提供了 header 访问器:manualAcks
true
IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
SimpleAcknowledgment
acknowledge()
IMqttClient
setManualAcks()
messageArrivedComplete()
StaticMessageHeaderAccessor.acknowledgment(someMessage).acknowledge();
从 version 开始,当消息转换器引发异常或从转换中返回时,它会将 an 发送到 , 如果提供。
将此转换错误重新引发到 MQTT 客户端回调中。5.2.11
null
MqttMessage
MqttPahoMessageDrivenChannelAdapter
ErrorMessage
errorChannel
使用 Java 配置进行配置
Spring 下面的 Boot 应用程序显示了如何使用 Java 配置配置入站适配器的示例:
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient",
"topic1", "topic2");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序提供了使用 Java DSL 配置入站适配器的示例:
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow mqttInbound() {
return IntegrationFlow.from(
new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883",
"testClient", "topic1", "topic2");)
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}
出站通道适配器
出站通道适配器由 实现,它包装在 .
为方便起见,您可以使用 namespace 对其进行配置。MqttPahoMessageHandler
ConsumerEndpoint
从版本 4.1 开始,适配器支持异步发送操作,避免在确认投放之前阻塞。 如果需要,您可以发出应用程序事件以使应用程序能够确认交付。
下面的清单显示了可用于出站通道适配器的属性:
<int-mqtt:outbound-channel-adapter id="withConverter"
client-id="foo" (1)
url="tcp://localhost:1883" (2)
converter="myConverter" (3)
client-factory="clientFactory" (4)
default-qos="1" (5)
qos-expression="" (6)
default-retained="true" (7)
retained-expression="" (8)
default-topic="bar" (9)
topic-expression="" (10)
async="false" (11)
async-events="false" (12)
channel="target" />
1 | 客户端 ID。 |
2 | 代理 URL。 |
3 | 一个 (可选)。
默认值识别以下标头:MqttMessageConverter DefaultPahoMessageConverter
|
4 | 客户端工厂。 |
5 | 默认服务质量。
如果未找到标头或返回 .
如果您提供自定义 .mqtt_qos qos-expression null converter |
6 | 用于计算以确定 qos 的表达式。
默认值为 .headers[mqtt_qos] |
7 | 保留标志的默认值。
如果未找到标头,则使用它。
如果提供了自定义,则不会使用它。mqtt_retained converter |
8 | 用于计算以确定保留布尔值的表达式。
默认值为 .headers[mqtt_retained] |
9 | 将消息发送到的默认主题(如果未找到标头,则使用)。mqtt_topic |
10 | 用于确定目标主题的 VALUE 表达式。
默认值为 .headers['mqtt_topic'] |
11 | 当 时,调用方不会阻止。
相反,它会在发送消息时等待送达确认。
默认值为 (send blocks until delivery is confirm)。true false |
12 | 当 和 都是 时,会发出 an(请参阅 事件 )。
它包含消息、主题、客户端库生成的 、 和 (每次连接客户端时递增)。
当客户端库确认投放时,将发出 an。
它包含 、 和 ,使投放能够与 关联 。
任何事件入站通道适配器都可以接收这些事件。
请注意,可以在 .
默认值为 .async async-events true MqttMessageSentEvent messageId clientId clientInstance MqttMessageDeliveredEvent messageId clientId clientInstance send() ApplicationListener MqttMessageDeliveredEvent MqttMessageSentEvent false |
从版本 4.1 开始,可以省略 URL。
相反,可以在 .
例如,这将启用与高可用性 (HA) 集群的连接。serverURIs DefaultMqttPahoClientFactory |
使用 Java 配置进行配置
以下 Spring Boot 应用程序显示了如何使用 Java 配置配置出站适配器的示例:
@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToMqtt("foo");
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
options.setUserName("username");
options.setPassword("password".toCharArray());
factory.setConnectionOptions(options);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("testClient", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("testTopic");
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MyGateway {
void sendToMqtt(String data);
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序提供了使用 Java DSL 配置出站适配器的示例:
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow mqttOutboundFlow() {
return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
}
}
事件
某些应用程序事件由适配器发布。
-
MqttConnectionFailedEvent
- 如果我们连接失败或随后连接丢失,则由两个适配器发布。 对于 MQTT v5 Paho 客户端,当服务器执行正常断开连接时,也会发出此事件,在这种情况下,丢失的连接为 。cause
null
-
MqttMessageSentEvent
- 如果以异步模式运行,则在发送消息时由出站适配器发布。 -
MqttMessageDeliveredEvent
- 如果以异步模式运行,则当客户端指示已传送消息时,由出站适配器发布。 -
MqttSubscribedEvent
- 由入站适配器在订阅主题后发布。
这些事件可以通过 或 方法接收。ApplicationListener<MqttIntegrationEvent>
@EventListener
要确定事件的来源,请使用以下方法;您可以检查 bean 名称和/或 connect 选项(以访问服务器 URI 等)。
MqttPahoComponent source = event.getSourceAsType();
String beanName = source.getBeanName();
MqttConnectOptions options = source.getConnectionInfo();
MQTT v5 支持
从版本 5.5.5 开始,该模块为 MQTT v5 协议提供了通道适配器实现。
它是一个依赖项,因此必须显式包含在目标项目中。spring-integration-mqtt
org.eclipse.paho:org.eclipse.paho.mqttv5.client
optional
由于 MQTT v5 协议支持 MQTT 消息中的额外任意属性,因此引入了在发布和接收操作时映射到/from 标头的实现。
默认情况下,它 (通过 pattern) 映射所有接收到的帧属性 (包括用户属性)。
在出站端,它为帧映射以下标头子集:、、、 .MqttHeaderMapper
*
PUBLISH
PUBLISH
contentType
mqtt_messageExpiryInterval
mqtt_responseTopic
mqtt_correlationData
MQTT v5 协议的出站通道适配器显示为 .
它需要 MQTT 代理 URL 或引用。
它支持一个选项,在这种情况下可以和可以发出对象(参见 option)。
如果请求消息有效负载是 ,则会通过内部 按原样发布。
如果有效负载是,则按原样使用,以便发布目标有效负载。
如果有效负载是 a,则会将其转换为 publish.
其余的用例被委托给提供的,它是来自应用程序上下文的 bean。
注意:当请求的消息负载已经是 .
以下 Java DSL 配置示例演示了如何在集成流中使用此通道适配器:Mqttv5PahoMessageHandler
clientId
MqttConnectionOptions
MqttClientPersistence
async
MqttIntegrationEvent
asyncEvents
org.eclipse.paho.mqttv5.common.MqttMessage
IMqttAsyncClient
byte[]
MqttMessage
String
byte[]
MessageConverter
IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME
ConfigurableCompositeMessageConverter
HeaderMapper<MqttProperties>
MqttMessage
@Bean
public IntegrationFlow mqttOutFlow() {
Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(MQTT_URL, "mqttv5SIout");
MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper();
mqttHeaderMapper.setOutboundHeaderNames("some_user_header", MessageHeaders.CONTENT_TYPE);
messageHandler.setHeaderMapper(mqttHeaderMapper);
messageHandler.setAsync(true);
messageHandler.setAsyncEvents(true);
messageHandler.setConverter(mqttStringToBytesConverter());
return f -> f.handle(messageHandler);
}
不能与 一起使用,因为它的合约仅针对 MQTT v3 协议。org.springframework.integration.mqtt.support.MqttMessageConverter Mqttv5PahoMessageHandler |
如果连接在启动时或运行时失败,则 将尝试在向此处理程序生成的下一条消息上重新连接。
如果此手动重新连接失败,则连接异常将抛回到调用方。
在这种情况下,应用标准的 Spring 集成错误处理过程,包括请求处理程序建议,例如重试或断路器。Mqttv5PahoMessageHandler
请参阅 javadocs 及其 superclass 中的更多信息。Mqttv5PahoMessageHandler
MQTT v5 协议的入站通道适配器显示为 .
它需要 MQTT 代理 URL 或引用,以及要订阅和使用的主题。
它支持一个选项,默认情况下为 in-memory。
预期(默认情况下)可以配置,并将其传播到收到的 提供的转换 。
如果设置了该选项,则会向消息中添加一个标头,以生成 的实例。
用于将框架属性(包括用户属性)映射到目标消息标头中。
标准属性(如 、 以及接收的主题)始终映射到标头。
有关更多信息,请参阅。Mqttv5PahoMessageDrivenChannelAdapter
clientId
MqttConnectionOptions
MqttClientPersistence
payloadType
byte[]
SmartMessageConverter
byte[]
MqttMessage
manualAck
IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
SimpleAcknowledgment
HeaderMapper<MqttProperties>
PUBLISH
MqttMessage
qos
id
dup
retained
MqttHeaders
以下 Java DSL 配置示例演示了如何在集成流中使用此通道适配器:
@Bean
public IntegrationFlow mqttInFlow() {
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(MQTT_URL, "mqttv5SIin", "siTest");
messageProducer.setPayloadType(String.class);
messageProducer.setMessageConverter(mqttStringToBytesConverter());
messageProducer.setManualAcks(true);
return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}
不能与 一起使用,因为它的合约仅针对 MQTT v3 协议。org.springframework.integration.mqtt.support.MqttMessageConverter Mqttv5PahoMessageDrivenChannelAdapter |
请参阅 javadocs 及其 superclass 中的更多信息。Mqttv5PahoMessageDrivenChannelAdapter
建议将 true 设置为 true,以允许内部实例处理重新连接。
否则,只有 的手动重启才能处理重新连接,例如通过断开连接时的处理。MqttConnectionOptions#setAutomaticReconnect(boolean) IMqttAsyncClient Mqttv5PahoMessageDrivenChannelAdapter MqttConnectionFailedEvent |
共享 MQTT 客户端支持
如果多个集成需要一个 MQTT ClientID,则不能使用多个 MQTT 客户端实例,因为 MQTT 代理可能对每个 ClientID 的连接数量有限制(通常允许单个连接)。
为了将单个 Client 端重新用于不同的 channel adapters,可以使用组件并将其传递给所需的任何 channel adapter。
它将管理 MQTT 连接生命周期,并在需要时自动重新连接。
此外,还可以将自定义连接选项 提供给客户端管理器,就像当前对通道适配器组件所做的那样。org.springframework.integration.mqtt.core.ClientManager
MqttClientPersistence
请注意,MQTT v5 和 v3 通道适配器均受支持。
以下 Java DSL 配置示例演示了如何在集成流中使用此客户端管理器:
@Bean
public ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager() {
MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
connectionOptions.setServerURIs(new String[]{ "tcp://localhost:1883" });
connectionOptions.setConnectionTimeout(30000);
connectionOptions.setMaxReconnectDelay(1000);
connectionOptions.setAutomaticReconnect(true);
Mqttv5ClientManager clientManager = new Mqttv5ClientManager(connectionOptions, "client-manager-client-id-v5");
clientManager.setPersistence(new MqttDefaultFilePersistence());
return clientManager;
}
@Bean
public IntegrationFlow mqttInFlowTopic1(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic1");
return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}
@Bean
public IntegrationFlow mqttInFlowTopic2(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic2");
return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}
@Bean
public IntegrationFlow mqttOutFlow(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
return f -> f.handle(new Mqttv5PahoMessageHandler(clientManager));
}