此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.4.3! |
动态和运行时集成流
IntegrationFlow
并且其所有依赖组件都可以在运行时注册。
在 5.0 版本之前,我们使用了BeanFactory.registerSingleton()
钩。
从 Spring 框架开始5.0
,我们使用instanceSupplier
程序化的钩子BeanDefinition
注册。
下面的示例展示了如何以编程方式注册一个 Bean:
BeanDefinition beanDefinition =
BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
.getRawBeanDefinition();
((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);
请注意,在前面的示例中,instanceSupplier
hook 是genericBeanDefinition
方法,在本例中由 Lambda 提供。
所有必要的 bean 初始化和生命周期都是自动完成的,就像使用标准上下文配置 bean 定义一样。
为了简化开发体验,Spring Integration 引入了IntegrationFlowContext
注册和管理IntegrationFlow
实例,如下例所示:
@Autowired
private AbstractServerConnectionFactory server1;
@Autowired
private IntegrationFlowContext flowContext;
...
@Test
public void testTcpGateways() {
TestingUtilities.waitListening(this.server1, null);
IntegrationFlow flow = f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client1"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}
当我们有多个配置选项并且必须创建类似流的多个实例时,这非常有用。
为此,我们可以迭代我们的选项并创建和注册IntegrationFlow
实例。
另一种变体是当我们的数据源不是基于 Spring 的,因此我们必须动态创建它。
这样的示例是 Reactive Streams 事件源,如下例所示:
Flux<Message<?>> messageFlux =
Flux.just("1,2,3,4")
.map(v -> v.split(","))
.flatMapIterable(Arrays::asList)
.map(Integer::parseInt)
.map(GenericMessage<Integer>::new);
QueueChannel resultChannel = new QueueChannel();
IntegrationFlow integrationFlow =
IntegrationFlow.from(messageFlux)
.<Integer, Integer>transform(p -> p * 2)
.channel(resultChannel)
.get();
this.integrationFlowContext.registration(integrationFlow)
.register();
这IntegrationFlowRegistrationBuilder
(由于IntegrationFlowContext.registration()
) 可用于指定IntegrationFlow
注册,控制其autoStartup
,并注册非 Spring 集成 bean。
通常,这些额外的 bean 是连接工厂(AMQP、JMS、(S)FTP、TCP/UDP 等)、序列化器和反序列化器或任何其他必需的支持组件。
您可以使用IntegrationFlowRegistration.destroy()
callback 删除动态注册的IntegrationFlow
以及它的所有依赖的 bean。
请参阅IntegrationFlowContext
Javadoc了解更多信息。
从版本 5.0.6 开始,所有生成的 bean 名称都位于IntegrationFlow definition 的前缀为流 ID 作为前缀。
我们建议始终指定显式流 ID。
否则,将在IntegrationFlowContext 生成 Bean 名称,以便为IntegrationFlow 并注册其 bean。
我们在这两个作上进行同步,以避免当相同的生成的 bean 名称可能用于不同的 bean 时出现竞争条件IntegrationFlow 实例。 |
此外,从版本 5.0.6 开始,注册构建器 API 有一个新方法:useFlowIdAsPrefix()
.
如果您希望声明同一流的多个实例,并在流中的组件具有相同的 ID 时避免 bean 名称冲突,这将非常有用,如下例所示:
private void registerFlows() {
IntegrationFlowRegistration flow1 =
this.flowContext.registration(buildFlow(1234))
.id("tcp1")
.useFlowIdAsPrefix()
.register();
IntegrationFlowRegistration flow2 =
this.flowContext.registration(buildFlow(1235))
.id("tcp2")
.useFlowIdAsPrefix()
.register();
}
private IntegrationFlow buildFlow(int port) {
return f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
}
在这种情况下,可以使用名称为tcp1.client.handler
.
一id 属性是必需的useFlowIdAsPrefix() . |