此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-function 4.1.4! |
Spring 集成交互
Spring Integration Framework 扩展了 Spring 编程模型以支持众所周知的企业集成模式。
它支持在基于 Spring 的应用程序内进行轻量级消息传递,并支持通过声明式适配器与外部系统集成。
它还提供了一个高级 DSL,用于将各种操作(端点)组合到一个逻辑集成流中。
通过这种 DSL 配置的 lambda 样式, Spring 集成已经有了很好的接口采用率。
代理接口也可以是 or ,根据 Spring Cloud Function 环境,可以将其注册到函数目录中。
请参阅 Spring Integration ReferenceManual 中有关其对函数的支持的更多信息。java.util.function
@MessagingGateway
Function
Consumer
另一方面,从 version 开始,Spring Cloud Function 引入了一个模块,该模块提供了更深入、更特定于云和基于自动配置的 API,用于从 Spring 集成 DSL 的角度进行交互。
该 是自动配置的,并使用 和 表示目标实例的特定于函数的 DSL 的入口点。
除了标准工厂(为方便起见)之外,还公开了一个工厂来查找提供的 .
然后,这会导致 .
这是 and 公开和运算符的实现,分别用于查找或从 。
有关更多信息,请参阅他们的 Javadocs。4.0.3
spring-cloud-function-integration
FunctionCatalog
FunctionFlowBuilder
FunctionCatalog
IntegrationFlow
IntegrationFlow.from()
FunctionFlowBuilder
fromSupplier(String supplierDefinition)
Supplier
FunctionCatalog
FunctionFlowBuilder
FunctionFlowDefinition
FunctionFlowDefinition
IntegrationFlowExtension
apply(String functionDefinition)
accept(String consumerDefinition)
Function
Consumer
FunctionCatalog
以下示例演示了 in 操作以及 API 其余部分的强大功能:FunctionFlowBuilder
IntegrationFlow
@Configuration
public class IntegrationConfiguration {
@Bean
Supplier<byte[]> simpleByteArraySupplier() {
return "simple test data"::getBytes;
}
@Bean
Function<String, String> upperCaseFunction() {
return String::toUpperCase;
}
@Bean
BlockingQueue<String> results() {
return new LinkedBlockingQueue<>();
}
@Bean
Consumer<String> simpleStringConsumer(BlockingQueue<String> results) {
return results::add;
}
@Bean
QueueChannel wireTapChannel() {
return new QueueChannel();
}
@Bean
IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
return functionFlowBuilder
.fromSupplier("simpleByteArraySupplier")
.wireTap("wireTapChannel")
.apply("upperCaseFunction")
.log(LoggingHandler.Level.WARN)
.accept("simpleStringConsumer");
}
}
由于该功能不仅限于简单的函数名称,因此还可以在 mentioned 和 运算符中使用函数组合功能:FunctionCatalog.lookup()
apply()
accept()
@Bean
IntegrationFlow functionCompositionFlow(FunctionFlowBuilder functionFlowBuilder) {
return functionFlowBuilder
.from("functionCompositionInput")
.accept("upperCaseFunction|simpleStringConsumer");
}
当我们在 Spring Cloud 应用程序中添加预定义函数的自动配置依赖项时,此 API 变得更加相关。
例如,除了应用程序映像之外,Stream Applications 项目还为各种集成用例(例如 、 等)提供了具有函数的工件。debezium-supplier
elasticsearch-consumer
aggregator-function
以下配置分别基于 , 和 :http-supplier
spel-function
file-consumer
@Bean
IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
return functionFlowBuilder
.fromSupplier("httpSupplier", e -> e.poller(Pollers.trigger(new OnlyOnceTrigger())))
.<Flux<?>>handle((fluxPayload, headers) -> fluxPayload, e -> e.async(true))
.channel(c -> c.flux())
.apply("spelFunction")
.<String, String>transform(String::toUpperCase)
.accept("fileConsumer");
}
否则,我们需要的只是将它们的配置添加到 (如有必要):application.properties
http.path-pattern=/testPath
spel.function.expression=new String(payload)
file.consumer.name=test-data.txt