此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4! |
生成和使用消息
您可以通过简单地编写函数并将其公开为 s 来编写 Spring Cloud Stream 应用程序。
您还可以使用基于 Spring Integration 注释的配置或
基于 Spring Cloud Stream 注释的配置,尽管从 spring-cloud-stream 3.x 开始
我们建议使用函数式实现。@Bean
Spring Cloud Function 支持
概述
从 Spring Cloud Stream v2.1 开始,定义流处理程序和源的另一种替代方法是使用内置
支持 Spring Cloud Function,其中它们可以表示为
类型。java.util.function.[Supplier/Function/Consumer]
要指定要绑定到绑定公开的外部目标的功能性 Bean,请执行以下操作:
您必须提供财产。spring.cloud.function.definition
如果您只有一个 bean 类型为 ,则可以
跳过该属性,因为这样的功能性 bean 将被自动发现。然而
使用此类属性以避免任何混淆被认为是最佳实践。
有时这种自动发现可能会妨碍,因为单个 bean 类型的目的可能是处理消息以外的目的,但是它是自动发现和自动绑定的。
对于这些极少数情况,您可以通过提供 value 设置为 的属性来禁用自动发现。java.util.function.[Supplier/Function/Consumer] spring.cloud.function.definition java.util.function.[Supplier/Function/Consumer] spring.cloud.stream.function.autodetect false |
下面是一个示例,应用程序通过充当数据的使用者和生产者来公开消息处理程序,以有效地支持传递语义。java.util.function.Function
@SpringBootApplication
public class MyFunctionBootApp {
public static void main(String[] args) {
SpringApplication.run(MyFunctionBootApp.class);
}
@Bean
public Function<String, String> toUpperCase() {
return s -> s.toUpperCase();
}
}
在前面的示例中,我们定义了一个名为 toUpperCase 类型的 Bean 作为消息处理程序
其 'input' 和 'output' 必须绑定到提供的目标 Binder 公开的外部目标。
默认情况下,“input”和“output”绑定名称将为 和 。
有关用于建立绑定名称的命名约定的详细信息,请参阅 功能绑定名称 部分。java.util.function.Function
toUpperCase-in-0
toUpperCase-out-0
以下是支持其他语义的简单功能应用程序示例:
以下是公开为java.util.function.Supplier
@SpringBootApplication
public static class SourceFromSupplier {
@Bean
public Supplier<Date> date() {
return () -> new Date(12345L);
}
}
以下是公开为java.util.function.Consumer
@SpringBootApplication
public static class SinkFromConsumer {
@Bean
public Consumer<String> sink() {
return System.out::println;
}
}
供应商(来源)
Function
并且在如何触发他们的调用时非常简单。它们是基于触发的
发送到它们绑定到的目标的数据(事件)上。换句话说,它们是经典的事件驱动组件。Consumer
但是,在触发方面,它属于自己的类别。由于根据定义,它是数据的来源(原点),因此它不会
订阅任何入站目标,因此必须由其他一些机制触发。
还有一个实施问题,这可能是强制性的或被动的,这与触发此类供应商直接相关。Supplier
Supplier
请考虑以下示例:
@SpringBootApplication
public static class SupplierConfiguration {
@Bean
public Supplier<String> stringSupplier() {
return () -> "Hello from Supplier";
}
}
每当调用前面的 Bean 方法时,都会生成一个字符串。但是,谁调用此方法以及调用频率如何?
该框架提供了一个默认的轮询机制(回答 “Who?” 的问题),该机制将触发对供应商的调用,默认情况下,它将这样做
每秒一次(回答 “How frequency?” 的问题)。
换句话说,上述配置每秒生成一条消息,每条消息都发送到 Binder 公开的目标。
要了解如何自定义轮询机制,请参阅轮询配置属性部分。Supplier
get()
output
考虑一个不同的示例:
@SpringBootApplication
public static class SupplierConfiguration {
@Bean
public Supplier<Flux<String>> stringSupplier() {
return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(1000);
return "Hello from Supplier";
} catch (Exception e) {
// ignore
}
}
})).subscribeOn(Schedulers.elastic()).share();
}
}
前面的 bean 采用反应式编程风格。通常,与命令式供应商不同,
它应该只触发一次,因为调用其方法会产生(提供)连续的消息流,而不是
单个消息。Supplier
get()
框架识别编程风格的差异,并保证这样的供应商只触发一次。
但是,想象一下您想要轮询某个数据源并返回表示结果集的有限数据流的使用案例。 响应式编程风格是这种 Provider 的完美机制。但是,考虑到生成的流的有限性, 此类 Supplier 仍需要定期调用。
请考虑以下示例,该示例通过生成有限的数据流来模拟此类用例:
@SpringBootApplication
public static class SupplierConfiguration {
@PollableBean
public Supplier<Flux<String>> stringSupplier() {
return () -> Flux.just("hello", "bye");
}
}
bean 本身使用 annotation (sub-set of ) 进行注释,从而向框架发出信号,尽管实现
这样的供应商是被动的,它仍然需要被轮询。PollableBean
@Bean
定义了一个属性,其中向此 Comments 的后处理器发出信号
必须拆分 annotated 组件生成的结果,并将其设置为 Default 。这意味着
框架将拆分 return 并将每个项目作为单独的消息发送出去。如果这不是
他想要的行为,你可以把它设置为这样的供应商会简单地返回
产生的 Flux,而不分裂它。splittable PollableBean true false |
供应商 & 线程
正如您现在所了解的,与由事件触发的 和 不同(它们具有输入数据),它没有
any input,因此由不同的机制 - Poller 触发,它可能具有不可预测的线程机制。虽然
线程机制大多数时候与函数的下游执行无关,在某些情况下它可能会出现问题
尤其是对于可能对线程关联有特定期望的集成框架。例如,依赖于
在跟踪存储在线程本地的数据时。
对于这些情况,我们通过 提供了另一种机制,其中用户对线程机制有更多的控制权。您可以获取更多详细信息
在 将任意数据发送到输出(例如,外部事件驱动源) 部分中。Function Consumer Supplier StreamBridge |
消费者 (Reactive)
Reactive 有点特殊,因为它有一个 void 返回类型,让框架没有可以订阅的引用。
很可能您不需要编写 ,而是将其作为调用运算符写入流中的最后一个运算符。Consumer
Consumer<Flux<?>>
Function<Flux<?>, Mono<Void>>
then
例如:
public Function<Flux<?>, Mono<Void>> consumer() {
return flux -> flux.map(..).filter(..).then();
}
但是,如果您确实需要编写 explicit ,请记住订阅传入的 Flux。Consumer<Flux<?>>
另外,请记住,在混合反应式函数和命令式函数时,相同的规则适用于函数组合。
Spring Cloud Function 确实支持使用 Imperative 组合反应式函数,但是您必须了解某些限制。
例如,假设您已经使用 imperative consumer 编写了 reactive function。
这种组合的结果是反应式 .但是,没有办法订阅本节前面讨论的此类 Consumer,
因此,只有通过使你的消费者响应并手动订阅(如前所述),或者将你的函数更改为命令式,才能解决这个限制。Consumer
轮询配置属性
Spring Cloud Stream 公开以下属性,并以 :spring.integration.poller.
- 固定延迟
-
修复了默认 poller 的延迟(以毫秒为单位)。
默认值:1000L。
- maxMessagesPerPoll 的
-
默认 Poller 的每个轮询事件的最大消息数。
默认值:1L。
- cron (定时)
-
Cron Trigger 的 Cron 表达式值。
默认值:none。
- initialDelay (初始延迟)
-
定期触发器的初始延迟。
默认值:0。
- timeUnit (时间单位)
-
要应用于延迟值的 TimeUnit。
默认值:MILLISECONDS。
例如,将 poller 间隔设置为每两秒轮询一次。--spring.integration.poller.fixed-delay=2000
每个绑定的轮询配置
上一节展示了如何配置将应用于所有绑定的单个默认 Poller。虽然它非常适合微服务 spring-cloud-stream 的模型,该模型旨在每个微服务代表一个组件(例如,Supplier),因此默认的 Poller 配置就足够了,但在某些边缘情况下,即 您可能有多个组件需要不同的轮询配置
对于这种情况,请使用 per-binding 方式配置 poller。例如,假设您有一个 output binding 。在这种情况下,你可以为这样的
使用 prefix 进行绑定(例如,)。supply-out-0
spring.cloud.stream.bindings.supply-out-0.producer.poller..
spring.cloud.stream.bindings.supply-out-0.producer.poller.fixed-delay=2000
将任意数据发送到输出(例如,外部事件驱动的源)
在某些情况下,数据的实际来源可能来自不是 Binder 的外部(外部)系统。例如, 数据源可以是经典 REST 端点。我们如何将这样的源代码与 spring-cloud-stream 使用的功能机制联系起来?
Spring Cloud Stream 提供了两种机制,让我们更详细地了解一下它们
在这里,对于这两个示例,我们将使用一个名为 bound to the root web context(绑定到根 Web 上下文)的标准 MVC 端点方法。
通过 StreamBridge 机制将传入请求委托给流式处理。delegateToSupplier
@SpringBootApplication
@Controller
public class WebSourceApplication {
public static void main(String[] args) {
SpringApplication.run(WebSourceApplication.class, "--spring.cloud.stream.output-bindings=toStream");
}
@Autowired
private StreamBridge streamBridge;
@RequestMapping
@ResponseStatus(HttpStatus.ACCEPTED)
public void delegateToSupplier(@RequestBody String body) {
System.out.println("Sending " + body);
streamBridge.send("toStream", body);
}
}
在这里,我们自动装配了一个 bean,它允许我们有效地将数据发送到输出绑定
将非流应用程序与 spring-cloud-stream 桥接。请注意,前面的示例没有任何
源函数定义(例如,Supplier bean),使框架没有触发器来提前创建源绑定,这在配置包含函数 bean 的情况下是典型的。这很好,因为将启动输出绑定的创建(以及
destination auto-provisioning(如有必要))对于不存在的绑定,在第一次调用其操作时将其缓存为
后续重用(有关更多详细信息,请参阅 StreamBridge 和动态目标)。StreamBridge
StreamBridge
send(..)
但是,如果要在初始化(启动)时预先创建输出绑定,则可以从可以声明源名称的属性中受益。
提供的名称将用作创建源绑定的触发器。
可用于表示多个源(多个输出绑定)
(例如,spring.cloud.stream.output-bindings
;
--spring.cloud.stream.output-bindings=foo;bar
)
另外,请注意,该方法采用 for data。这意味着你可以发送 POJO 或 to it,然后它
在发送输出时,将经历相同的例程,就好像它来自提供相同级别的任何 Function 或 Supplier 一样
的一致性与功能一样。这意味着输出类型转换、分区等都被视为来自函数生成的输出。streamBridge.send(..)
Object
Message
使用异步发送的 StreamBridge
StreamBridge
使用 Spring Integration Framework 提供的发送机制,该框架是 Spring Cloud Stream 的核心。默认情况下,此机制使用发送方的线程。换句话说,发送是阻塞的。虽然这在许多情况下是可以的,但在某些情况下,您希望此类发送是异步的。为此,请在调用其中一个 send 方法之前使用 the 的方法。setAsync(true)
StreamBridge
可观察性 使用异步发送的上下文传播
当使用框架提供的 Observability 支持以及支持 Spring 框架时,打破线程边界将影响 Observability 上下文的一致性,从而影响您的跟踪历史记录。为避免这种情况,您只需添加 Micrometer 的依赖项(见下文)context-propagation
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>context-propagation</artifactId>
<version>1.1.0</version>
</dependency>
StreamBridge 和动态目标
StreamBridge
也可用于与用例类似的输出目的地事先未知的情况
在 路由 FROM 使用者 部分。
让我们看一下示例
@SpringBootApplication
@Controller
public class WebSourceApplication {
public static void main(String[] args) {
SpringApplication.run(WebSourceApplication.class, args);
}
@Autowired
private StreamBridge streamBridge;
@RequestMapping
@ResponseStatus(HttpStatus.ACCEPTED)
public void delegateToSupplier(@RequestBody String body) {
System.out.println("Sending " + body);
streamBridge.send("myDestination", body);
}
}
如你所见,前面的示例与前面的示例非常相似,除了通过 property 提供的显式绑定指令(未提供)。
在这里,我们将数据发送到 name ,它不作为绑定存在。因此,此类名称将被视为动态目标
如 路由 FROM 使用者部分所述。spring.cloud.stream.output-bindings
myDestination
在前面的示例中,我们使用 as a foreign source 来馈送流。ApplicationRunner
一个更实际的示例,其中外部源是 REST 端点。
@SpringBootApplication
@Controller
public class WebSourceApplication {
public static void main(String[] args) {
SpringApplication.run(WebSourceApplication.class);
}
@Autowired
private StreamBridge streamBridge;
@RequestMapping
@ResponseStatus(HttpStatus.ACCEPTED)
public void delegateToSupplier(@RequestBody String body) {
streamBridge.send("myBinding", body);
}
}
正如您在方法内部看到的,我们使用 StreamBridge 将数据发送到 binding。在这里,您还将受益于
where if 不存在的动态功能将自动创建并缓存,否则将使用现有绑定。delegateToSupplier
myBinding
StreamBridge
myBinding
如果有许多动态目标,则缓存动态目标 (绑定) 可能会导致内存泄漏。具有一定程度的控制
我们为默认缓存大小为 10 的输出绑定提供了一种自驱逐缓存机制。这意味着,如果您的动态目标大小超过该数字,则现有绑定可能会被逐出,因此需要重新创建,这可能会导致轻微的性能下降。您可以通过将缓存设置为所需值来增加缓存大小。spring.cloud.stream.dynamic-destination-cache-size |
curl -H "Content-Type: text/plain" -X POST -d "hello from the other side" http://localhost:8080/
通过展示两个例子,我们想强调这种方法适用于任何类型的外国来源。
如果您使用的是 Solace PubSub+ Binders,则 Spring Cloud Stream 已保留 Headers(可通过 BinderHeaders.TARGET_DESTINATION 检索),这允许将消息从其绑定的配置目标重定向到此 Headers 指定的目标目标。这允许 Binders 管理发布到动态目标所需的资源,使框架不必这样做,并避免了上一个注释中提到的缓存问题。更多信息在这里。scst_targetDestination |
使用 StreamBridge 的输出内容类型
如有必要,您还可以使用以下方法签名提供特定内容类型 。
或者,如果您将数据作为 发送,则其内容类型将得到支持。public boolean send(String bindingName, Object data, MimeType outputContentType)
Message
在 StreamBridge 中使用特定的 Binder 类型
Spring Cloud Stream 支持多种 Binder 场景。例如,您可能从 Kafka 接收数据并将其发送到 RabbitMQ。
有关多个 Binders 场景的更多信息,请参阅 Binders 部分,特别是 Classpath 上的多个 Binders
如果您计划使用 StreamBridge 并在应用程序中配置了多个 Binder,则还必须告诉 StreamBridge
使用哪个 Binder。为此,还有两种方法:send
public boolean send(String bindingName, @Nullable String binderType, Object data)
public boolean send(String bindingName, @Nullable String binderType, Object data, MimeType outputContentType)
如您所见,您可以提供一个额外的参数 - ,告诉 BindingService 在创建动态绑定时要使用哪个 Binder。binderType
对于使用属性或已在不同的 Binder 下创建绑定的情况,该参数将不起作用。spring.cloud.stream.output-bindings binderType |
将通道拦截器与 StreamBridge 结合使用
由于使用 a 建立输出绑定,因此可以在通过 发送数据时激活通道拦截器。
由应用程序决定在哪些通道拦截器上应用。
Spring Cloud Stream 不会注入检测到的所有通道拦截器,除非它们被 .StreamBridge
MessageChannel
StreamBridge
StreamBridge
StreamBridge
@GlobalChannelInterceptor(patterns = "*")
假设您在应用程序中有以下两个不同的绑定。StreamBridge
streamBridge.send("foo-out-0", message);
和
streamBridge.send("bar-out-0", message);
现在,如果你想在两个绑定上都应用一个通道拦截器,那么你可以声明以下 bean。StreamBridge
GlobalChannelInterceptor
@Bean
@GlobalChannelInterceptor(patterns = "*")
public ChannelInterceptor customInterceptor() {
return new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
...
}
};
}
但是,如果您不喜欢上述全局方法,并且希望为每个绑定提供专用的侦听器,则可以执行以下操作。
@Bean
@GlobalChannelInterceptor(patterns = "foo-*")
public ChannelInterceptor fooInterceptor() {
return new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
...
}
};
}
和
@Bean
@GlobalChannelInterceptor(patterns = "bar-*")
public ChannelInterceptor barInterceptor() {
return new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
...
}
};
}
您可以灵活地使模式更加严格或根据您的业务需求进行自定义。
使用这种方法,应用程序能够决定注入哪些拦截器,而不是应用所有可用的拦截器。StreamBridge
StreamBridge 通过包含所有方法的接口提供协定。因此,应用程序可以选择使用 自动装配。当涉及到通过为接口提供 mock 或类似机制来使用单元测试代码时,这非常方便。StreamOperations send StreamBridge StreamOperations StreamBridge StreamOperations |
响应式函数支持
由于 Spring Cloud Function 构建在 Project Reactor 之上,因此您无需执行太多操作
在实现 时从反应式编程模型中受益,或 。Supplier
Function
Consumer
例如:
@SpringBootApplication
public static class SinkFromConsumer {
@Bean
public Function<Flux<String>, Flux<String>> reactiveUpperCase() {
return flux -> flux.map(val -> val.toUpperCase());
}
}
在选择反应式或命令式编程模型时,必须了解一些重要的事情。 完全反应式还是只是 API? 使用反应式 API 并不一定意味着您可以从此类 API 的所有反应式功能中受益。换句话说,back-pressure 和其他高级功能只有在与兼容系统(例如 Reactive Kafka Binder)一起使用时才能工作。如果您使用的是常规的 Kafka 或 Rabbit 或任何其他非反应式 Binder,您只能从反应式 API 本身的便利性中受益,而不能从其高级功能中受益,因为流的实际源或目标不是反应式的。 错误处理和重试 在本手册中,您将看到有关基于框架的错误处理、重试和其他功能以及与它们相关的配置属性的多个参考。重要的是要明白它们只影响命令式函数,当涉及到响应式函数时,你不应该有相同的期望。原因如下......
响应式函数和命令式函数之间有一个根本的区别。
命令式函数是一个消息处理程序,框架在接收到的每条消息上都会调用它。因此,对于 N 条消息,将有 N 次此类函数的调用,因此我们可以包装此类函数并添加其他功能,例如错误处理、重试等。
响应式函数是初始化函数。它只调用一次,以获取对用户提供的 Flux/Mono 的引用,以便与框架提供的 Flux/Mono 连接。在那之后,我们(框架)完全无法查看或控制流。
因此,对于反应式函数,在错误处理和重试(即 , 等)方面,您必须依赖反应式 API 的丰富性。 |
功能组成
使用函数式编程模型,您还可以从函数组合中受益,您可以在其中从一组简单函数动态组合复杂的处理程序。 例如,让我们将以下函数 Bean 添加到上面定义的应用程序中
@Bean
public Function<String, String> wrapInQuotes() {
return s -> "\"" + s + "\"";
}
并修改属性以反映您打算从 'toUpperCase' 和 'wrapInQuotes' 组合新函数的意图。
为此,Spring Cloud Function 依赖于 (管道) 符号。因此,为了完成我们的示例,我们的属性现在将如下所示:spring.cloud.function.definition
|
--spring.cloud.function.definition=toUpperCase|wrapInQuotes
Spring Cloud Function 提供的函数组合支持的一大好处是 事实上,你可以编写反应式和命令式函数。 |
组合的结果是一个函数,正如您可能猜到的那样,它可能有一个很长且相当神秘的名称(例如,)
当涉及到其他配置属性时,会带来很多不便。这就是 功能绑定名称 部分中描述的描述性绑定名称功能可以提供帮助的地方。foo|bar|baz|xyz. . .
例如,如果我们想给我们一个更具描述性的名字,我们可以这样做
具有以下属性允许
其他配置属性来引用该绑定名称(例如 )。toUpperCase|wrapInQuotes
spring.cloud.stream.function.bindings.toUpperCase|wrapInQuotes-in-0=quotedUpperCaseInput
spring.cloud.stream.bindings.quotedUpperCaseInput.destination=myDestination
功能构成和交叉关注点
函数组合允许您通过分解复杂性来有效地解决复杂性问题 转换为一组简单且可单独管理/可测试的组件,这些组件仍然可以 在运行时表示为 1。但这并不是唯一的好处。
你也可以使用组合来解决某些横切非功能性问题。 例如内容丰富。例如,假设您有一封传入邮件,该邮件可能会 缺少某些标题,或者某些标题与您的企业不完全处于状态 function 会期望。您现在可以实现一个单独的函数来解决这些问题 关注点,然后使用 main business function 进行组合。
让我们看一下示例
@SpringBootApplication
public class DemoStreamApplication {
public static void main(String[] args) {
SpringApplication.run(DemoStreamApplication.class,
"--spring.cloud.function.definition=enrich|echo",
"--spring.cloud.stream.function.bindings.enrich|echo-in-0=input",
"--spring.cloud.stream.bindings.input.destination=myDestination",
"--spring.cloud.stream.bindings.input.group=myGroup");
}
@Bean
public Function<Message<String>, Message<String>> enrich() {
return message -> {
Assert.isTrue(!message.getHeaders().containsKey("foo"), "Should NOT contain 'foo' header");
return MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();
};
}
@Bean
public Function<Message<String>, Message<String>> echo() {
return message -> {
Assert.isTrue(message.getHeaders().containsKey("foo"), "Should contain 'foo' header");
System.out.println("Incoming message " + message);
return message;
};
}
}
虽然微不足道,但此示例演示了一个函数如何使用额外的标头(非功能性问题)来丰富传入的 Message,
所以另一个函数 - - 可以从中受益。该函数保持干净,仅专注于业务逻辑。
您还可以查看 property 的用法,以简化组合的绑定名称。echo
echo
spring.cloud.stream.function.bindings
具有多个输入和输出参数的函数
从版本 3.0 开始, spring-cloud-stream 支持以下函数: 具有多个输入和/或多个输出(返回值)。这实际上意味着什么,以及 它针对什么类型的用例?
-
大数据:想象一下您正在处理的数据源是高度无组织的,并且包含各种类型的数据元素 (例如,订单、交易等),并且您实际上需要对其进行整理。
-
数据聚合:另一个用例可能要求您合并来自 2+ 个传入_streams的数据元素。
以上仅描述了一些可能需要使用单个函数来接受和/或生成的用例 多个数据流。这就是我们在这里针对的使用案例类型。
另外,请注意此处对 streams 概念的强调略有不同。假设此类函数只有价值
是否允许他们访问实际的数据流(而不是单个元素)。因此,我们依赖于
Project Reactor 提供的抽象(即 和 ),该抽象已在
classpath 作为 spring-cloud-functions 引入的依赖项的一部分。Flux
Mono
另一个重要方面是多个输入和输出的表示。虽然 java 提供了
各种不同的抽象来表示这些抽象的多个事物
是 a) 无界的,b) 缺少 arity 和 c) 缺少类型信息,这在这个上下文中都很重要。
举个例子,让我们看看一个数组,它只允许我们
描述单个类型的多个或向上转换所有内容到 ,影响 的透明类型转换功能
spring-cloud-stream 等。Collection
Object
因此,为了满足所有这些要求,最初的支持依赖于签名,它利用了另一种抽象 由 Project Reactor - Tuples 提供。但是,我们正在努力允许更灵活的签名。
请参阅 绑定 和 绑定名称 部分,了解用于建立此类应用程序使用的绑定名称的命名约定。 |
让我们看一下这几个示例:
@SpringBootApplication
public class SampleApplication {
@Bean
public Function<Tuple2<Flux<String>, Flux<Integer>>, Flux<String>> gather() {
return tuple -> {
Flux<String> stringStream = tuple.getT1();
Flux<String> intStream = tuple.getT2().map(i -> String.valueOf(i));
return Flux.merge(stringStream, intStream);
};
}
}
上面的示例演示了采用两个输入(第一个类型为类型和第二个类型)的函数
并生成 类型的单个输出。String
Integer
String
因此,对于上面的示例,两个输入绑定将是 and,为了保持一致性,
output binding 也遵循相同的约定,并命名为 。gather-in-0
gather-in-1
gather-out-0
知道这一点后,您可以设置绑定特定属性。
例如,以下内容将覆盖 content-type 进行绑定:gather-in-0
--spring.cloud.stream.bindings.gather-in-0.content-type=text/plain
@SpringBootApplication
public class SampleApplication {
@Bean
public static Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> scatter() {
return flux -> {
Flux<Integer> connectedFlux = flux.publish().autoConnect(2);
UnicastProcessor even = UnicastProcessor.create();
UnicastProcessor odd = UnicastProcessor.create();
Flux<Integer> evenFlux = connectedFlux.filter(number -> number % 2 == 0).doOnNext(number -> even.onNext("EVEN: " + number));
Flux<Integer> oddFlux = connectedFlux.filter(number -> number % 2 != 0).doOnNext(number -> odd.onNext("ODD: " + number));
return Tuples.of(Flux.from(even).doOnSubscribe(x -> evenFlux.subscribe()), Flux.from(odd).doOnSubscribe(x -> oddFlux.subscribe()));
};
}
}
上面的示例与前面的示例有点相反,并演示了函数
采用 type 的单个 input 并生成两个 outputs(均为 type )。Integer
String
因此,对于上面的示例,输入绑定是 和
输出绑定为 和 。scatter-in-0
scatter-out-0
scatter-out-1
您可以使用以下代码对其进行测试:
@Test
public void testSingleInputMultiOutput() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
SampleApplication.class))
.run("--spring.cloud.function.definition=scatter")) {
InputDestination inputDestination = context.getBean(InputDestination.class);
OutputDestination outputDestination = context.getBean(OutputDestination.class);
for (int i = 0; i < 10; i++) {
inputDestination.send(MessageBuilder.withPayload(String.valueOf(i).getBytes()).build());
}
int counter = 0;
for (int i = 0; i < 5; i++) {
Message<byte[]> even = outputDestination.receive(0, 0);
assertThat(even.getPayload()).isEqualTo(("EVEN: " + String.valueOf(counter++)).getBytes());
Message<byte[]> odd = outputDestination.receive(0, 1);
assertThat(odd.getPayload()).isEqualTo(("ODD: " + String.valueOf(counter++)).getBytes());
}
}
}
单个应用程序中的多种功能
可能还需要在单个应用程序中对多个消息处理程序进行分组。您可以通过以下方式做到这一点 定义多个函数。
@SpringBootApplication
public class SampleApplication {
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
@Bean
public Function<String, String> reverse() {
return value -> new StringBuilder(value).reverse().toString();
}
}
在上面的示例中,我们有定义两个函数和 .
所以首先,如前所述,我们需要注意存在冲突(不止一个函数),因此
我们需要通过提供指向实际函数的属性来解决它
我们想要绑定。除了这里,我们将使用 delimiter 来指向这两个函数(请参阅下面的测试用例)。uppercase
reverse
spring.cloud.function.definition
;
与具有多个 inputs/output 的函数一样,请参考 [Binding 和 Binding names] 部分了解命名 约定,用于建立此类应用程序使用的绑定名称。 |
您可以使用以下代码对其进行测试:
@Test
public void testMultipleFunctions() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
ReactiveFunctionConfiguration.class))
.run("--spring.cloud.function.definition=uppercase;reverse")) {
InputDestination inputDestination = context.getBean(InputDestination.class);
OutputDestination outputDestination = context.getBean(OutputDestination.class);
Message<byte[]> inputMessage = MessageBuilder.withPayload("Hello".getBytes()).build();
inputDestination.send(inputMessage, "uppercase-in-0");
inputDestination.send(inputMessage, "reverse-in-0");
Message<byte[]> outputMessage = outputDestination.receive(0, "uppercase-out-0");
assertThat(outputMessage.getPayload()).isEqualTo("HELLO".getBytes());
outputMessage = outputDestination.receive(0, "reverse-out-0");
assertThat(outputMessage.getPayload()).isEqualTo("olleH".getBytes());
}
}
批量使用者
当使用支持批量监听器的 ,并且为消费者绑定启用了该功能时,您可以设置为 以启用
要传递给 .MessageChannelBinder
spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode
true
List
@Bean
public Function<List<Person>, Person> findFirstPerson() {
return persons -> persons.get(0);
}
批量类型转换
与单个消息使用者的类型转换类似,批处理要求将批处理中的每条消息转换为请求的类型。例如,在前面的示例中,此类型为 。Person
此外,了解批处理中每条消息的标头在代表整个批处理的 Message 中单独提供也很重要。这些 Messages 及其相应的批处理标头由相应的 Binders 创建,其结构可能有所不同。因此,您应该参考 Binder 文档来了解批处理标头的结构。对于 Kafka 和 Rabbit,您可以分别搜索 和 。MessageHeaders
amqp_batchedHeaders
kafka_batchConvertedHeaders
简而言之,如果您有一条消息表示具有 5 个有效负载的批次,则同一消息将包含一组标头,其中每个标头对应于具有相同索引的有效负载。
但是,如果特定负载无法转换,会发生什么情况?在单个消息场景中,我们只需返回 null 并使用未转换的消息调用您的方法,这会导致异常或允许您处理原始消息,具体取决于您的函数签名。
在批处理的情况下,事情要复杂一些。为未转换的有效负载返回 null 可有效减小批处理大小。例如,如果原始批次包含 5 条消息,而 2 条消息转换失败,则转换后的批次将仅包含 3 条消息。这可能是可以接受的,但是相应的批处理标头呢?仍然会有 5 个标头,因为它们是在 Binder 形成初始批次时创建的。这种差异使得很难将标头与其相应的有效负载相关联。
为了解决这个问题,我们提供了 MessageConverterHelper 接口。
public interface MessageConverterHelper {
/**
* This method will be called by the framework in cases when a message failed to convert.
* It allows you to signal to the framework if such failure should be considered fatal or not.
*
* @param message failed message
* @return true if conversion failure must be considered fatal.
*/
default boolean shouldFailIfCantConvert(Message<?> message) {
return false;
}
/**
* This method will be called by the framework in cases when a single message within batch of messages failed to convert.
* It provides a place for providing post-processing logic before message converter returns.
*
* @param message failed message.
* @param index index of failed message within the batch
*/
default void postProcessBatchMessageOnFailure(Message<?> message, int index) {
}
}
如果实现,则框架的消息转换器逻辑将调用此接口,以便在无法转换特定有效负载时对批处理消息执行后处理。
Kafka 和 Rabbit 的默认实现会自动删除相应的批处理标头,以保持批处理有效负载与其标头之间的关联。但是,如果需要为此类情况添加自定义行为,则可以提供自己的实现并将其注册为 Bean。
此外,该接口还提供了一种方法,允许对转换失败进行更确定性的处理。默认情况下,此方法返回 ,但如果您希望在发生转换错误时使整个过程失败,则可以自定义实施。false
批量生产者
您还可以在生产者端使用批处理的概念,方法是返回一个 Messages 集合,这有效地提供了一个 inverse effect 中,集合中的每条消息将由 Binder 单独发送。
请考虑以下函数:
@Bean
public Function<String, List<Message<String>>> batch() {
return p -> {
List<Message<String>> list = new ArrayList<>();
list.add(MessageBuilder.withPayload(p + ":1").build());
list.add(MessageBuilder.withPayload(p + ":2").build());
list.add(MessageBuilder.withPayload(p + ":3").build());
list.add(MessageBuilder.withPayload(p + ":4").build());
return list;
};
}
返回列表中的每条消息将单独发送,从而将四条消息发送到输出目标。
Spring 集成流作为函数
在实现函数时,您可能有适合该类别的复杂要求 企业集成模式 (EIP) 的最好使用 框架,例如 Spring Integration (SI),它是 EIP 的参考实现。
值得庆幸的是,SI 已经支持通过作为网关的集成流将集成流公开为函数。请考虑以下示例:
@SpringBootApplication
public class FunctionSampleSpringIntegrationApplication {
public static void main(String[] args) {
SpringApplication.run(FunctionSampleSpringIntegrationApplication.class, args);
}
@Bean
public IntegrationFlow uppercaseFlow() {
return IntegrationFlow.from(MessageFunction.class, spec -> spec.beanName("uppercase"))
.<String, String>transform(String::toUpperCase)
.log(LoggingHandler.Level.WARN)
.bridge()
.get();
}
public interface MessageFunction extends Function<Message<String>, Message<String>> {
}
}
对于那些熟悉 SI 的人,你可以看到我们定义了一个 bean 类型,其中我们
声明一个集成流,我们希望将其公开为(使用 SI DSL)称为 .
该接口允许我们显式声明输入和输出的类型,以进行正确的类型转换。
有关类型转换的更多信息,请参阅 [内容类型协商] 部分。IntegrationFlow
Function<String, String>
uppercase
MessageFunction
要接收原始输入,您可以使用 。from(Function.class, …)
生成的函数将绑定到目标 Binder 程序公开的输入和输出目标。
请参考 [Binding 和 Binding names] 部分了解命名 约定,用于建立此类应用程序使用的绑定名称。 |
有关 Spring Integration 和 Spring Cloud Stream 互操作性的更多详细信息,特别是围绕函数式编程模型 你可能会觉得这篇文章非常有趣,因为它深入探讨了 通过合并 Spring Integration 和 Spring Cloud Stream/Functions 的精华,你可以应用各种模式。
使用轮询的使用者
概述
使用轮询的使用者时,您可以按需轮询。
要为轮询的使用者定义绑定,您需要提供 property。PollableMessageSource
spring.cloud.stream.pollable-source
请考虑以下轮询使用者绑定的示例:
--spring.cloud.stream.pollable-source=myDestination
前面示例中的 pollable-source name 将导致绑定 name 保留
与函数式编程模型一致。myDestination
myDestination-in-0
给定前面示例中的轮询使用者,您可以按如下方式使用它:
@Bean
public ApplicationRunner poller(PollableMessageSource destIn, MessageChannel destOut) {
return args -> {
while (someCondition()) {
try {
if (!destIn.poll(m -> {
String newPayload = ((String) m.getPayload()).toUpperCase();
destOut.send(new GenericMessage<>(newPayload));
})) {
Thread.sleep(1000);
}
}
catch (Exception e) {
// handle failure
}
}
};
}
一种手动操作较少且更像 Spring 的替代方法是配置计划任务 bean。例如
@Scheduled(fixedDelay = 5_000)
public void poll() {
System.out.println("Polling...");
this.source.poll(m -> {
System.out.println(m.getPayload());
}, new ParameterizedTypeReference<Foo>() { });
}
该方法采用一个参数(通常是 lambda 表达式,如下所示)。
如果收到并成功处理了消息,则返回消息。PollableMessageSource.poll()
MessageHandler
true
与消息驱动的使用者一样,如果抛出异常,则消息将发布到错误通道。
如 中所述。MessageHandler
Error Handling
通常,该方法会在 exit 时确认消息。
如果方法异常退出,则消息将被拒绝(而不是重新排队),但请参阅 处理错误。
您可以通过负责确认来覆盖该行为,如以下示例所示:poll()
MessageHandler
@Bean
public ApplicationRunner poller(PollableMessageSource dest1In, MessageChannel dest2Out) {
return args -> {
while (someCondition()) {
if (!dest1In.poll(m -> {
StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).noAutoAck();
// e.g. hand off to another thread which can perform the ack
// or acknowledge(Status.REQUEUE)
})) {
Thread.sleep(1000);
}
}
};
}
您必须在某个时候(或)消息,以避免资源泄漏。ack nack |
某些消息传递系统(例如 Apache Kafka)在日志中维护一个简单的偏移量。如果投放失败并重新排队,则稍后成功确认的消息将重新投放。StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).acknowledge(Status.REQUEUE); |
还有一个重载方法,其定义如下:poll
poll(MessageHandler handler, ParameterizedTypeReference<?> type)
这是一个转换提示,允许转换传入消息有效负载,如以下示例所示:type
boolean result = pollableSource.poll(received -> {
Map<String, Foo> payload = (Map<String, Foo>) received.getPayload();
...
}, new ParameterizedTypeReference<Map<String, Foo>>() {});
处理错误
默认情况下,为 pollable 源配置了错误通道;如果回调抛出异常,则向 Error 通道发送 an ();此 error 通道也桥接到全局 Spring Integration 。ErrorMessage
<destination>.<group>.errors
errorChannel
您可以使用 订阅任一错误通道来处理错误;如果没有订阅,则只会记录错误,并将消息确认为成功。
如果错误通道服务激活器引发异常,则消息将被拒绝(默认情况下)并且不会重新投递。
如果服务激活器抛出 ,则消息将在代理处重新排队,并将在后续轮询中再次检索。@ServiceActivator
RequeueCurrentMessageException
如果侦听器直接抛出 a,则如上所述,消息将被重新排队,并且不会发送到错误通道。RequeueCurrentMessageException