此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-function 4.1.4spring-doc.cn

编程模型

函数目录和灵活的函数签名

Spring Cloud Function 的主要功能之一是适应和支持用户定义的函数的一系列类型签名, 同时提供一致的执行模型。 这就是为什么所有用户定义的函数都被 转换为规范表示形式的原因。FunctionCatalogspring-doc.cn

虽然用户通常根本不需要关心 ,但了解什么是 类型的函数。FunctionCatalogspring-doc.cn

了解 Spring Cloud Function 为反应式 API 提供一流的支持也很重要 由 Project Reactor 提供,允许将响应式基元(例如 和 )用作用户定义函数中的类型,从而在为 您的函数实现。 反应式编程模型还支持对原本难以或不可能实现的功能提供支持 使用命令式编程风格。有关更多信息,请阅读 函数 Arity 部分。MonoFluxspring-doc.cn

Java 8 函数支持

Spring Cloud Function 包含并构建在 Java 定义的 3 个核心功能接口之上 从 Java 8 开始可供我们使用。spring-doc.cn

为了避免经常提到 ,我们将在本手册的其余部分适当地将它们称为 Functional bean。SupplierFunctionConsumerspring-doc.cn

简而言之,Application Context 中任何属于 Functional bean 的 bean 都将延迟注册到 。 这意味着它可以受益于本参考手册中描述的所有其他功能。FunctionCatalogspring-doc.cn

在最简单的应用程序中,您需要做的就是声明 type ,或者在您的应用程序配置中声明。 然后,您可以根据其名称访问和查找特定函数。@BeanSupplierFunctionConsumerFunctionCatalogspring-doc.cn

例如:spring-doc.cn

@Bean
public Function<String, String> uppercase() {
    return value -> value.toUpperCase();
}

. . .

FunctionCatalog catalog = applicationContext.getBean(FunctionCatalog.class);
Function uppercase =  catalog.lookup(“uppercase”);

重要的是要了解,鉴于它是一个 bean,您当然可以直接从 中获取它,但您得到的只是您声明的 bean,而 SCF 没有提供任何额外的功能。当您通过 查找函数时,您将收到的实例将包装(插桩)本手册中描述的附加功能(即类型转换、组合等)。此外,重要的是要了解典型用户不会直接使用 Spring Cloud Function。相反,典型用户实现 Java 的想法是在不同的执行上下文中使用它,而无需额外的工作。例如,相同的 java 函数可以通过提供的 Spring Cloud 函数表示为 REST 端点流式消息处理程序AWS Lambda 等 适配器以及使用 Spring Cloud Function 作为核心编程模型的其他框架(例如 Spring Cloud Stream)。 因此,总而言之, Spring Cloud Function 使用可在各种执行上下文中使用的附加功能来检测 java 函数。uppercaseApplicationContextFunctionCatalogFunction/Supplier/Consumerspring-doc.cn

功能定义

虽然前面的示例向您展示了如何以编程方式在 FunctionCatalog 中查找函数,但在 Spring Cloud Function 被另一个框架(例如 Spring Cloud Stream)用作编程模型的典型集成情况下,您可以通过属性声明要使用的函数。知道在 中发现函数时了解一些默认行为很重要。例如,如果你只有一个 Functional bean 中,则通常不需要该属性,因为 中的单个函数可以通过空名称或任何名称来查找。例如,假设这是目录中的唯一函数,则可以将其查找为 , , 。 也就是说,对于您使用框架(例如 Spring Cloud Stream)的情况,最佳实践是最佳实践,建议始终使用 property。spring.cloud.function.definitionFunctionCatalogApplicationContextspring.cloud.function.definitionFunctionCataloguppercasecatalog.lookup(null)catalog.lookup(“”)catalog.lookup(“foo”)spring.cloud.function.definitionspring.cloud.function.definitionspring-doc.cn

spring.cloud.function.definition=uppercase

筛选不合格的函数

典型的 Application Context 可能包括作为有效 java 函数的 bean,但不是要注册的候选 bean。 这样的 bean 可以是来自其他项目的自动配置,也可以是符合成为 Java 函数条件的任何其他 bean。 该框架提供了已知 bean 的默认过滤,这些 bean 不应该成为向函数 catalog 注册的候选 bean。 您还可以通过使用 property 提供逗号分隔的 bean 定义名称列表来向此列表添加其他 bean。FunctionCatalogspring.cloud.function.ineligible-definitionsspring-doc.cn

spring.cloud.function.ineligible-definitions=foo,bar

供应商

供应商可以是反应性的,也可以是命令性的。从调用的角度来看,这应该没有区别 向此类供应商的实施者披露。但是,在框架内使用 (例如,Spring Cloud Stream)、供应商,尤其是反应式的、 通常用于表示流的源,因此它们被调用一次以获取流(例如,Flux) 使用者可以订阅的。换句话说,这些供应商相当于无限流。 但是,相同的反应式 suppliers 也可以表示有限流(例如,轮询的 JDBC 数据上的结果集)。 在这些情况下,这种反应式供应商必须连接到底层框架的某种轮询机制。Supplier<Flux<T>>Supplier<T>spring-doc.cn

为了帮助实现这一点,Spring Cloud Function 提供了一个标记 Comments,以表明此类供应商生成了一个 finite 流,可能需要再次轮询。也就是说,了解 Spring Cloud Function 本身是很重要的 不为此批注提供任何行为。org.springframework.cloud.function.context.PollableBeanspring-doc.cn

此外,annotation 公开了一个 splittable 属性,以发出生成的流的信号 需要拆分(参见 Splitter EIPPollableBean)spring-doc.cn

示例如下:spring-doc.cn

@PollableBean(splittable = true)
public Supplier<Flux<String>> someSupplier() {
	return () -> {
		String v1 = String.valueOf(System.nanoTime());
		String v2 = String.valueOf(System.nanoTime());
		String v3 = String.valueOf(System.nanoTime());
		return Flux.just(v1, v2, v3);
	};
}

功能

Function 也可以以命令式或响应式方式编写,但与 Supplier 和 Consumer 不同的是,有 实现者无需特别考虑,只需了解在框架中使用时 比如 Spring Cloud Stream 等,响应式函数是 仅调用一次以将引用传递给流(即 Flux 或 Mono),并且每个事件调用一次 imperative。spring-doc.cn

public Function<String, String> uppercase() {
    . . . .
}

双函数

如果您需要通过有效负载接收一些额外的数据(元数据),您始终可以将函数 signature 接收一个 Message,其中包含包含此类附加信息的 Headers 映射。spring-doc.cn

public Function<Message<String>, String> uppercase() {
    . . . .
}

为了让你的函数签名更轻、更 POJO,还有另一种方法。您可以使用 .BiFunctionspring-doc.cn

public BiFunction<String, Map, String> uppercase() {
    . . . .
}

鉴于 a 仅包含两个属性(payload 和 headers)并且需要两个输入参数,框架将自动识别此范式,并从中提取 payload,将其作为第一个参数传递,将 headers 映射作为第二个参数传递。 在这种情况下,你的函数也没有与 Spring 的消息传递 API 耦合。 请记住,BiFunction 需要严格的签名,其中第二个参数必须是 Map。 相同的规则也适用于 .MessageBiFunctionMessageBiConsumerspring-doc.cn

消费者

Consumer 有点特殊,因为它有一个 return 类型, 这意味着阻塞,至少是潜在的。你很可能不会 需要编写 ,但如果你确实需要这样做, 记得订阅 input flux。voidConsumer<Flux<?>>spring-doc.cn

功能组成

函数组合是一项功能,允许将多个函数组合成一个。 核心支持基于自 Java 8 以来提供的 Function.andThen(..) 支持中提供的函数组合功能。但是,最重要的是,我们提供了一些附加功能。spring-doc.cn

声明式函数组合

此功能允许您使用 (管道) 或 (逗号) 分隔符以声明性方式提供组合指令 在提供财产时。|,spring.cloud.function.definitionspring-doc.cn

--spring.cloud.function.definition=uppercase|reverse

在这里,我们有效地提供了一个函数的定义,它本身是 函数和函数 .事实上,这就是属性名称是 definition 而不是 name 的原因之一。 因为函数的定义可以是几个命名函数的组合。 如前所述,您可以使用 pipe 代替 (例如 )。uppercasereverse,…​definition=uppercase,reversespring-doc.cn

编写非函数

Spring Cloud Function 还支持将 Supplier 与 or 以及 . 这里重要的是理解这些定义的最终结果。 使用 Function 组合 Supplier 仍然会产生 Supplier,而将 Supplier 与 Consumer 组合将有效地呈现 Runnable。 遵循相同的逻辑组合 Function 和 Consumer 将产生 Consumer。ConsumerFunctionFunctionConsumerspring-doc.cn

当然,你不能组合不可组合的,比如 Consumer 和 Function、Consumer 和 Supplier 等。spring-doc.cn

函数路由和筛选

从 2.2 版本开始,Spring Cloud Function 提供了路由功能,允许 you 调用单个函数,该函数充当您希望调用的实际函数的路由器。 此功能在某些维护配置的 FAAS 环境中非常有用 for several 函数可能很麻烦,或者无法公开多个函数。spring-doc.cn

FunctionCatalog 中以名称 .为了简单 和 consistency 也可以参考 constant。RoutingFunctionfunctionRouterRoutingFunction.FUNCTION_NAMEspring-doc.cn

此函数具有以下签名:spring-doc.cn

public class RoutingFunction implements Function<Object, Object> {
. . .
}

路由指令可以通过多种方式进行通信。我们支持通过 Message headers、System 属性以及可插拔策略。那么让我们看看一些细节spring-doc.cn

MessageRoutingCallback 回调

这是一种帮助确定 route-to 函数定义名称的策略。MessageRoutingCallbackspring-doc.cn

public interface MessageRoutingCallback {
    default String routingResult(Message<?> message) {
	    return (String) message.getHeaders().get(FunctionProperties.FUNCTION_DEFINITION);
    }
}

您需要做的就是实现并将其注册为 bean 以供 . 例如:RoutingFunctionspring-doc.cn

@Bean
public MessageRoutingCallback customRouter() {
	return new MessageRoutingCallback() {
		@Override
		public String routingResult(Message<?> message) {
			return (String) message.getHeaders().get(FunctionProperties.FUNCTION_DEFINITION);
		}
	};
}

在前面的示例中,你可以看到一个非常简单的实现,它从传入 Message 的 Message 头中确定函数定义,并返回表示要调用的函数定义的实例。MessageRoutingCallbackFunctionProperties.FUNCTION_DEFINITIONStringspring-doc.cn

消息报头spring-doc.cn

如果 input 参数是 type ,则可以通过设置 或 Message headers 之一来传达路由指令。 正如该属性的名称所暗示的那样,依赖于 Spring 表达式语言 (SpEL)。 对于更多静态情况,您可以使用 header,它允许您提供 单个函数的名称(例如 )或组合指令的名称(例如 )。 对于更多动态情况,您可以使用 header 并提供应解析的 SpEL 表达式 转换为函数的定义(如上所述)。Message<?>spring.cloud.function.definitionspring.cloud.function.routing-expressionspring.cloud.function.routing-expressionspring.cloud.function.definition…​definition=foo…​definition=foo|bar|bazspring.cloud.function.routing-expressionspring-doc.cn

SPEL 评估上下文的根对象是 actual input 参数,因此在 You can 构造具有 Access 同时设置为 and(例如,)。Message<?>payloadheadersspring.cloud.function.routing-expression=headers.function_name
SPEL 允许用户提供要执行的 Java 代码的字符串表示。鉴于可以通过 Message headers 提供,这意味着设置此类表达式的能力可能会暴露给最终用户(即使用 Web 模块时的 HTTP Headers),这可能会导致一些问题(例如,恶意代码)。为了管理这一点,所有通过 Message 标头的表达式将仅根据其功能有限且设计为仅评估上下文对象(在我们的例子中为 Message)。另一方面,通过 property 或 system variable 设置的所有表达式都根据 进行计算,这允许 Java 语言具有完全的灵活性。 虽然通过系统/应用程序属性或环境变量设置表达式通常被认为是安全的,因为它在正常情况下不会向最终用户公开,但在某些情况下,可见性以及更新系统、应用程序和环境变量的能力确实通过某些 Spring 项目或第三方提供的 Spring Boot Actuator 端点或最终用户的自定义实现向最终用户公开。必须使用行业标准的 Web 安全实践来保护此类终端节点。 Spring Cloud Function 不会公开任何此类端点。spring.cloud.function.routing-expressionSimpleEvaluationContextStandardEvaluationContext

在特定的执行环境/模型中,适配器负责转换和通信和/或通过 Message header。 例如,当使用 spring-cloud-function-web 时,您可以作为 HTTP header 的 Header,框架会将其以及其他 HTTP 头作为 Message 头传播。spring.cloud.function.definitionspring.cloud.function.routing-expressionspring.cloud.function.definitionspring-doc.cn

应用程序属性spring-doc.cn

路由指令也可以通过应用程序属性或作为应用程序属性进行通信。在 上一节也适用于此处。唯一的区别是你以 应用程序属性(例如 )。spring.cloud.function.definitionspring.cloud.function.routing-expression--spring.cloud.function.definition=foospring-doc.cn

重要的是要了解提供 或 as Message 标头仅适用于命令式函数(例如,)。 也就是说,我们只能使用命令式函数路由每条消息。使用响应式函数,我们不能路由每条消息。因此,您只能将路由指令作为 Application Properties 提供。 这一切都与工作单元有关。在命令式功能中,工作单元是 Message,因此我们可以基于这样的工作单元进行路由。 使用 reactive function,work unit-of-work 是整个流,因此我们将只对 application 提供的指令进行操作 属性并路由整个流。spring.cloud.function.definitionspring.cloud.function.routing-expressionFunction<Foo, Bar>

路由指令的优先级顺序spring-doc.cn

鉴于我们有多种提供路由指令的机制,因此了解 在同时使用多个机制的情况下解决冲突,因此顺序如下:spring-doc.cn

  1. MessageRoutingCallback(如果 function 是 imperative 的,则无论是否定义了其他任何内容,都将接管)spring-doc.cn

  2. 消息标头 (如果 function 是命令式且未提供)MessageRoutingCallbackspring-doc.cn

  3. 应用程序属性(任何功能)spring-doc.cn

无法路由的消息spring-doc.cn

如果 route-to 函数在 catalog 中不可用,您将收到一个异常,指出该spring-doc.cn

在某些情况下,这种行为是不可取的,你可能希望有一些 “catch-all” 类型的函数来处理这样的消息。 为了实现这一目标,框架提供了策略。您需要做的就是将其注册为 bean。 它的默认实现将仅记录消息不可路由的事实,但将允许消息流在没有异常的情况下继续进行,从而有效地删除不可路由的消息。 如果你想要更复杂的东西,你需要做的就是提供你自己的这个策略的实现,并将其注册为 bean。org.springframework.cloud.function.context.DefaultMessageRoutingHandlerspring-doc.cn

@Bean
public DefaultMessageRoutingHandler defaultRoutingHandler() {
	return new DefaultMessageRoutingHandler() {
		@Override
		public void accept(Message<?> message) {
			// do something really cool
		}
	};
}

函数过滤

过滤是一种只有两条路径的路由类型 - “go” 或 “discard”。就功能而言,它的意思是 你只想在某个条件返回 'true' 时调用某个函数,否则你想要丢弃输入。 但是,当涉及到丢弃 input 时,对于它在应用程序上下文中的含义,有许多解释。 例如,您可能希望记录它,或者您可能希望维护丢弃消息的计数器。您可能也不想什么都不做。 由于这些路径不同,我们不提供如何处理丢弃消息的通用配置选项。 相反,我们只建议定义一个简单的 Consumer,它表示 'discard' 路径:spring-doc.cn

@Bean
public Consumer<?> devNull() {
   // log, count or whatever
}

现在,您可以让实际上只有两条路径的路由表达式有效地成为过滤器。例如:spring-doc.cn

--spring.cloud.function.routing-expression=headers.contentType.toString().equals('text/plain') ? 'echo' : 'devNull'

每条不符合 'echo' 函数条件的消息都会进入 'devNull',在那里你什么都不能做。 签名还将确保不会尝试类型转换,从而几乎不会产生执行开销。Consumer<?>spring-doc.cn

在处理响应式 Input(例如 Publisher)时,路由指令只能通过 Function 属性提供。这是 由于响应式函数的性质,这些函数只调用一次来传递 Publisher 和其余的 由 Reactor 处理,因此我们无法访问和/或依赖通过单个 值(例如 Message )。

多个路由器

默认情况下,框架将始终配置一个路由函数,如前面部分所述。但是,有时您可能需要多个路由功能。 在这种情况下,除了现有的 bean 实例之外,您还可以创建自己的 bean 实例,只要您为其指定的名称不是 。RoutingFunctionfunctionRouterspring-doc.cn

您可以在映射中将 or 作为键/值对传递给 RoutinFunction。spring.cloud.function.routing-expressionspring.cloud.function.definitionspring-doc.cn

这是一个简单的例子spring-doc.cn

@Configuration
protected static class MultipleRouterConfiguration {

	@Bean
	RoutingFunction mySpecialRouter(FunctionCatalog functionCatalog, BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback) {
		Map<String, String> propertiesMap = new HashMap<>();
		propertiesMap.put(FunctionProperties.PREFIX + ".routing-expression", "'reverse'");
		return new RoutingFunction(functionCatalog, propertiesMap, new BeanFactoryResolver(beanFactory), routingCallback);
	}

	@Bean
	public Function<String, String> reverse() {
		return v -> new StringBuilder(v).reverse().toString();
	}

	@Bean
	public Function<String, String> uppercase() {
		return String::toUpperCase;
	}
}

以及演示其工作原理的测试spring-doc.cn

@Test
public void testMultipleRouters() {
	System.setProperty(FunctionProperties.PREFIX + ".routing-expression", "'uppercase'");
	FunctionCatalog functionCatalog = this.configureCatalog(MultipleRouterConfiguration.class);
	Function function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME);
	assertThat(function).isNotNull();
	Message<String> message = MessageBuilder.withPayload("hello").build();
	assertThat(function.apply(message)).isEqualTo("HELLO");

	function = functionCatalog.lookup("mySpecialRouter");
	assertThat(function).isNotNull();
	message = MessageBuilder.withPayload("hello").build();
	assertThat(function.apply(message)).isEqualTo("olleh");
}

[[输入/输出丰富]] == 输入/输出丰富spring-doc.cn

很多时候,你需要修改或优化传入或传出的 Message,并保持你的代码没有非功能性问题。您不想在业务逻辑中执行此操作。spring-doc.cn

您始终可以通过 Function Composition 来完成它。这种方法有几个好处:spring-doc.cn

  • 它允许您将这个非功能性关注点隔离到一个单独的函数中,您可以将该函数与业务函数作为函数定义进行组合。spring-doc.cn

  • 它为你提供了完全的自由(和危险),让你知道在传入消息到达实际业务功能之前你可以修改什么。spring-doc.cn

@Bean
public Function<Message<?>, Message<?>> enrich() {
    return message -> MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();
}

@Bean
public Function<Message<?>, Message<?>> myBusinessFunction() {
    // do whatever
}

然后通过提供以下函数定义来编写您的函数。enrich|myBusinessFunctionspring-doc.cn

虽然所描述的方法是最灵活的,但它也是最复杂的,因为它需要您编写一些代码,使其成为 bean 或 手动将其注册为函数,然后才能将其与业务函数组合,如前面的示例所示。spring-doc.cn

但是,如果您尝试进行的修改 (扩充) 像前面的示例中一样微不足道,该怎么办?有没有一个更简单、更动态和可配置的 机制来实现相同的目标?spring-doc.cn

从版本 3.1.3 开始,该框架允许您提供 SPEL 表达式来丰富进入函数的 Importing 和 以及从中输出。让我们以其中一个测试为例。spring-doc.cn

@Test
public void testMixedInputOutputHeaderMapping() throws Exception {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
					"--logging.level.org.springframework.cloud.function=DEBUG",
					"--spring.main.lazy-initialization=true",
					"--spring.cloud.function.configuration.split.output-header-mapping-expression.keyOut1='hello1'",
					"--spring.cloud.function.configuration.split.output-header-mapping-expression.keyOut2=headers.contentType",
					"--spring.cloud.function.configuration.split.input-header-mapping-expression.key1=headers.path.split('/')[0]",
					"--spring.cloud.function.configuration.split.input-header-mapping-expression.key2=headers.path.split('/')[1]",
					"--spring.cloud.function.configuration.split.input-header-mapping-expression.key3=headers.path")) {

		FunctionCatalog functionCatalog = context.getBean(FunctionCatalog.class);
		FunctionInvocationWrapper function = functionCatalog.lookup("split");
		Message<byte[]> result = (Message<byte[]>) function.apply(MessageBuilder.withPayload("helo")
				.setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
				.setHeader("path", "foo/bar/baz").build());
		assertThat(result.getHeaders().containsKey("keyOut1")).isTrue();
		assertThat(result.getHeaders().get("keyOut1")).isEqualTo("hello1");
		assertThat(result.getHeaders().containsKey("keyOut2")).isTrue();
		assertThat(result.getHeaders().get("keyOut2")).isEqualTo("application/json");
	}
}

在这里,您会看到一个名为 properties 的属性,前面是函数的名称(即 ),后跟要设置的消息头键的名称和值 SPEL 表达式。第一个表达式(对于 'keyOut1')是用单引号括起来的文字 SpEL 表达式,有效地将 'keyOut1' 设置为 value 。设置为现有 'contentType' 标头的值。input-header-mapping-expressionoutput-header-mapping-expressionsplithello1keyOut2spring-doc.cn

你还可以在 input Headers 映射中观察到一些有趣的功能,我们实际上拆分了现有 Headers 'path' 的值,将 key1 和 key2 的单个值设置为基于索引的拆分元素的值。spring-doc.cn

如果由于某种原因提供的表达式计算失败,则函数的执行将继续进行,就像什么都没发生一样。 但是,您会在日志中看到 WARN 消息,通知您
o.s.c.f.context.catalog.InputEnricher    : Failed while evaluating expression "hello1"  on incoming message. . .

如果您正在处理具有多个输入的函数(下一节),则可以在input-header-mapping-expressionspring-doc.cn

--spring.cloud.function.configuration.echo.input-header-mapping-expression[0].key1=‘hello1'
--spring.cloud.function.configuration.echo.input-header-mapping-expression[1].key2='hello2'

函数 Arity

有时需要对数据流进行分类和组织。例如 考虑一个经典的大数据用例,即处理无组织的数据,比如说, 'orders' 和 'invoices',并且您希望每个都进入单独的数据存储。 这就是函数 arity(具有多个输入和输出的函数)支持的地方 来玩。spring-doc.cn

让我们看一个这样函数的示例(完整的实现细节在这里提供),spring-doc.cn

@Bean
public Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> organise() {
	return flux -> ...;
}

鉴于 Project Reactor 是 SCF 的核心依赖项,我们正在使用它的 Tuple 库。 元组通过向我们传达基数类型信息,为我们提供了独特的优势。 在 SCSt 的背景下,两者都非常重要。Cardinality 让我们知道 需要创建多少个输入和输出绑定,并将其绑定到对应的 函数的输入和输出。了解类型信息可确保正确的类型 转换。spring-doc.cn

此外,这是绑定命名约定的 'index' 部分 names 开始发挥作用,因为在这个函数中,两个输出绑定 名称是 和 。organise-out-0organise-out-1spring-doc.cn

重要提示:目前,函数 arity 仅支持响应式函数 () 以复杂事件处理为中心 其中,对事件汇合的评估和计算通常需要查看 事件流,而不是单个事件。Function<TupleN<Flux<?>…​>, TupleN<Flux<?>…​>>

Input Header 传播

在典型场景中,input Message 头不会传播到输出,这是理所当然的,因为函数的输出可能是其他事物的 input,需要它自己的 Message 头集。 但是,有时可能需要这种传播,因此 Spring Cloud Function 提供了几种机制来实现这一点。spring-doc.cn

首先,您始终可以手动复制 Headers。例如,如果你有一个 Function 的签名是 take 和 return(即 ),你可以简单地有选择地自己复制 headers。请记住,如果你的函数返回 Message,框架不会对它执行任何操作,只会正确转换其有效负载。 但是,这种方法可能有点乏味,尤其是在您只想复制所有标头的情况下。 为了帮助处理此类情况,我们提供了一个简单的属性,该属性允许您在希望传播 input 标头的函数上设置布尔标志。 属性为 。MessageMessageFunction<Message, Message>copy-input-headersspring-doc.cn

例如,假设您有以下配置:spring-doc.cn

@EnableAutoConfiguration
@Configuration
protected static class InputHeaderPropagationConfiguration {

	@Bean
	public Function<String, String> uppercase() {
		return x -> x.toUpperCase();
	}
}

如你所知,你仍然可以通过向它发送 Message 来调用这个函数(框架将负责类型转换和有效负载提取)spring-doc.cn

通过简单地设置为 ,以下断言也将为 truespring.cloud.function.configuration.uppercase.copy-input-headerstruespring-doc.cn

Function<Message<String>, Message<byte[]>> uppercase = catalog.lookup("uppercase", "application/json");
Message<byte[]> result = uppercase.apply(MessageBuilder.withPayload("bob").setHeader("foo", "bar").build());
assertThat(result.getHeaders()).containsKey("foo");

类型转换(Content-Type 协商)

Content-Type 协商是 Spring Cloud Function 的核心功能之一,因为它不仅允许将传入数据转换为声明的类型 通过函数签名,但在函数组合期间执行相同的转换,使原本不可组合(按类型)的函数可组合。spring-doc.cn

为了更好地理解内容类型协商背后的机制和必要性,我们看了一个非常简单的用例,如下所示: 以以下函数为例:spring-doc.cn

@Bean
public Function<Person, String> personFunction {..}

前面示例中显示的函数需要一个对象作为参数,并生成一个 String 类型作为输出。如果此类函数是 调用,则 all 工作正常。但通常函数扮演着传入数据的处理程序的角色,这些数据通常来自 以 Raw 格式(如 等)进行。为了使框架成功地将传入数据作为参数传递给 这个函数,它必须以某种方式将传入的数据转换为类型。PersonPersonbyte[]JSON StringPersonspring-doc.cn

Spring Cloud Function 依赖于 Spring 的两种原生机制来实现这一点。spring-doc.cn

  1. MessageConverter - 将传入的 Message 数据转换为函数声明的类型。spring-doc.cn

  2. ConversionService - 将传入的非 Message 数据转换为函数声明的类型。spring-doc.cn

这意味着根据原始数据类型(Message 或非 Message),Spring Cloud Function 将应用一种或另一种机制。spring-doc.cn

在大多数情况下,在处理作为其他请求(例如,HTTP、消息等)的一部分调用的函数时,框架依赖于 因为此类请求已转换为 Spring 。换句话说,框架会查找并应用适当的 . 为此,框架需要用户提供一些说明。这些指令之一已经由函数的签名提供 本身(Person 类型)。因此,从理论上讲,这应该(而且在某些情况下是)足够了。但是,对于大多数使用案例,为了 选择适当的 ,框架需要一条附加信息。缺失的部分是标题。MessageConvertersMessageMessageConverterMessageConvertercontentTypespring-doc.cn

这样的 Headers 通常作为 Message 的一部分,由首先创建此类 Message 的相应适配器注入。 例如,HTTP POST 请求会将其内容类型的 HTTP 标头复制到 Message 的标头。contentTypespring-doc.cn

对于不存在此类标头的情况,框架依赖于默认内容类型,如 .application/jsonspring-doc.cn

内容类型与参数类型

如前所述,要使框架选择适当的 ,它需要参数类型和内容类型信息(可选)。 选择适当 Parser 的逻辑驻留在参数 resolvers 中,该 resolvers 在调用用户定义的 function(即框架知道实际参数类型时)。 如果参数类型与当前有效负载的类型不匹配,则框架会将 预配置以查看它们中的任何一个是否可以转换有效负载。MessageConverterMessageConverterMessageConvertersspring-doc.cn

和 argument type 的组合是框架通过定位来确定 message 是否可以转换为目标类型的机制 适当的 . 如果未找到合适的,则会引发异常,您可以通过添加自定义来处理该异常(请参阅 )。contentTypeMessageConverterMessageConverterMessageConverterUser-defined Message Convertersspring-doc.cn

不要指望仅根据 . 请记住,这是对 target type 的补充。 这是一个暗示,可能会也可能不会考虑。MessagecontentTypecontentTypeMessageConverter

消息转换器

MessageConverters定义两个方法:spring-doc.cn

Object fromMessage(Message<?> message, Class<?> targetClass);

Message<?> toMessage(Object payload, @Nullable MessageHeaders headers);

了解这些方法的 Contract 及其用法非常重要,特别是在 Spring Cloud Stream 的上下文中。spring-doc.cn

该方法将 incoming 转换为 argument 类型。 的有效负载可以是任何类型,并且是 直到实际实现 以支持多种类型。fromMessageMessageMessageMessageConverterspring-doc.cn

提供的 MessageConverters

如前所述,该框架已经提供了一个堆栈来处理大多数常见的用例。 以下列表按优先顺序描述了提供的 (使用第一个有效的):MessageConvertersMessageConvertersMessageConverterspring-doc.cn

  1. JsonMessageConverter:支持在使用 Jackson (DEFAULT) 或 Gson 库的情况下转换往返 POJO 的有效负载。此消息转换器还知道参数(例如,application/json;type=foo.bar.Person 的 Person)。这在开发函数时类型可能未知的情况下非常有用,因此函数签名可能类似于 or 或 。换句话说,对于类型转换,我们通常从函数签名中派生类型。具有 mime-type 参数允许您以更动态的方式传达类型。MessagecontentTypeapplication/jsontypeFunction<?, ?>FunctionFunction<Object, Object>spring-doc.cn

  2. ByteArrayMessageConverter:支持将 from 的有效负载转换为 if 为 的情况。它本质上是一种传递,主要是为了向后兼容。Messagebyte[]byte[]contentTypeapplication/octet-streamspring-doc.cn

  3. StringMessageConverter:支持将任何类型的 when is 转换为 .StringcontentTypetext/plainspring-doc.cn

当找不到合适的转换器时,框架会引发异常。发生这种情况时,您应该检查您的代码和配置,并确保这样做了 不要错过任何内容(即,确保您使用 Binding 或 Header) 提供了 A。 但是,最有可能的是,您发现了一些不常见的情况(例如可能是自定义),并且当前提供的堆栈不知道如何转换。如果是这种情况,您可以添加 custom .请参见用户定义的消息转换器contentTypecontentTypeMessageConvertersMessageConverterspring-doc.cn

用户定义的消息转换器

Spring Cloud Function 公开了一种机制来定义和注册 additional 。 要使用它,请实施 ,将其配置为 . 然后将其附加到现有的 'MessageConverter' 堆栈中。MessageConvertersorg.springframework.messaging.converter.MessageConverter@Beanspring-doc.cn

重要的是要了解 custom implementations 被添加到现有堆栈的头部。 因此,自定义 implementations 优先于现有 implementations,这使您可以覆盖和添加现有 converters。MessageConverterMessageConverter

以下示例显示如何创建消息转换器 bean 以支持名为:application/barspring-doc.cn

@SpringBootApplication
public static class SinkApplication {

    ...

    @Bean
    public MessageConverter customMessageConverter() {
        return new MyCustomMessageConverter();
    }
}

public class MyCustomMessageConverter extends AbstractMessageConverter {

    public MyCustomMessageConverter() {
        super(new MimeType("application", "bar"));
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        return (Bar.class.equals(clazz));
    }

    @Override
    protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
        Object payload = message.getPayload();
        return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
    }
}

有关 JSON 选项的说明

在 Spring Cloud Function 中,我们支持 Jackson 和 Gson 机制来处理 JSON。 为了您的利益,已经抽象了它,在它下,它自己知道两种机制,并将使用选定的一种 由您或遵循默认规则。 默认规则如下:org.springframework.cloud.function.json.JsonMapperspring-doc.cn

  • 无论 Classpath 上的哪个库都是要使用的机制。因此,如果你必须进入 classpath,将使用 Jackson,如果你有,那么将使用 Gson。com.fasterxml.jackson.*com.google.code.gsonspring-doc.cn

  • 如果两者都有,则 Gson 将是默认值,或者您可以使用以下两个值之一设置 property:或 。spring.cloud.function.preferred-json-mappergsonjacksonspring-doc.cn

也就是说,类型转换通常对开发人员是透明的,但是考虑到它也注册为 bean 如果需要,您可以轻松地将其注入到您的代码中。org.springframework.cloud.function.json.JsonMapperspring-doc.cn

Kotlin Lambda 支持

我们还提供对 Kotlin lambda 的支持(自 v2.0 起)。 请考虑以下事项:spring-doc.cn

@Bean
open fun kotlinSupplier(): () -> String {
    return  { "Hello from Kotlin" }
}

@Bean
open fun kotlinFunction(): (String) -> String {
    return  { it.toUpperCase() }
}

@Bean
open fun kotlinConsumer(): (String) -> Unit {
    return  { println(it) }
}

以上表示配置为 Spring bean 的 Kotlin lambda。每个签名都映射到 Java 等效的 、 和 ,因此框架支持/识别签名。 虽然 Kotlin 到 Java 映射的机制不在本文档的讨论范围之内,但请务必了解 此处也适用“Java 8 函数支持”部分中概述的签名转换规则。SupplierFunctionConsumerspring-doc.cn

要启用 Kotlin 支持,您只需在类路径上添加 Kotlin SDK 库,这将触发适当的 autoconfiguration 和支持类。spring-doc.cn

功能组件扫描

Spring Cloud Function 将扫描 的实现,并在名为 (如果存在) 的包中查找。使用这个 功能,您可以编写不依赖于 Spring 的函数 - 甚至不需要 Annotation。如果您想使用不同的 package 中,您可以设置 .您还可以使用 完全关闭扫描。FunctionConsumerSupplierfunctions@Componentspring.cloud.function.scan.packagesspring.cloud.function.scan.enabled=falsespring-doc.cn

数据掩码

典型的应用程序带有多个级别的日志记录。某些云/无服务器平台可能会在记录的数据包中包含敏感数据,以供所有人查看。 虽然检查正在记录的数据是单个开发人员的责任,但日志记录来自框架本身,因此从版本 4.1 开始,我们引入了最初帮助屏蔽 AWS Lambda 负载中的敏感数据。但是,它是通用的,可用于任何模块。目前,它仅适用于结构化数据,例如 JSON。您只需指定要屏蔽的键,其余的交给它。 应在 file 中指定 keys 。文件格式非常简单,您可以用逗号或换行符或两者分隔多个键。JsonMaskerJsonMaskerMETA-INF/mask.keysspring-doc.cn

以下是此类文件内容的示例:spring-doc.cn

eventSourceARN
asdf1, SS

在这里,您会看到定义了三个键 一旦存在这样的文件,JsonMasker 将使用它来屏蔽指定键的值。spring-doc.cn

下面是显示用法的示例代码spring-doc.cn

private final static JsonMasker masker = JsonMasker.INSTANCE();
. . .

logger.info("Received: " + masker.mask(new String(payload, StandardCharsets.UTF_8)));