此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.4.0! |
ZeroMQ 支持
Spring 集成提供了支持应用程序中 ZeroMQ 通信的组件。 该实现基于 JeroMQ 库的 Java API。 所有组件都封装了 ZeroMQ 套接字生命周期,并在内部为它们管理线程,使与这些组件的交互无锁且线程安全。
您需要将此依赖项包含在您的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-zeromq</artifactId>
<version>6.4.1-SNAPSHOT</version>
</dependency>
compile "org.springframework.integration:spring-integration-zeromq:6.4.1-SNAPSHOT"
ZeroMQ 代理
这是内置函数的 Spring 友好包装器。
它封装了套接字生命周期和线程管理。
此代理的客户端仍然可以使用标准的 ZeroMQ 套接字连接和交互 API。
除了标准之外,它还需要一种著名的 ZeroMQ 代理模式:SUB/PUB、PULL/PUSH 或 ROUTER/DEALER。
这样,一对适当的 ZeroMQ 套接字类型被用于代理的前端和后端。
有关详细信息,请参阅。ZeroMqProxy
ZMQ.proxy()
ZContext
ZeroMqProxy.Type
用于创建、绑定和配置套接字以及从专用线程启动(如果有)的实现。
前端和后端套接字的绑定是通过协议在具有提供端口的所有可用网络接口上完成的。
否则,它们将绑定到 random 端口,稍后可以通过相应的 和 API 方法获取这些端口。ZeroMqProxy
SmartLifecycle
ZMQ.proxy()
Executor
tcp://
getFrontendPort()
getBackendPort()
控制套接字在地址上公开为具有线程间传输的 a;它可以通过以下方式获得。
它应该与来自另一个套接字的同一应用程序一起使用,以发送和/或命令。
当为其生命周期调用时,它会执行命令以终止循环并正常关闭所有绑定的套接字。SocketType.PAIR
"inproc://" + beanName + ".control"
getControlAddress()
SocketType.PAIR
ZMQ.PROXY_TERMINATE
ZMQ.PROXY_PAUSE
ZMQ.PROXY_RESUME
ZeroMqProxy
ZMQ.PROXY_TERMINATE
stop()
ZMQ.proxy()
该选项使此组件绑定一个额外的线程间套接字,以捕获和发布前端和后端套接字之间的所有通信,就像它在实现中所说的那样。
此套接字绑定到地址,不需要任何特定的订阅进行筛选。setExposeCaptureSocket(boolean)
SocketType.PUB
ZMQ.proxy()
"inproc://" + beanName + ".capture"
前端和后端套接字可以使用其他属性进行自定义,例如读/写超时或安全性。
此自定义分别通过 和 callbacks 提供。setFrontendSocketConfigurer(Consumer<ZMQ.Socket>)
setBackendSocketConfigurer(Consumer<ZMQ.Socket>)
可以像这样提供简单的 bean:ZeroMqProxy
@Bean
ZeroMqProxy zeroMqProxy() {
ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
proxy.setExposeCaptureSocket(true);
proxy.setFrontendPort(6001);
proxy.setBackendPort(6002);
return proxy;
}
所有客户端节点都应通过以下方式连接到此代理的主机,并使用他们感兴趣的相应端口。tcp://
ZeroMQ 消息频道
该 a 使用一对 ZeroMQ 套接字连接发布者和订阅者以进行消息传递交互。
它可以在 PUB/SUB 模式下工作(默认为 PUSH/PULL);它也可以用作本地线程间通道(使用 sockets)——在这种情况下不提供。
在分布式模式下,它必须连接到外部管理的 ZeroMQ 代理,在那里它可以与连接到同一代理的其他类似通道交换消息。
connection url 选项是一个标准的 ZeroMQ 连接字符串,其中包含协议和主机,以及用于 ZeroMQ 代理的前端和后端套接字的一对冒号端口。
为方便起见,如果通道在与代理相同的应用程序中配置,则可以为通道提供实例而不是连接字符串。ZeroMqChannel
SubscribableChannel
PAIR
connectUrl
ZeroMqProxy
发送和接收套接字都在各自的专用线程中进行管理,使该通道对并发友好。
这样,我们可以在不同步的情况下从不同的线程发布和使用。ZeroMqChannel
默认情况下,使用 an 使用 Jackson JSON 处理器从 / to 序列化(包括标头)。
此逻辑可通过 进行配置。ZeroMqChannel
EmbeddedJsonHeadersMessageMapper
Message
byte[]
setMessageMapper(BytesMessageMapper)
发送和接收套接字可以通过相应的回调为任何选项(读/写超时、安全性等)进行自定义。setSendSocketConfigurer(Consumer<ZMQ.Socket>)
setSubscribeSocketConfigurer(Consumer<ZMQ.Socket>)
的内部逻辑基于通过 Project Reactor 和运算符的反应式流。
这提供了更轻松的线程控制,并允许无锁的并发发布和对通道的使用。
本地 PUB/SUB 逻辑作为运算符实现,以允许此通道的所有本地订户接收与套接字的分布式订户相同的已发布消息。ZeroMqChannel
Flux
Mono
Flux.publish()
PUB
以下是配置的简单示例:ZeroMqChannel
@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context) {
ZeroMqChannel channel = new ZeroMqChannel(context, true);
channel.setConnectUrl("tcp://localhost:6001:6002");
channel.setConsumeDelay(Duration.ofMillis(100));
return channel;
}
ZeroMQ 入站通道适配器
这是一个具有反应式语义的实现。
它以非阻塞的方式不断从 ZeroMQ 套接字读取数据,并将消息发布到一个无限,如果输出通道不是反应性的,则将其发布到 a 订阅的无限或方法中显式订阅。
当套接字上未收到任何数据时,将在下一次读取尝试之前应用 a (默认为 1 秒)。ZeroMqMessageProducer
MessageProducerSupport
Flux
FluxMessageChannel
start()
consumeDelay
只有 ,受 和 的支持。
此组件可以连接到远程套接字或使用提供的或随机端口绑定到 TCP 协议。
实际的端口可以通过启动该组件并绑定 ZeroMQ 套接字后获取。
套接字选项(例如 security 或 write timeout)可以通过回调进行配置。SocketType.PAIR
SocketType.PULL
SocketType.SUB
ZeroMqMessageProducer
getBoundPort()
setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer)
如果选项设置为 , 则从套接字消耗的 ,将按原样在生成的 : 的有效负载中发送,则由下游流来解析和转换 .
否则,将使用 an 将消耗的数据转换为 .
如果收到的是多帧,则第一帧被视为此 ZeroMQ 消息发布到的 Header。receiveRaw
true
ZMsg
Message
ZMsg
InboundMessageMapper
Message
ZMsg
ZeroMqHeaders.TOPIC
如果该选项设置为 ,则传入消息被视为由两个帧组成:主题和 ZeroMQ 消息。
否则,默认情况下, the 被视为由三个帧组成:第一个帧包含主题,最后一个帧包含消息,中间有一个空帧。unwrapTopic
false
ZMsg
使用 ,将使用为订阅提供的选项;默认为 subscribe to all。
可以在运行时使用 和 s 调整订阅。SocketType.SUB
ZeroMqMessageProducer
topics
subscribeToTopics()
unsubscribeFromTopics()
@ManagedOperation
下面是一个配置示例:ZeroMqMessageProducer
@Bean
ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) {
ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB);
messageProducer.setOutputChannel(outputChannel);
messageProducer.setTopics("some");
messageProducer.setReceiveRaw(true);
messageProducer.setBindPort(7070);
messageProducer.setConsumeDelay(Duration.ofMillis(100));
return messageProducer;
}
ZeroMQ 出站通道适配器
这是一种将发布消息生成到 ZeroMQ 套接字中的实现。
仅 和 受支持。
此组件可以连接到远程套接字或使用提供的或随机端口绑定到 TCP 协议。
实际的端口可以通过启动该组件并绑定 ZeroMQ 套接字后获取。ZeroMqMessageHandler
ReactiveMessageHandler
SocketType.PAIR
SocketType.PUSH
SocketType.PUB
getBoundPort()
使用 the 时,将根据请求消息进行评估,以将主题帧注入 ZeroMQ 消息(如果它不为 null)。
订阅者端 () 必须先收到主题帧,然后才能解析实际数据。SocketType.PUB
topicExpression
SocketType.SUB
如果该选项设置为 ,则 ZeroMQ 消息帧将在注入的主题(如果存在)之后发送。
默认情况下,在 topic 和消息之间会发送一个额外的空帧。wrapTopic
false
当请求消息的有效负载为 a 时,不会执行任何转换或主题提取:该 将按原样发送到套接字中,并且不会销毁以供进一步重用。
否则,an 用于将请求消息(或仅其有效负载)转换为 ZeroMQ 帧进行发布。
默认情况下,使用 a 与 .
套接字选项(例如 security 或 write timeout)可以通过回调进行配置。ZMsg
ZMsg
OutboundMessageMapper<byte[]>
ConvertingBytesMessageMapper
ConfigurableCompositeMessageConverter
setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer)
以下是连接到 socket 的配置示例:ZeroMqMessageHandler
@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
ZeroMqMessageHandler messageHandler =
new ZeroMqMessageHandler(context, "tcp://localhost:6060", SocketType.PUB);
messageHandler.setTopicExpression(
new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}
以下是绑定到提供的端口的配置示例:ZeroMqMessageHandler
@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
ZeroMqMessageHandler messageHandler =
new ZeroMqMessageHandler(context, 7070, SocketType.PUB);
messageHandler.setTopicExpression(
new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}
ZeroMQ Java DSL 支持
它们通过工厂为上述组件提供了方便的 Java DSL Fluent API。spring-integration-zeromq
ZeroMq
IntegrationComponentSpec
以下是 Java DSL 的一个示例:ZeroMqChannel
.channel(ZeroMq.zeroMqChannel(this.context)
.connectUrl("tcp://localhost:6001:6002")
.consumeDelay(Duration.ofMillis(100)))
}
ZeroMQ Java DSL 的入站通道适配器是:
IntegrationFlow.from(
ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)
.connectUrl("tcp://localhost:9000")
.topics("someTopic")
.receiveRaw(true)
.consumeDelay(Duration.ofMillis(100)))
}
ZeroMQ Java DSL 的出站通道适配器是:
.handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://localhost:9001", SocketType.PUB)
.topicFunction(message -> message.getHeaders().get("myTopic")))
}