此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-function 4.1.4! |
编程模型
函数目录和灵活的函数签名
Spring Cloud Function 的主要功能之一是适应和支持用户定义的函数的一系列类型签名,
同时提供一致的执行模型。
这就是为什么所有用户定义的函数都被 转换为规范表示形式的原因。FunctionCatalog
虽然用户通常根本不需要关心 ,但了解什么是
类型的函数。FunctionCatalog
了解 Spring Cloud Function 为反应式 API 提供一流的支持也很重要
由 Project Reactor 提供,允许将响应式基元(例如 和 )用作用户定义函数中的类型,从而在为
您的函数实现。
反应式编程模型还支持对原本难以或不可能实现的功能提供支持
使用命令式编程风格。有关更多信息,请阅读 函数 Arity 部分。Mono
Flux
Java 8 函数支持
Spring Cloud Function 包含并构建在 Java 定义的 3 个核心功能接口之上 从 Java 8 开始可供我们使用。
-
供应商<O>
-
函数<I, O>
-
消费者<I>
为了避免经常提到 ,我们将在本手册的其余部分适当地将它们称为 Functional bean。Supplier
Function
Consumer
简而言之,Application Context 中任何属于 Functional bean 的 bean 都将延迟注册到 。
这意味着它可以受益于本参考手册中描述的所有其他功能。FunctionCatalog
在最简单的应用程序中,您需要做的就是声明 type ,或者在您的应用程序配置中声明。
然后,您可以根据其名称访问和查找特定函数。@Bean
Supplier
Function
Consumer
FunctionCatalog
例如:
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
. . .
FunctionCatalog catalog = applicationContext.getBean(FunctionCatalog.class);
Function uppercase = catalog.lookup(“uppercase”);
重要的是要了解,鉴于它是一个 bean,您当然可以直接从 中获取它,但您得到的只是您声明的 bean,而 SCF 没有提供任何额外的功能。当您通过 查找函数时,您将收到的实例将包装(插桩)本手册中描述的附加功能(即类型转换、组合等)。此外,重要的是要了解典型用户不会直接使用 Spring Cloud Function。相反,典型用户实现 Java 的想法是在不同的执行上下文中使用它,而无需额外的工作。例如,相同的 java 函数可以通过提供的 Spring Cloud 函数表示为 REST 端点或流式消息处理程序或 AWS Lambda 等
适配器以及使用 Spring Cloud Function 作为核心编程模型的其他框架(例如 Spring Cloud Stream)。
因此,总而言之, Spring Cloud Function 使用可在各种执行上下文中使用的附加功能来检测 java 函数。uppercase
ApplicationContext
FunctionCatalog
Function/Supplier/Consumer
功能定义
虽然前面的示例向您展示了如何以编程方式在 FunctionCatalog 中查找函数,但在 Spring Cloud Function 被另一个框架(例如 Spring Cloud Stream)用作编程模型的典型集成情况下,您可以通过属性声明要使用的函数。知道在 中发现函数时了解一些默认行为很重要。例如,如果你只有一个 Functional bean 中,则通常不需要该属性,因为 中的单个函数可以通过空名称或任何名称来查找。例如,假设这是目录中的唯一函数,则可以将其查找为 , , 。
也就是说,对于您使用框架(例如 Spring Cloud Stream)的情况,最佳实践是最佳实践,建议始终使用 property。spring.cloud.function.definition
FunctionCatalog
ApplicationContext
spring.cloud.function.definition
FunctionCatalog
uppercase
catalog.lookup(null)
catalog.lookup(“”)
catalog.lookup(“foo”)
spring.cloud.function.definition
spring.cloud.function.definition
例如
spring.cloud.function.definition=uppercase
筛选不合格的函数
典型的 Application Context 可能包括作为有效 java 函数的 bean,但不是要注册的候选 bean。
这样的 bean 可以是来自其他项目的自动配置,也可以是符合成为 Java 函数条件的任何其他 bean。
该框架提供了已知 bean 的默认过滤,这些 bean 不应该成为向函数 catalog 注册的候选 bean。
您还可以通过使用 property 提供逗号分隔的 bean 定义名称列表来向此列表添加其他 bean。FunctionCatalog
spring.cloud.function.ineligible-definitions
例如
spring.cloud.function.ineligible-definitions=foo,bar
供应商
供应商可以是反应性的,也可以是命令性的。从调用的角度来看,这应该没有区别
向此类供应商的实施者披露。但是,在框架内使用
(例如,Spring Cloud Stream)、供应商,尤其是反应式的、
通常用于表示流的源,因此它们被调用一次以获取流(例如,Flux)
使用者可以订阅的。换句话说,这些供应商相当于无限流。
但是,相同的反应式 suppliers 也可以表示有限流(例如,轮询的 JDBC 数据上的结果集)。
在这些情况下,这种反应式供应商必须连接到底层框架的某种轮询机制。Supplier<Flux<T>>
Supplier<T>
为了帮助实现这一点,Spring Cloud Function 提供了一个标记 Comments,以表明此类供应商生成了一个
finite 流,可能需要再次轮询。也就是说,了解 Spring Cloud Function 本身是很重要的
不为此批注提供任何行为。org.springframework.cloud.function.context.PollableBean
此外,annotation 公开了一个 splittable 属性,以发出生成的流的信号
需要拆分(参见 Splitter EIPPollableBean
)
示例如下:
@PollableBean(splittable = true)
public Supplier<Flux<String>> someSupplier() {
return () -> {
String v1 = String.valueOf(System.nanoTime());
String v2 = String.valueOf(System.nanoTime());
String v3 = String.valueOf(System.nanoTime());
return Flux.just(v1, v2, v3);
};
}
功能
Function 也可以以命令式或响应式方式编写,但与 Supplier 和 Consumer 不同的是,有 实现者无需特别考虑,只需了解在框架中使用时 比如 Spring Cloud Stream 等,响应式函数是 仅调用一次以将引用传递给流(即 Flux 或 Mono),并且每个事件调用一次 imperative。
public Function<String, String> uppercase() {
. . . .
}
双函数
如果您需要通过有效负载接收一些额外的数据(元数据),您始终可以将函数 signature 接收一个 Message,其中包含包含此类附加信息的 Headers 映射。
public Function<Message<String>, String> uppercase() {
. . . .
}
为了让你的函数签名更轻、更 POJO,还有另一种方法。您可以使用 .BiFunction
public BiFunction<String, Map, String> uppercase() {
. . . .
}
鉴于 a 仅包含两个属性(payload 和 headers)并且需要两个输入参数,框架将自动识别此范式,并从中提取 payload,将其作为第一个参数传递,将 headers 映射作为第二个参数传递。
在这种情况下,你的函数也没有与 Spring 的消息传递 API 耦合。
请记住,BiFunction 需要严格的签名,其中第二个参数必须是 Map。
相同的规则也适用于 .Message
BiFunction
Message
BiConsumer
功能组成
函数组合是一项功能,允许将多个函数组合成一个。 核心支持基于自 Java 8 以来提供的 Function.andThen(..) 支持中提供的函数组合功能。但是,最重要的是,我们提供了一些附加功能。
声明式函数组合
此功能允许您使用 (管道) 或 (逗号) 分隔符以声明性方式提供组合指令
在提供财产时。|
,
spring.cloud.function.definition
例如
--spring.cloud.function.definition=uppercase|reverse
在这里,我们有效地提供了一个函数的定义,它本身是
函数和函数 .事实上,这就是属性名称是 definition 而不是 name 的原因之一。
因为函数的定义可以是几个命名函数的组合。
如前所述,您可以使用 pipe 代替 (例如 )。uppercase
reverse
,
…definition=uppercase,reverse
函数路由和筛选
从 2.2 版本开始,Spring Cloud Function 提供了路由功能,允许 you 调用单个函数,该函数充当您希望调用的实际函数的路由器。 此功能在某些维护配置的 FAAS 环境中非常有用 for several 函数可能很麻烦,或者无法公开多个函数。
在 FunctionCatalog 中以名称 .为了简单
和 consistency 也可以参考 constant。RoutingFunction
functionRouter
RoutingFunction.FUNCTION_NAME
此函数具有以下签名:
public class RoutingFunction implements Function<Object, Object> {
. . .
}
路由指令可以通过多种方式进行通信。我们支持通过 Message headers、System 属性以及可插拔策略。那么让我们看看一些细节
MessageRoutingCallback 回调
这是一种帮助确定 route-to 函数定义名称的策略。MessageRoutingCallback
public interface MessageRoutingCallback {
default String routingResult(Message<?> message) {
return (String) message.getHeaders().get(FunctionProperties.FUNCTION_DEFINITION);
}
}
您需要做的就是实现并将其注册为 bean 以供 .
例如:RoutingFunction
@Bean
public MessageRoutingCallback customRouter() {
return new MessageRoutingCallback() {
@Override
public String routingResult(Message<?> message) {
return (String) message.getHeaders().get(FunctionProperties.FUNCTION_DEFINITION);
}
};
}
在前面的示例中,你可以看到一个非常简单的实现,它从传入 Message 的 Message 头中确定函数定义,并返回表示要调用的函数定义的实例。MessageRoutingCallback
FunctionProperties.FUNCTION_DEFINITION
String
消息报头
如果 input 参数是 type ,则可以通过设置 或 Message headers 之一来传达路由指令。
正如该属性的名称所暗示的那样,依赖于 Spring 表达式语言 (SpEL)。
对于更多静态情况,您可以使用 header,它允许您提供
单个函数的名称(例如 )或组合指令的名称(例如 )。
对于更多动态情况,您可以使用 header 并提供应解析的 SpEL 表达式
转换为函数的定义(如上所述)。Message<?>
spring.cloud.function.definition
spring.cloud.function.routing-expression
spring.cloud.function.routing-expression
spring.cloud.function.definition
…definition=foo
…definition=foo|bar|baz
spring.cloud.function.routing-expression
SPEL 评估上下文的根对象是
actual input 参数,因此在 You can 构造具有 Access
同时设置为 and(例如,)。Message<?> payload headers spring.cloud.function.routing-expression=headers.function_name |
SPEL 允许用户提供要执行的 Java 代码的字符串表示。鉴于可以通过 Message headers 提供,这意味着设置此类表达式的能力可能会暴露给最终用户(即使用 Web 模块时的 HTTP Headers),这可能会导致一些问题(例如,恶意代码)。为了管理这一点,所有通过 Message 标头的表达式将仅根据其功能有限且设计为仅评估上下文对象(在我们的例子中为 Message)。另一方面,通过 property 或 system variable 设置的所有表达式都根据 进行计算,这允许 Java 语言具有完全的灵活性。
虽然通过系统/应用程序属性或环境变量设置表达式通常被认为是安全的,因为它在正常情况下不会向最终用户公开,但在某些情况下,可见性以及更新系统、应用程序和环境变量的能力确实通过某些 Spring 项目或第三方提供的 Spring Boot Actuator 端点或最终用户的自定义实现向最终用户公开。必须使用行业标准的 Web 安全实践来保护此类终端节点。
Spring Cloud Function 不会公开任何此类端点。spring.cloud.function.routing-expression SimpleEvaluationContext StandardEvaluationContext |
在特定的执行环境/模型中,适配器负责转换和通信和/或通过 Message header。
例如,当使用 spring-cloud-function-web 时,您可以作为 HTTP
header 的 Header,框架会将其以及其他 HTTP 头作为 Message 头传播。spring.cloud.function.definition
spring.cloud.function.routing-expression
spring.cloud.function.definition
应用程序属性
路由指令也可以通过应用程序属性或作为应用程序属性进行通信。在
上一节也适用于此处。唯一的区别是你以
应用程序属性(例如 )。spring.cloud.function.definition
spring.cloud.function.routing-expression
--spring.cloud.function.definition=foo
重要的是要了解提供 或 as Message 标头仅适用于命令式函数(例如,)。
也就是说,我们只能使用命令式函数路由每条消息。使用响应式函数,我们不能路由每条消息。因此,您只能将路由指令作为 Application Properties 提供。
这一切都与工作单元有关。在命令式功能中,工作单元是 Message,因此我们可以基于这样的工作单元进行路由。
使用 reactive function,work unit-of-work 是整个流,因此我们将只对 application 提供的指令进行操作
属性并路由整个流。spring.cloud.function.definition spring.cloud.function.routing-expression Function<Foo, Bar> |
路由指令的优先级顺序
鉴于我们有多种提供路由指令的机制,因此了解 在同时使用多个机制的情况下解决冲突,因此顺序如下:
-
MessageRoutingCallback
(如果 function 是 imperative 的,则无论是否定义了其他任何内容,都将接管) -
消息标头 (如果 function 是命令式且未提供)
MessageRoutingCallback
-
应用程序属性(任何功能)
无法路由的消息
如果 route-to 函数在 catalog 中不可用,您将收到一个异常,指出该
在某些情况下,这种行为是不可取的,你可能希望有一些 “catch-all” 类型的函数来处理这样的消息。
为了实现这一目标,框架提供了策略。您需要做的就是将其注册为 bean。
它的默认实现将仅记录消息不可路由的事实,但将允许消息流在没有异常的情况下继续进行,从而有效地删除不可路由的消息。
如果你想要更复杂的东西,你需要做的就是提供你自己的这个策略的实现,并将其注册为 bean。org.springframework.cloud.function.context.DefaultMessageRoutingHandler
@Bean
public DefaultMessageRoutingHandler defaultRoutingHandler() {
return new DefaultMessageRoutingHandler() {
@Override
public void accept(Message<?> message) {
// do something really cool
}
};
}
函数过滤
过滤是一种只有两条路径的路由类型 - “go” 或 “discard”。就功能而言,它的意思是 你只想在某个条件返回 'true' 时调用某个函数,否则你想要丢弃输入。 但是,当涉及到丢弃 input 时,对于它在应用程序上下文中的含义,有许多解释。 例如,您可能希望记录它,或者您可能希望维护丢弃消息的计数器。您可能也不想什么都不做。 由于这些路径不同,我们不提供如何处理丢弃消息的通用配置选项。 相反,我们只建议定义一个简单的 Consumer,它表示 'discard' 路径:
@Bean
public Consumer<?> devNull() {
// log, count or whatever
}
现在,您可以让实际上只有两条路径的路由表达式有效地成为过滤器。例如:
--spring.cloud.function.routing-expression=headers.contentType.toString().equals('text/plain') ? 'echo' : 'devNull'
每条不符合 'echo' 函数条件的消息都会进入 'devNull',在那里你什么都不能做。
签名还将确保不会尝试类型转换,从而几乎不会产生执行开销。Consumer<?>
在处理响应式 Input(例如 Publisher)时,路由指令只能通过 Function 属性提供。这是 由于响应式函数的性质,这些函数只调用一次来传递 Publisher 和其余的 由 Reactor 处理,因此我们无法访问和/或依赖通过单个 值(例如 Message )。 |
多个路由器
默认情况下,框架将始终配置一个路由函数,如前面部分所述。但是,有时您可能需要多个路由功能。
在这种情况下,除了现有的 bean 实例之外,您还可以创建自己的 bean 实例,只要您为其指定的名称不是 。RoutingFunction
functionRouter
您可以在映射中将 or 作为键/值对传递给 RoutinFunction。spring.cloud.function.routing-expression
spring.cloud.function.definition
这是一个简单的例子
@Configuration protected static class MultipleRouterConfiguration { @Bean RoutingFunction mySpecialRouter(FunctionCatalog functionCatalog, BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback) { Map<String, String> propertiesMap = new HashMap<>(); propertiesMap.put(FunctionProperties.PREFIX + ".routing-expression", "'reverse'"); return new RoutingFunction(functionCatalog, propertiesMap, new BeanFactoryResolver(beanFactory), routingCallback); } @Bean public Function<String, String> reverse() { return v -> new StringBuilder(v).reverse().toString(); } @Bean public Function<String, String> uppercase() { return String::toUpperCase; } }
以及演示其工作原理的测试
`
@Test public void testMultipleRouters() { System.setProperty(FunctionProperties.PREFIX + ".routing-expression", "'uppercase'"); FunctionCatalog functionCatalog = this.configureCatalog(MultipleRouterConfiguration.class); Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME); assertThat(function).isNotNull(); Message<String> message = MessageBuilder.withPayload("hello").build(); assertThat(function.apply(message)).isEqualTo("HELLO"); function = functionCatalog.lookup("mySpecialRouter"); assertThat(function).isNotNull(); message = MessageBuilder.withPayload("hello").build(); assertThat(function.apply(message)).isEqualTo("olleh"); }
[[输入/输出丰富]] == 输入/输出丰富
很多时候,你需要修改或优化传入或传出的 Message,并保持你的代码没有非功能性问题。您不想在业务逻辑中执行此操作。
您始终可以通过 Function Composition 来完成它。这种方法有几个好处:
-
它允许您将这个非功能性关注点隔离到一个单独的函数中,您可以将该函数与业务函数作为函数定义进行组合。
-
它为你提供了完全的自由(和危险),让你知道在传入消息到达实际业务功能之前你可以修改什么。
@Bean
public Function<Message<?>, Message<?>> enrich() {
return message -> MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();
}
@Bean
public Function<Message<?>, Message<?>> myBusinessFunction() {
// do whatever
}
然后通过提供以下函数定义来编写您的函数。enrich|myBusinessFunction
虽然所描述的方法是最灵活的,但它也是最复杂的,因为它需要您编写一些代码,使其成为 bean 或 手动将其注册为函数,然后才能将其与业务函数组合,如前面的示例所示。
但是,如果您尝试进行的修改 (扩充) 像前面的示例中一样微不足道,该怎么办?有没有一个更简单、更动态和可配置的 机制来实现相同的目标?
从版本 3.1.3 开始,该框架允许您提供 SPEL 表达式来丰富进入函数的 Importing 和 以及从中输出。让我们以其中一个测试为例。
@Test
public void testMixedInputOutputHeaderMapping() throws Exception {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
"--logging.level.org.springframework.cloud.function=DEBUG",
"--spring.main.lazy-initialization=true",
"--spring.cloud.function.configuration.split.output-header-mapping-expression.keyOut1='hello1'",
"--spring.cloud.function.configuration.split.output-header-mapping-expression.keyOut2=headers.contentType",
"--spring.cloud.function.configuration.split.input-header-mapping-expression.key1=headers.path.split('/')[0]",
"--spring.cloud.function.configuration.split.input-header-mapping-expression.key2=headers.path.split('/')[1]",
"--spring.cloud.function.configuration.split.input-header-mapping-expression.key3=headers.path")) {
FunctionCatalog functionCatalog = context.getBean(FunctionCatalog.class);
FunctionInvocationWrapper function = functionCatalog.lookup("split");
Message<byte[]> result = (Message<byte[]>) function.apply(MessageBuilder.withPayload("helo")
.setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
.setHeader("path", "foo/bar/baz").build());
assertThat(result.getHeaders().containsKey("keyOut1")).isTrue();
assertThat(result.getHeaders().get("keyOut1")).isEqualTo("hello1");
assertThat(result.getHeaders().containsKey("keyOut2")).isTrue();
assertThat(result.getHeaders().get("keyOut2")).isEqualTo("application/json");
}
}
在这里,您会看到一个名为 properties 的属性,前面是函数的名称(即 ),后跟要设置的消息头键的名称和值 SPEL 表达式。第一个表达式(对于 'keyOut1')是用单引号括起来的文字 SpEL 表达式,有效地将 'keyOut1' 设置为 value 。设置为现有 'contentType' 标头的值。input-header-mapping-expression
output-header-mapping-expression
split
hello1
keyOut2
你还可以在 input Headers 映射中观察到一些有趣的功能,我们实际上拆分了现有 Headers 'path' 的值,将 key1 和 key2 的单个值设置为基于索引的拆分元素的值。
如果由于某种原因提供的表达式计算失败,则函数的执行将继续进行,就像什么都没发生一样。 但是,您会在日志中看到 WARN 消息,通知您 |
o.s.c.f.context.catalog.InputEnricher : Failed while evaluating expression "hello1" on incoming message. . .
如果您正在处理具有多个输入的函数(下一节),则可以在input-header-mapping-expression
--spring.cloud.function.configuration.echo.input-header-mapping-expression[0].key1=‘hello1'
--spring.cloud.function.configuration.echo.input-header-mapping-expression[1].key2='hello2'
函数 Arity
有时需要对数据流进行分类和组织。例如 考虑一个经典的大数据用例,即处理无组织的数据,比如说, 'orders' 和 'invoices',并且您希望每个都进入单独的数据存储。 这就是函数 arity(具有多个输入和输出的函数)支持的地方 来玩。
让我们看一个这样函数的示例(完整的实现细节在这里提供),
@Bean
public Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> organise() {
return flux -> ...;
}
鉴于 Project Reactor 是 SCF 的核心依赖项,我们正在使用它的 Tuple 库。 元组通过向我们传达基数和类型信息,为我们提供了独特的优势。 在 SCSt 的背景下,两者都非常重要。Cardinality 让我们知道 需要创建多少个输入和输出绑定,并将其绑定到对应的 函数的输入和输出。了解类型信息可确保正确的类型 转换。
此外,这是绑定命名约定的 'index' 部分
names 开始发挥作用,因为在这个函数中,两个输出绑定
名称是 和 。organise-out-0
organise-out-1
重要提示:目前,函数 arity 仅支持响应式函数
() 以复杂事件处理为中心
其中,对事件汇合的评估和计算通常需要查看
事件流,而不是单个事件。Function<TupleN<Flux<?>…>, TupleN<Flux<?>…>> |
Input Header 传播
在典型场景中,input Message 头不会传播到输出,这是理所当然的,因为函数的输出可能是其他事物的 input,需要它自己的 Message 头集。 但是,有时可能需要这种传播,因此 Spring Cloud Function 提供了几种机制来实现这一点。
首先,您始终可以手动复制 Headers。例如,如果你有一个 Function 的签名是 take 和 return(即 ),你可以简单地有选择地自己复制 headers。请记住,如果你的函数返回 Message,框架不会对它执行任何操作,只会正确转换其有效负载。
但是,这种方法可能有点乏味,尤其是在您只想复制所有标头的情况下。
为了帮助处理此类情况,我们提供了一个简单的属性,该属性允许您在希望传播 input 标头的函数上设置布尔标志。
属性为 。Message
Message
Function<Message, Message>
copy-input-headers
例如,假设您有以下配置:
@EnableAutoConfiguration
@Configuration
protected static class InputHeaderPropagationConfiguration {
@Bean
public Function<String, String> uppercase() {
return x -> x.toUpperCase();
}
}
如你所知,你仍然可以通过向它发送 Message 来调用这个函数(框架将负责类型转换和有效负载提取)
通过简单地设置为 ,以下断言也将为 truespring.cloud.function.configuration.uppercase.copy-input-headers
true
Function<Message<String>, Message<byte[]>> uppercase = catalog.lookup("uppercase", "application/json"); Message<byte[]> result = uppercase.apply(MessageBuilder.withPayload("bob").setHeader("foo", "bar").build()); assertThat(result.getHeaders()).containsKey("foo");
类型转换(Content-Type 协商)
Content-Type 协商是 Spring Cloud Function 的核心功能之一,因为它不仅允许将传入数据转换为声明的类型 通过函数签名,但在函数组合期间执行相同的转换,使原本不可组合(按类型)的函数可组合。
为了更好地理解内容类型协商背后的机制和必要性,我们看了一个非常简单的用例,如下所示: 以以下函数为例:
@Bean
public Function<Person, String> personFunction {..}
前面示例中显示的函数需要一个对象作为参数,并生成一个 String 类型作为输出。如果此类函数是
调用,则 all 工作正常。但通常函数扮演着传入数据的处理程序的角色,这些数据通常来自
以 Raw 格式(如 等)进行。为了使框架成功地将传入数据作为参数传递给
这个函数,它必须以某种方式将传入的数据转换为类型。Person
Person
byte[]
JSON String
Person
Spring Cloud Function 依赖于 Spring 的两种原生机制来实现这一点。
-
MessageConverter - 将传入的 Message 数据转换为函数声明的类型。
-
ConversionService - 将传入的非 Message 数据转换为函数声明的类型。
这意味着根据原始数据类型(Message 或非 Message),Spring Cloud Function 将应用一种或另一种机制。
在大多数情况下,在处理作为其他请求(例如,HTTP、消息等)的一部分调用的函数时,框架依赖于
因为此类请求已转换为 Spring 。换句话说,框架会查找并应用适当的 .
为此,框架需要用户提供一些说明。这些指令之一已经由函数的签名提供
本身(Person 类型)。因此,从理论上讲,这应该(而且在某些情况下是)足够了。但是,对于大多数使用案例,为了
选择适当的 ,框架需要一条附加信息。缺失的部分是标题。MessageConverters
Message
MessageConverter
MessageConverter
contentType
这样的 Headers 通常作为 Message 的一部分,由首先创建此类 Message 的相应适配器注入。
例如,HTTP POST 请求会将其内容类型的 HTTP 标头复制到 Message 的标头。contentType
对于不存在此类标头的情况,框架依赖于默认内容类型,如 .application/json
内容类型与参数类型
如前所述,要使框架选择适当的 ,它需要参数类型和内容类型信息(可选)。
选择适当 Parser 的逻辑驻留在参数 resolvers 中,该 resolvers 在调用用户定义的
function(即框架知道实际参数类型时)。
如果参数类型与当前有效负载的类型不匹配,则框架会将
预配置以查看它们中的任何一个是否可以转换有效负载。MessageConverter
MessageConverter
MessageConverters
和 argument type 的组合是框架通过定位来确定 message 是否可以转换为目标类型的机制
适当的 .
如果未找到合适的,则会引发异常,您可以通过添加自定义来处理该异常(请参阅 )。contentType
MessageConverter
MessageConverter
MessageConverter
User-defined Message Converters
不要指望仅根据 .
请记住,这是对 target type 的补充。
这是一个暗示,可能会也可能不会考虑。Message contentType contentType MessageConverter |
消息转换器
MessageConverters
定义两个方法:
Object fromMessage(Message<?> message, Class<?> targetClass);
Message<?> toMessage(Object payload, @Nullable MessageHeaders headers);
了解这些方法的 Contract 及其用法非常重要,特别是在 Spring Cloud Stream 的上下文中。
该方法将 incoming 转换为 argument 类型。
的有效负载可以是任何类型,并且是
直到实际实现 以支持多种类型。fromMessage
Message
Message
MessageConverter
提供的 MessageConverters
如前所述,该框架已经提供了一个堆栈来处理大多数常见的用例。
以下列表按优先顺序描述了提供的 (使用第一个有效的):MessageConverters
MessageConverters
MessageConverter
-
JsonMessageConverter
:支持在使用 Jackson (DEFAULT) 或 Gson 库的情况下转换往返 POJO 的有效负载。此消息转换器还知道参数(例如,application/json;type=foo.bar.Person 的 Person)。这在开发函数时类型可能未知的情况下非常有用,因此函数签名可能类似于 or 或 。换句话说,对于类型转换,我们通常从函数签名中派生类型。具有 mime-type 参数允许您以更动态的方式传达类型。Message
contentType
application/json
type
Function<?, ?>
Function
Function<Object, Object>
-
ByteArrayMessageConverter
:支持将 from 的有效负载转换为 if 为 的情况。它本质上是一种传递,主要是为了向后兼容。Message
byte[]
byte[]
contentType
application/octet-stream
-
StringMessageConverter
:支持将任何类型的 when is 转换为 .String
contentType
text/plain
当找不到合适的转换器时,框架会引发异常。发生这种情况时,您应该检查您的代码和配置,并确保这样做了
不要错过任何内容(即,确保您使用 Binding 或 Header) 提供了 A。
但是,最有可能的是,您发现了一些不常见的情况(例如可能是自定义),并且当前提供的堆栈不知道如何转换。如果是这种情况,您可以添加 custom .请参见用户定义的消息转换器。contentType
contentType
MessageConverters
MessageConverter
用户定义的消息转换器
Spring Cloud Function 公开了一种机制来定义和注册 additional 。
要使用它,请实施 ,将其配置为 .
然后将其附加到现有的 'MessageConverter' 堆栈中。MessageConverters
org.springframework.messaging.converter.MessageConverter
@Bean
重要的是要了解 custom implementations 被添加到现有堆栈的头部。
因此,自定义 implementations 优先于现有 implementations,这使您可以覆盖和添加现有 converters。MessageConverter MessageConverter |
以下示例显示如何创建消息转换器 bean 以支持名为:application/bar
@SpringBootApplication
public static class SinkApplication {
...
@Bean
public MessageConverter customMessageConverter() {
return new MyCustomMessageConverter();
}
}
public class MyCustomMessageConverter extends AbstractMessageConverter {
public MyCustomMessageConverter() {
super(new MimeType("application", "bar"));
}
@Override
protected boolean supports(Class<?> clazz) {
return (Bar.class.equals(clazz));
}
@Override
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
Object payload = message.getPayload();
return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
}
}
有关 JSON 选项的说明
在 Spring Cloud Function 中,我们支持 Jackson 和 Gson 机制来处理 JSON。
为了您的利益,已经抽象了它,在它下,它自己知道两种机制,并将使用选定的一种
由您或遵循默认规则。
默认规则如下:org.springframework.cloud.function.json.JsonMapper
-
无论 Classpath 上的哪个库都是要使用的机制。因此,如果你必须进入 classpath,将使用 Jackson,如果你有,那么将使用 Gson。
com.fasterxml.jackson.*
com.google.code.gson
-
如果两者都有,则 Gson 将是默认值,或者您可以使用以下两个值之一设置 property:或 。
spring.cloud.function.preferred-json-mapper
gson
jackson
也就是说,类型转换通常对开发人员是透明的,但是考虑到它也注册为 bean
如果需要,您可以轻松地将其注入到您的代码中。org.springframework.cloud.function.json.JsonMapper
Kotlin Lambda 支持
我们还提供对 Kotlin lambda 的支持(自 v2.0 起)。 请考虑以下事项:
@Bean
open fun kotlinSupplier(): () -> String {
return { "Hello from Kotlin" }
}
@Bean
open fun kotlinFunction(): (String) -> String {
return { it.toUpperCase() }
}
@Bean
open fun kotlinConsumer(): (String) -> Unit {
return { println(it) }
}
以上表示配置为 Spring bean 的 Kotlin lambda。每个签名都映射到 Java 等效的 、 和 ,因此框架支持/识别签名。
虽然 Kotlin 到 Java 映射的机制不在本文档的讨论范围之内,但请务必了解
此处也适用“Java 8 函数支持”部分中概述的签名转换规则。Supplier
Function
Consumer
要启用 Kotlin 支持,您只需在类路径上添加 Kotlin SDK 库,这将触发适当的 autoconfiguration 和支持类。
功能组件扫描
Spring Cloud Function 将扫描 的实现,并在名为 (如果存在) 的包中查找。使用这个
功能,您可以编写不依赖于 Spring 的函数 - 甚至不需要 Annotation。如果您想使用不同的
package 中,您可以设置 .您还可以使用 完全关闭扫描。Function
Consumer
Supplier
functions
@Component
spring.cloud.function.scan.packages
spring.cloud.function.scan.enabled=false
数据掩码
典型的应用程序带有多个级别的日志记录。某些云/无服务器平台可能会在记录的数据包中包含敏感数据,以供所有人查看。
虽然检查正在记录的数据是单个开发人员的责任,但日志记录来自框架本身,因此从版本 4.1 开始,我们引入了最初帮助屏蔽 AWS Lambda 负载中的敏感数据。但是,它是通用的,可用于任何模块。目前,它仅适用于结构化数据,例如 JSON。您只需指定要屏蔽的键,其余的交给它。
应在 file 中指定 keys 。文件格式非常简单,您可以用逗号或换行符或两者分隔多个键。JsonMasker
JsonMasker
META-INF/mask.keys
以下是此类文件内容的示例:
eventSourceARN asdf1, SS
在这里,您会看到定义了三个键 一旦存在这样的文件,JsonMasker 将使用它来屏蔽指定键的值。
下面是显示用法的示例代码
private final static JsonMasker masker = JsonMasker.INSTANCE(); . . . logger.info("Received: " + masker.mask(new String(payload, StandardCharsets.UTF_8)));