您可以通过简单地编写函数并将它们公开为 s 来编写 Spring Cloud Stream 应用程序。
您还可以使用基于 Spring Integration 注解的配置或
基于Spring Cloud Stream注解的配置,虽然从spring-cloud-stream 3.x开始
我们建议使用函数式实现。@Bean
Spring Cloud Function 支持
概述
从 Spring Cloud Stream v2.1 开始,定义流处理程序和源的另一种选择是使用内置
支持 Spring Cloud 函数,其中它们可以表示为 bean
类型。java.util.function.[Supplier/Function/Consumer]
要指定要绑定到绑定公开的外部目标的功能 Bean,
您必须提供财产。spring.cloud.function.definition
如果您只有 type 的单个 bean,则可以
跳过该属性,因为此类功能 Bean 将被自动发现。然而
使用此类属性以避免任何混淆被认为是最佳做法。
有时,这种自动发现可能会妨碍,因为单个 bean 类型可能用于处理消息以外的目的,但是作为单个 bean,它是自动发现和自动绑定的。
对于这些罕见的情况,可以通过提供值设置为 的属性来禁用自动发现。java.util.function.[Supplier/Function/Consumer] spring.cloud.function.definition java.util.function.[Supplier/Function/Consumer] spring.cloud.stream.function.autodetect false |
下面是应用程序公开消息处理程序的示例,该消息处理程序通过充当数据的使用者和生产者来有效地支持传递语义。java.util.function.Function
@SpringBootApplication
public class MyFunctionBootApp {
public static void main(String[] args) {
SpringApplication.run(MyFunctionBootApp.class);
}
@Bean
public Function<String, String> toUpperCase() {
return s -> s.toUpperCase();
}
}
在前面的示例中,我们定义了一个名为 toUpperCase 类型的 Bean 作为消息处理程序
其“输入”和“输出”必须绑定到由提供的目标绑定器公开的外部目标。
默认情况下,“input”和“output”绑定名称将为 和 。
有关用于建立绑定名称的命名约定的详细信息,请参阅 [功能绑定名称] 部分。java.util.function.Function
toUpperCase-in-0
toUpperCase-out-0
以下是支持其他语义的简单功能应用程序示例:
下面是公开为java.util.function.Supplier
@SpringBootApplication
public static class SourceFromSupplier {
@Bean
public Supplier<Date> date() {
return () -> new Date(12345L);
}
}
下面是公开为java.util.function.Consumer
@SpringBootApplication
public static class SinkFromConsumer {
@Bean
public Consumer<String> sink() {
return System.out::println;
}
}
供应商(来源)
Function
并且在如何触发调用时非常简单。它们是基于触发的
在发送到它们绑定到的目标的数据(事件)上。换句话说,它们是经典的事件驱动组件。Consumer
但是,在触发方面属于自己的类别。由于根据定义,它是数据的来源(来源),因此它不是
订阅任何入站目标,因此必须由其他机制触发。
还有一个执行问题,可以是命令性的,也可以是被动的,直接关系到触发这些供应商。Supplier
Supplier
请考虑以下示例:
@SpringBootApplication
public static class SupplierConfiguration {
@Bean
public Supplier<String> stringSupplier() {
return () -> "Hello from Supplier";
}
}
每当调用前面的 Bean 方法时,都会生成一个字符串。但是,谁调用此方法以及多久调用一次?
该框架提供了一个默认的轮询机制(回答“谁?”的问题),该机制将触发供应商的调用,并且默认情况下它将这样做
每秒(回答“多久一次?
换言之,上述配置每秒生成一条消息,并且每条消息都发送到绑定器公开的目标。
若要了解如何自定义轮询机制,请参阅轮询配置属性部分。Supplier
get()
output
请看一个不同的例子:
@SpringBootApplication
public static class SupplierConfiguration {
@Bean
public Supplier<Flux<String>> stringSupplier() {
return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(1000);
return "Hello from Supplier";
} catch (Exception e) {
// ignore
}
}
})).subscribeOn(Schedulers.elastic()).share();
}
}
前面的 Bean 采用响应式编程风格。通常,与命令性供应商不同,
它应该只触发一次,因为调用其方法会产生(提供)连续的消息流,而不是
个人消息。Supplier
get()
该框架识别编程风格的差异,并保证这样的供应商只被触发一次。
但是,请想象一下,您想要轮询某个数据源并返回表示结果集的有限数据流的用例。 对于这样的供应商来说,反应式编程风格是一个完美的机制。但是,鉴于所生产流的有限性, 此类供应商仍需定期调用。
请考虑以下示例,该示例通过生成有限数据流来模拟此类用例:
@SpringBootApplication
public static class SupplierConfiguration {
@PollableBean
public Supplier<Flux<String>> stringSupplier() {
return () -> Flux.just("hello", "bye");
}
}
Bean 本身是用 annotation (sub-set of ) 来注释的,从而向框架发出信号,虽然实现
这样的供应商是被动的,它仍然需要轮询。PollableBean
@Bean
定义了一个属性,其中向此注释的后处理器发出信号
注释组件生成的结果必须被拆分,并且默认设置为。这意味着
该框架将拆分返回发送每个项目作为单独的消息。如果不是
他想要的行为,你可以把它设置为,此时这样的供应商将简单地返回
在不拆分的情况下生成的 Flux。splittable PollableBean true false |
供应商 & 线程
正如您现在所了解的,与 和 不同,它们是由事件触发的(它们有输入数据),没有
任何输入,因此由不同的机制 - 轮询器触发,它可能具有不可预测的线程机制。虽然细节
线程机制大多数时候与函数的下游执行无关,在某些情况下可能会出现问题
特别是对于可能对线程亲和性有一定期望的集成框架。例如,Spring Cloud Sleuth 依赖于
在跟踪存储在线程本地中的数据时。
对于这些情况,我们通过 有另一种机制,用户可以更好地控制线程机制。您可以获得更多详细信息
在将任意数据发送到输出(例如外部事件驱动的源)部分。Function Consumer Supplier StreamBridge |
消费者(反应式)
Reactive 有点特殊,因为它有一个 void 返回类型,使框架没有可订阅的引用。
最有可能的是,您不需要写入 ,而是将其写为调用运算符,作为流上的最后一个运算符。Consumer
Consumer<Flux<?>>
Function<Flux<?>, Mono<Void>>
then
例如:
public Function<Flux<?>, Mono<Void>> consumer() {
return flux -> flux.map(..).filter(..).then();
}
但是,如果您确实需要编写一个显式 ,请记住订阅传入的 Flux。Consumer<Flux<?>>
此外,请记住,在混合反应式函数和命令式函数时,相同的规则也适用于函数组合。
Spring Cloud Function 确实支持使用命令式组合响应式函数,但是您必须注意某些限制。
例如,假设您已经使用命令式消费者组成了反应式函数。
这种组合的结果是反应性的。但是,没有办法订阅本节前面讨论的此类消费者,
因此,这个限制只能通过让你的消费者做出反应并手动订阅(如前所述)来解决,或者将你的功能改变为命令式的。Consumer
轮询配置属性
Spring Cloud Stream 公开了以下属性,并以 :spring.integration.poller.
- 固定延迟
-
修复了默认轮询器的延迟(以毫秒为单位)。
默认值:1000L。
- maxMessagesPerPoll
-
默认轮询器的每个轮询事件的最大消息数。
默认值:1L。
- 克龙
-
Cron 触发器的 Cron 表达式值。
默认值:none。
- 初始延迟
-
周期性触发的初始延迟。
默认值:0。
- 时间单位
-
应用于延迟值的 TimeUnit。
默认值:MILLISECONDS。
例如,将轮询器间隔设置为每两秒轮询一次。--spring.integration.poller.fixed-delay=2000
每个绑定的轮询配置
上一节演示如何配置将应用于所有绑定的单个默认轮询器。虽然它非常适合微服务 spring-cloud-stream 模型,该模型设计用于每个微服务代表单个组件(例如,Supplier),因此默认轮询器配置就足够了,但存在一些边缘情况 您可能有多个组件需要不同的轮询配置
对于这种情况,请使用每个绑定的方式来配置轮询器。例如,假设您有一个输出绑定。在这种情况下,您可以为此类配置轮询器
使用前缀进行绑定(例如,)。supply-out-0
spring.cloud.stream.bindings.supply-out-0.producer.poller..
spring.cloud.stream.bindings.supply-out-0.producer.poller.fixed-delay=2000
将任意数据发送到输出(例如,外部事件驱动的源)
在某些情况下,实际数据源可能来自不是活页夹的外部(外部)系统。例如, 数据源可能是经典的 REST 终结点。我们如何将这种源与spring-cloud-stream使用的功能机制联系起来?
Spring Cloud Stream 提供了两种机制,让我们更详细地了解它们
在这里,对于这两个示例,我们将使用一个名为 bound to the root web 上下文的标准 MVC 端点方法。
通过 StreamBridge 机制委派传入请求进行流式传输。delegateToSupplier
@SpringBootApplication
@Controller
public class WebSourceApplication {
public static void main(String[] args) {
SpringApplication.run(WebSourceApplication.class, "--spring.cloud.stream.output-bindings=toStream");
}
@Autowired
private StreamBridge streamBridge;
@RequestMapping
@ResponseStatus(HttpStatus.ACCEPTED)
public void delegateToSupplier(@RequestBody String body) {
System.out.println("Sending " + body);
streamBridge.send("toStream", body);
}
}
在这里,我们自动连接一个 bean,它允许我们有效地将数据发送到输出绑定
将非流应用程序与 spring-cloud-stream 桥接。请注意,前面的示例没有任何
定义的源函数(例如,Supplier Bean)使框架没有触发器来提前创建源绑定,这对于配置包含函数 Bean 的情况是典型的。这很好,因为将启动输出绑定的创建(以及
目标自动配置(如有必要),用于在第一次调用其操作时将其缓存为
后续重用(有关详细信息,请参阅 StreamBridge 和动态目标)。StreamBridge
StreamBridge
send(..)
但是,如果要在初始化(启动)时预先创建输出绑定,则可以从可以声明源名称的属性中受益。
提供的名称将用作创建源绑定的触发器。
可用于表示多个源(多个输出绑定)
(例如,spring.cloud.stream.output-bindings
;
--spring.cloud.stream.output-bindings=foo;bar
)
另外,请注意,该方法需要 for 数据。这意味着您可以发送 POJO 或发送到它和它
发送输出时将执行相同的例程,就好像它来自提供相同级别的任何职能部门或供应商一样
与函数的一致性。这意味着输出类型转换、分区等被尊重,就好像它来自函数生成的输出一样。streamBridge.send(..)
Object
Message
具有异步发送的 StreamBridge
StreamBridge
使用 Spring Integration 框架提供的发送机制,该框架是 Spring Cloud Stream 的核心。默认情况下,此机制使用发送方的线程。换句话说,发送是阻塞的。虽然这在许多情况下是可以的,但在某些情况下,您希望此类发送是异步的。为此,请使用 before 调用其中一个 send 方法的方法。setAsync(true)
StreamBridge
可观测性 使用异步发送进行上下文传播
当使用框架提供的可观测性支持以及支持 Spring 框架时,打破线程边界将影响可观测性上下文的一致性,从而影响您的跟踪历史记录。为了避免这种情况,您只需要添加依赖关系 Micrometer(见下文)context-propagation
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>context-propagation</artifactId>
<version>1.1.0</version>
</dependency>
StreamBridge 和动态目标
StreamBridge
也可用于提前不知道输出目的地的情况,类似于用例
在 [Routing FROM Consumer] 一节中描述。
让我们看一下这个例子
@SpringBootApplication
@Controller
public class WebSourceApplication {
public static void main(String[] args) {
SpringApplication.run(WebSourceApplication.class, args);
}
@Autowired
private StreamBridge streamBridge;
@RequestMapping
@ResponseStatus(HttpStatus.ACCEPTED)
public void delegateToSupplier(@RequestBody String body) {
System.out.println("Sending " + body);
streamBridge.send("myDestination", body);
}
}
如您所见,前面的示例与上一个示例非常相似,只是通过属性(未提供)提供的显式绑定指令。
在这里,我们将数据发送到不作为绑定存在的名称。因此,此类名称将被视为动态目标
如[路由 FROM 使用者]一节所述。spring.cloud.stream.output-bindings
myDestination
在前面的示例中,我们使用外部源来馈送流。ApplicationRunner
一个更实际的例子,其中外部源是 REST 端点。
@SpringBootApplication
@Controller
public class WebSourceApplication {
public static void main(String[] args) {
SpringApplication.run(WebSourceApplication.class);
}
@Autowired
private StreamBridge streamBridge;
@RequestMapping
@ResponseStatus(HttpStatus.ACCEPTED)
public void delegateToSupplier(@RequestBody String body) {
streamBridge.send("myBinding", body);
}
}
正如您在方法中看到的那样,我们使用 StreamBridge 将数据发送到绑定。在这里,您还可以从中受益
如果不存在,则将自动创建并缓存 where 的动态功能,否则将使用现有绑定。delegateToSupplier
myBinding
StreamBridge
myBinding
如果存在许多动态目标,缓存动态目标(绑定)可能会导致内存泄漏。要有一定程度的控制权
我们为默认缓存大小为 10 的输出绑定提供自逐出缓存机制。这意味着,如果动态目标大小超过该数字,则可能会逐出现有绑定,因此需要重新创建绑定,这可能会导致轻微的性能下降。您可以通过属性将其设置为所需值来增加缓存大小。spring.cloud.stream.dynamic-destination-cache-size |
curl -H "Content-Type: text/plain" -X POST -d "hello from the other side" http://localhost:8080/
通过展示两个例子,我们想强调这种方法适用于任何类型的外国来源。
如果您使用的是 Solace PubSub+ 活页夹,则 Spring Cloud Stream 保留了标头(可通过 BinderHeaders.TARGET_DESTINATION 检索),它允许将消息从其绑定的配置目标重定向到此标头指定的目标。这允许绑定器管理发布到动态目标所需的资源,从而使框架不必这样做,并避免了上一个注释中提到的缓存问题。更多信息请点击此处。scst_targetDestination |
使用 StreamBridge 输出内容类型
如有必要,您还可以使用以下方法签名提供特定内容类型。
或者,如果您以 ,其内容类型将得到支持。public boolean send(String bindingName, Object data, MimeType outputContentType)
Message
在 StreamBridge 中使用特定的活页夹类型
Spring Cloud Stream 支持多种活页夹方案。例如,您可能正在从 Kafka 接收数据并将其发送到 RabbitMQ。
有关多个活页夹方案的更多信息,请参阅 [活页夹] 部分,特别是 [类路径上的多个活页夹]
如果您计划使用 StreamBridge 并在您的应用程序中配置了多个活页夹,您还必须告诉 StreamBridge
使用哪种粘合剂。为此,还有两种方法的变体:send
public boolean send(String bindingName, @Nullable String binderType, Object data)
public boolean send(String bindingName, @Nullable String binderType, Object data, MimeType outputContentType)
如您所见,您可以提供一个附加参数 - ,告诉 BindingService 在创建动态绑定时使用哪个绑定器。binderType
对于使用属性或已在不同绑定程序下创建绑定的情况,该参数将不起作用。spring.cloud.stream.output-bindings binderType |
将信道拦截器与 StreamBridge 配合使用
由于使用 a 建立输出绑定,因此可以在通过 发送数据时激活信道拦截器。
由应用程序决定在哪个信道上应用拦截器。
Spring Cloud Stream 不会将检测到的所有信道拦截器注入其中,除非它们被 .StreamBridge
MessageChannel
StreamBridge
StreamBridge
StreamBridge
@GlobalChannelInterceptor(patterns = "*")
让我们假设您在应用程序中具有以下两个不同的绑定。StreamBridge
streamBridge.send("foo-out-0", message);
和
streamBridge.send("bar-out-0", message);
现在,如果您希望在两个绑定上应用通道拦截器,那么您可以声明以下 bean。StreamBridge
GlobalChannelInterceptor
@Bean
@GlobalChannelInterceptor(patterns = "*")
public ChannelInterceptor customInterceptor() {
return new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
...
}
};
}
但是,如果您不喜欢上述全局方法,并希望为每个绑定提供专用的拦截器,则可以执行以下操作。
@Bean
@GlobalChannelInterceptor(patterns = "foo-*")
public ChannelInterceptor fooInterceptor() {
return new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
...
}
};
}
和
@Bean
@GlobalChannelInterceptor(patterns = "bar-*")
public ChannelInterceptor barInterceptor() {
return new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
...
}
};
}
您可以灵活地使模式更加严格或根据业务需求进行自定义。
通过这种方法,应用程序能够决定要注入哪些拦截器,而不是应用所有可用的拦截器。StreamBridge
StreamBridge 通过包含 的所有方法的接口提供合约。因此,应用程序可以选择使用 自动接线。当涉及到通过为接口提供模拟或类似机制使用的单元测试代码时,这很方便。StreamOperations send StreamBridge StreamOperations StreamBridge StreamOperations |
Reactive Functions 支持
由于Spring Cloud Function是建立在Project Reactor之上的,因此您不需要做太多事情
在实现 时从响应式编程模型中获益,或者 。Supplier
Function
Consumer
例如:
@SpringBootApplication
public static class SinkFromConsumer {
@Bean
public Function<Flux<String>, Flux<String>> reactiveUpperCase() {
return flux -> flux.map(val -> val.toUpperCase());
}
}
在选择反应式或命令式编程模型时,必须了解一些重要的事情。 完全反应还是只是 API? 使用反应式 API 并不一定意味着您可以从此类 API 的所有反应式功能中受益。换句话说,背压和其他高级功能只有在与兼容系统(例如反应式 Kafka 活页夹)一起使用时才有效。如果您使用的是常规的 Kafka 或 Rabbit 或任何其他非反应式绑定器,则只能从响应式 API 本身的便利性中受益,而不能从其高级功能中受益,因为流的实际源或目标不是响应式的。 错误处理和重试 在本手册中,您将看到有关基于框架的错误处理、重试和其他功能以及与它们相关的配置属性的几个参考。重要的是要了解它们只影响命令式函数,当涉及到反应式函数时,你不应该有同样的期望。这就是原因......
反应式函数和命令式函数之间存在根本区别。
命令式函数是一个消息处理程序,由框架在它收到的每条消息上调用它。因此,对于 N 条消息,将有 N 次调用此类函数,因此我们可以包装此类函数并添加其他功能,例如错误处理、重试等。
反应函数是初始化函数。它只调用一次,以获取对用户提供的 Flux/Mono 的引用,以连接到框架提供的 Flux/Mono。在那之后,我们(框架)完全没有可见性或对流的控制。
因此,对于响应式函数,在错误处理和重试(即 等)方面,您必须依赖响应式 API 的丰富性。 |
功能组成
使用函数式编程模型,您还可以从函数组合中受益,在函数组合中,您可以从一组简单函数动态组合复杂的处理程序。 例如,让我们将以下函数 Bean 添加到上面定义的应用程序中
@Bean
public Function<String, String> wrapInQuotes() {
return s -> "\"" + s + "\"";
}
并修改该属性以反映您打算从“toUpperCase”和“wrapInQuotes”编写新函数的意图。
为此,Spring Cloud 函数依赖于(管道)符号。因此,为了完成我们的示例,我们的属性现在将如下所示:spring.cloud.function.definition
|
--spring.cloud.function.definition=toUpperCase|wrapInQuotes
Spring Cloud Function 提供的功能组合支持的一大好处是 事实上,您可以组合反应式和命令式函数。 |
组合的结果是一个单一的函数,正如您可能猜到的那样,它可能有一个很长且相当隐晦的名称(例如,)
当涉及到其他配置属性时,会带来很多不便。这是 [功能绑定名称] 一节中描述的描述性绑定名称功能可以提供帮助的地方。foo|bar|baz|xyz. . .
例如,如果我们想给我们一个更具描述性的名称,我们可以这样做
具有以下属性允许
引用该绑定名称的其他配置属性(例如,)。toUpperCase|wrapInQuotes
spring.cloud.stream.function.bindings.toUpperCase|wrapInQuotes-in-0=quotedUpperCaseInput
spring.cloud.stream.bindings.quotedUpperCaseInput.destination=myDestination
职能构成和跨领域问题
函数组合有效地允许您通过分解复杂性来解决复杂性 到一组简单且可单独管理/可测试的组件,这些组件仍然可以 在运行时表示为一个。但这并不是唯一的好处。
您还可以使用组合来解决某些横切的非功能性问题, 例如内容丰富。例如,假设您有一条传入消息,该消息可能 缺少某些标头,或者某些标头与您的业务状态不完全相同 功能会期望。现在,您可以实现一个单独的函数来解决这些问题 关注点,然后将其与主要业务功能组成。
让我们看一下这个例子
@SpringBootApplication
public class DemoStreamApplication {
public static void main(String[] args) {
SpringApplication.run(DemoStreamApplication.class,
"--spring.cloud.function.definition=enrich|echo",
"--spring.cloud.stream.function.bindings.enrich|echo-in-0=input",
"--spring.cloud.stream.bindings.input.destination=myDestination",
"--spring.cloud.stream.bindings.input.group=myGroup");
}
@Bean
public Function<Message<String>, Message<String>> enrich() {
return message -> {
Assert.isTrue(!message.getHeaders().containsKey("foo"), "Should NOT contain 'foo' header");
return MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();
};
}
@Bean
public Function<Message<String>, Message<String>> echo() {
return message -> {
Assert.isTrue(message.getHeaders().containsKey("foo"), "Should contain 'foo' header");
System.out.println("Incoming message " + message);
return message;
};
}
}
虽然微不足道,但此示例演示了一个函数如何使用附加标头(非功能性关注点)来丰富传入的消息,
所以另一个功能 - - 可以从中受益。该函数保持干净,仅专注于业务逻辑。
您还可以查看属性的用法,以简化组合绑定名称。echo
echo
spring.cloud.stream.function.bindings
具有多个输入和输出参数的函数
从版本 3.0 开始,spring-cloud-stream 支持以下功能: 具有多个输入和/或多个输出(返回值)。这实际上意味着什么,以及 它针对什么类型的用例?
-
大数据:想象一下,您正在处理的数据源高度无组织,并且包含各种类型的数据元素 (例如,订单、交易等),您实际上需要对其进行整理。
-
数据聚合:另一个用例可能需要您合并来自 2+ 传入_streams的数据元素。
上面仅描述了您可能需要使用单个函数来接受和/或生成的几个用例 多个数据流。这就是我们在这里针对的用例类型。
另外,请注意这里对流概念的强调略有不同。假设这些功能只有价值
如果他们有权访问实际的数据流(而不是单个元素)。因此,我们依靠
由 Project Reactor 提供的抽象(即 和 ),该抽象已经在
classpath 作为 spring-cloud-functions 引入的依赖项的一部分。Flux
Mono
另一个重要方面是多个输入和输出的表示。而 java 提供
各种不同的抽象来表示这些抽象的多个事物
a)无界,b)缺乏arity,c)缺乏类型信息,这在这种情况下都很重要。
举个例子,让我们看一个数组,它只允许我们
描述单个类型的多个或将所有内容转换为 ,影响 的透明类型转换功能
spring-cloud-stream 等。Collection
Object
因此,为了满足所有这些要求,最初的支持依赖于使用另一个抽象的签名 由 Project Reactor - Tuples 提供。但是,我们正在努力允许更灵活的签名。
请参阅 [绑定和绑定名称] 部分,了解用于建立此类应用程序使用的绑定名称的命名约定。 |
让我们看一下几个示例:
@SpringBootApplication
public class SampleApplication {
@Bean
public Function<Tuple2<Flux<String>, Flux<Integer>>, Flux<String>> gather() {
return tuple -> {
Flux<String> stringStream = tuple.getT1();
Flux<String> intStream = tuple.getT2().map(i -> String.valueOf(i));
return Flux.merge(stringStream, intStream);
};
}
}
上面的例子演示了接受两个输入(第一个类型和第二个类型)的函数
并生成 类型的单个输出。String
Integer
String
因此,对于上面的示例,两个输入绑定将是 和 并且为了保持一致性
输出绑定也遵循相同的约定,并命名为 。gather-in-0
gather-in-1
gather-out-0
知道了这一点,你就可以设置绑定特定的属性。
例如,以下内容将覆盖 content-type 进行绑定:gather-in-0
--spring.cloud.stream.bindings.gather-in-0.content-type=text/plain
@SpringBootApplication
public class SampleApplication {
@Bean
public static Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> scatter() {
return flux -> {
Flux<Integer> connectedFlux = flux.publish().autoConnect(2);
UnicastProcessor even = UnicastProcessor.create();
UnicastProcessor odd = UnicastProcessor.create();
Flux<Integer> evenFlux = connectedFlux.filter(number -> number % 2 == 0).doOnNext(number -> even.onNext("EVEN: " + number));
Flux<Integer> oddFlux = connectedFlux.filter(number -> number % 2 != 0).doOnNext(number -> odd.onNext("ODD: " + number));
return Tuples.of(Flux.from(even).doOnSubscribe(x -> evenFlux.subscribe()), Flux.from(odd).doOnSubscribe(x -> oddFlux.subscribe()));
};
}
}
上面的示例与前面的示例有些相反,并演示了以下函数:
接受类型的单个输入并生成两个输出(均为 类型)。Integer
String
因此,对于上面的示例,输入绑定是 和
输出绑定是 和 。scatter-in-0
scatter-out-0
scatter-out-1
然后,您可以使用以下代码对其进行测试:
@Test
public void testSingleInputMultiOutput() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
SampleApplication.class))
.run("--spring.cloud.function.definition=scatter")) {
InputDestination inputDestination = context.getBean(InputDestination.class);
OutputDestination outputDestination = context.getBean(OutputDestination.class);
for (int i = 0; i < 10; i++) {
inputDestination.send(MessageBuilder.withPayload(String.valueOf(i).getBytes()).build());
}
int counter = 0;
for (int i = 0; i < 5; i++) {
Message<byte[]> even = outputDestination.receive(0, 0);
assertThat(even.getPayload()).isEqualTo(("EVEN: " + String.valueOf(counter++)).getBytes());
Message<byte[]> odd = outputDestination.receive(0, 1);
assertThat(odd.getPayload()).isEqualTo(("ODD: " + String.valueOf(counter++)).getBytes());
}
}
}
单个应用程序中的多种功能
可能还需要在单个应用程序中对多个消息处理程序进行分组。您可以通过以下方式做到这一点 定义多个函数。
@SpringBootApplication
public class SampleApplication {
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
@Bean
public Function<String, String> reverse() {
return value -> new StringBuilder(value).reverse().toString();
}
}
在上面的示例中,我们有配置,它定义了两个函数和 。
因此,首先,如前所述,我们需要注意到存在冲突(多个函数),因此
我们需要通过提供指向实际函数的属性来解决它
我们想要绑定。除了这里,我们将使用分隔符来指向这两个函数(请参阅下面的测试用例)。uppercase
reverse
spring.cloud.function.definition
;
与具有多个输入/输出的函数一样,请参阅 [绑定和绑定名称] 部分了解命名 用于建立此类应用程序使用的约束性名称的约定。 |
然后,您可以使用以下代码对其进行测试:
@Test
public void testMultipleFunctions() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
ReactiveFunctionConfiguration.class))
.run("--spring.cloud.function.definition=uppercase;reverse")) {
InputDestination inputDestination = context.getBean(InputDestination.class);
OutputDestination outputDestination = context.getBean(OutputDestination.class);
Message<byte[]> inputMessage = MessageBuilder.withPayload("Hello".getBytes()).build();
inputDestination.send(inputMessage, "uppercase-in-0");
inputDestination.send(inputMessage, "reverse-in-0");
Message<byte[]> outputMessage = outputDestination.receive(0, "uppercase-out-0");
assertThat(outputMessage.getPayload()).isEqualTo("HELLO".getBytes());
outputMessage = outputDestination.receive(0, "reverse-out-1");
assertThat(outputMessage.getPayload()).isEqualTo("olleH".getBytes());
}
}
批量使用者
当使用支持批处理侦听器的 并且为使用者绑定启用了该功能时,可以设置为启用
要传递给函数的整批消息。MessageChannelBinder
spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode
true
List
@Bean
public Function<List<Person>, Person> findFirstPerson() {
return persons -> persons.get(0);
}
批量生产者
您还可以通过返回消息集合在生产者端使用批处理的概念,该集合有效地提供了 相反的效果,集合中的每条消息都将由活页夹单独发送。
请考虑以下函数:
@Bean
public Function<String, List<Message<String>>> batch() {
return p -> {
List<Message<String>> list = new ArrayList<>();
list.add(MessageBuilder.withPayload(p + ":1").build());
list.add(MessageBuilder.withPayload(p + ":2").build());
list.add(MessageBuilder.withPayload(p + ":3").build());
list.add(MessageBuilder.withPayload(p + ":4").build());
return list;
};
}
返回列表中的每条消息将单独发送,从而将四条消息发送到输出目标。
Spring 集成流程作为函数
实现函数时,您可能有适合该类别的复杂要求 企业集成模式 (EIP)。最好使用 Spring Integration(SI)等框架,是EIP的参考实现。
值得庆幸的是,SI 已经通过集成流作为网关提供了将集成流公开为函数的支持,请考虑以下示例:
@SpringBootApplication
public class FunctionSampleSpringIntegrationApplication {
public static void main(String[] args) {
SpringApplication.run(FunctionSampleSpringIntegrationApplication.class, args);
}
@Bean
public IntegrationFlow uppercaseFlow() {
return IntegrationFlows.from(MessageFunction.class, "uppercase")
.<String, String>transform(String::toUpperCase)
.logAndReply(LoggingHandler.Level.WARN);
}
public interface MessageFunction extends Function<Message<String>, Message<String>> {
}
}
对于那些熟悉 SI 的人来说,你可以看到我们定义了一个类型的 bean,其中我们
声明一个集成流,我们希望将其公开为(使用 SI DSL)称为 。
该接口允许我们显式声明输入和输出的类型,以便进行正确的类型转换。
有关类型转换的更多信息,请参见 [内容类型协商] 部分。IntegrationFlow
Function<String, String>
uppercase
MessageFunction
要接收原始输入,可以使用 .from(Function.class, …)
生成的函数绑定到目标绑定器公开的输入和输出目标。
请参阅 [绑定和绑定名称] 部分了解命名 用于建立此类应用程序使用的约束性名称的约定。 |
有关 Spring Integration 和 Spring Cloud Stream 互操作性的更多详细信息,特别是围绕函数式编程模型 您可能会发现这篇文章非常有趣,因为它更深入地探讨了这一点 通过合并 Spring Integration 和 Spring Cloud Stream/Functions 的优点,您可以应用到各种模式中。
如果您只有 type 的单个 bean,则可以
跳过该属性,因为此类功能 Bean 将被自动发现。然而
使用此类属性以避免任何混淆被认为是最佳做法。
有时,这种自动发现可能会妨碍,因为单个 bean 类型可能用于处理消息以外的目的,但是作为单个 bean,它是自动发现和自动绑定的。
对于这些罕见的情况,可以通过提供值设置为 的属性来禁用自动发现。java.util.function.[Supplier/Function/Consumer] spring.cloud.function.definition java.util.function.[Supplier/Function/Consumer] spring.cloud.stream.function.autodetect false |
定义了一个属性,其中向此注释的后处理器发出信号
注释组件生成的结果必须被拆分,并且默认设置为。这意味着
该框架将拆分返回发送每个项目作为单独的消息。如果不是
他想要的行为,你可以把它设置为,此时这样的供应商将简单地返回
在不拆分的情况下生成的 Flux。splittable PollableBean true false |
正如您现在所了解的,与 和 不同,它们是由事件触发的(它们有输入数据),没有
任何输入,因此由不同的机制 - 轮询器触发,它可能具有不可预测的线程机制。虽然细节
线程机制大多数时候与函数的下游执行无关,在某些情况下可能会出现问题
特别是对于可能对线程亲和性有一定期望的集成框架。例如,Spring Cloud Sleuth 依赖于
在跟踪存储在线程本地中的数据时。
对于这些情况,我们通过 有另一种机制,用户可以更好地控制线程机制。您可以获得更多详细信息
在将任意数据发送到输出(例如外部事件驱动的源)部分。Function Consumer Supplier StreamBridge |
如果存在许多动态目标,缓存动态目标(绑定)可能会导致内存泄漏。要有一定程度的控制权
我们为默认缓存大小为 10 的输出绑定提供自逐出缓存机制。这意味着,如果动态目标大小超过该数字,则可能会逐出现有绑定,因此需要重新创建绑定,这可能会导致轻微的性能下降。您可以通过属性将其设置为所需值来增加缓存大小。spring.cloud.stream.dynamic-destination-cache-size |
如果您使用的是 Solace PubSub+ 活页夹,则 Spring Cloud Stream 保留了标头(可通过 BinderHeaders.TARGET_DESTINATION 检索),它允许将消息从其绑定的配置目标重定向到此标头指定的目标。这允许绑定器管理发布到动态目标所需的资源,从而使框架不必这样做,并避免了上一个注释中提到的缓存问题。更多信息请点击此处。scst_targetDestination |
对于使用属性或已在不同绑定程序下创建绑定的情况,该参数将不起作用。spring.cloud.stream.output-bindings binderType |
StreamBridge 通过包含 的所有方法的接口提供合约。因此,应用程序可以选择使用 自动接线。当涉及到通过为接口提供模拟或类似机制使用的单元测试代码时,这很方便。StreamOperations send StreamBridge StreamOperations StreamBridge StreamOperations |
在选择反应式或命令式编程模型时,必须了解一些重要的事情。 完全反应还是只是 API? 使用反应式 API 并不一定意味着您可以从此类 API 的所有反应式功能中受益。换句话说,背压和其他高级功能只有在与兼容系统(例如反应式 Kafka 活页夹)一起使用时才有效。如果您使用的是常规的 Kafka 或 Rabbit 或任何其他非反应式绑定器,则只能从响应式 API 本身的便利性中受益,而不能从其高级功能中受益,因为流的实际源或目标不是响应式的。 错误处理和重试 在本手册中,您将看到有关基于框架的错误处理、重试和其他功能以及与它们相关的配置属性的几个参考。重要的是要了解它们只影响命令式函数,当涉及到反应式函数时,你不应该有同样的期望。这就是原因......
反应式函数和命令式函数之间存在根本区别。
命令式函数是一个消息处理程序,由框架在它收到的每条消息上调用它。因此,对于 N 条消息,将有 N 次调用此类函数,因此我们可以包装此类函数并添加其他功能,例如错误处理、重试等。
反应函数是初始化函数。它只调用一次,以获取对用户提供的 Flux/Mono 的引用,以连接到框架提供的 Flux/Mono。在那之后,我们(框架)完全没有可见性或对流的控制。
因此,对于响应式函数,在错误处理和重试(即 等)方面,您必须依赖响应式 API 的丰富性。 |
Spring Cloud Function 提供的功能组合支持的一大好处是 事实上,您可以组合反应式和命令式函数。 |
请参阅 [绑定和绑定名称] 部分,了解用于建立此类应用程序使用的绑定名称的命名约定。 |
与具有多个输入/输出的函数一样,请参阅 [绑定和绑定名称] 部分了解命名 用于建立此类应用程序使用的约束性名称的约定。 |
请参阅 [绑定和绑定名称] 部分了解命名 用于建立此类应用程序使用的约束性名称的约定。 |
使用轮询使用者
概述
使用轮询使用者时,按需轮询。
若要为轮询使用者定义绑定,需要提供属性。PollableMessageSource
spring.cloud.stream.pollable-source
请考虑以下轮询使用者绑定的示例:
--spring.cloud.stream.pollable-source=myDestination
前面示例中的 pollable-source 名称将导致绑定名称保留
与函数式编程模型一致。myDestination
myDestination-in-0
给定前面示例中轮询的使用者,您可以按如下方式使用它:
@Bean
public ApplicationRunner poller(PollableMessageSource destIn, MessageChannel destOut) {
return args -> {
while (someCondition()) {
try {
if (!destIn.poll(m -> {
String newPayload = ((String) m.getPayload()).toUpperCase();
destOut.send(new GenericMessage<>(newPayload));
})) {
Thread.sleep(1000);
}
}
catch (Exception e) {
// handle failure
}
}
};
}
一个不那么手动、更像 Spring 的替代方案是配置一个计划任务 bean。例如
@Scheduled(fixedDelay = 5_000)
public void poll() {
System.out.println("Polling...");
this.source.poll(m -> {
System.out.println(m.getPayload());
}, new ParameterizedTypeReference<Foo>() { });
}
该方法采用一个参数(通常是 lambda 表达式,如下所示)。
如果消息已收到并成功处理,则返回。PollableMessageSource.poll()
MessageHandler
true
与消息驱动的使用者一样,如果引发异常,则消息将发布到错误通道,
如 中所述。MessageHandler
Error Handling
通常,该方法在退出时确认消息。
如果该方法异常退出,则邮件将被拒绝(不会重新排队),但请参阅处理错误。
您可以通过对确认负责来覆盖该行为,如以下示例所示:poll()
MessageHandler
@Bean
public ApplicationRunner poller(PollableMessageSource dest1In, MessageChannel dest2Out) {
return args -> {
while (someCondition()) {
if (!dest1In.poll(m -> {
StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).noAutoAck();
// e.g. hand off to another thread which can perform the ack
// or acknowledge(Status.REQUEUE)
})) {
Thread.sleep(1000);
}
}
};
}
您必须(或)在某个时候发送消息,以避免资源泄漏。ack nack |
某些消息传递系统(如 Apache Kafka)在日志中维护简单的偏移量。如果传递失败并重新排队,则重新传递任何稍后成功确认的消息。StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).acknowledge(Status.REQUEUE); |
还有一个重载方法,其定义如下:poll
poll(MessageHandler handler, ParameterizedTypeReference<?> type)
这是一个转换提示,允许转换传入的消息有效负载,如以下示例所示:type
boolean result = pollableSource.poll(received -> {
Map<String, Foo> payload = (Map<String, Foo>) received.getPayload();
...
}, new ParameterizedTypeReference<Map<String, Foo>>() {});
处理错误
默认情况下,为可轮询源配置错误通道;如果回调抛出异常,则将 an 发送到错误通道 ();此错误通道也桥接到全局 Spring Integration 。ErrorMessage
<destination>.<group>.errors
errorChannel
您可以订阅任何一个错误频道,以处理错误;如果没有订阅,则只会记录错误,并且消息将被确认为成功。
如果错误通道服务激活器引发异常,则邮件将被拒绝(默认情况下)并且不会重新传递。
如果服务激活器抛出 ,则该消息将在代理处重新排队,并将在随后的轮询中再次检索。@ServiceActivator
RequeueCurrentMessageException
如果侦听器直接抛出消息,则消息将重新排队,如上所述,并且不会发送到错误通道。RequeueCurrentMessageException
您必须(或)在某个时候发送消息,以避免资源泄漏。ack nack |
某些消息传递系统(如 Apache Kafka)在日志中维护简单的偏移量。如果传递失败并重新排队,则重新传递任何稍后成功确认的消息。StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).acknowledge(Status.REQUEUE); |