此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4spring-doc.cn

可观察性

Spring 通过 Micrometer 提供对 Observability 的支持,它定义了一个 Observation 概念,该概念在应用程序中同时启用 Metrics 和 Tracesspring-doc.cn

Spring cloud Stream 通过在几个抽象中提供一个 ,在 Spring Cloud Function 级别集成了这种支持,它包装了函数以开箱即用地处理观察结果。ObservationFunctionAroundWrapperspring-doc.cn

必需的依赖项spring-doc.cn

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
	<groupId>io.projectreactor</groupId>
	<artifactId>reactor-core-micrometer</artifactId>
</dependency>

以及一个可用的示踪桥。例如 Zipkin, Bravespring-doc.cn

<dependency>
	<groupId>io.micrometer</groupId>
	<artifactId>micrometer-tracing-bridge-brave</artifactId>
</dependency>

命令式函数

命令式函数使用 observation 包装器包装,该包装器提供了必要的基础设施来处理与 Observation 注册表的交互。 此类交互在每次调用函数时发生,这实际上意味着观察值附加到 函数(即每条消息一个观察)。 换句话说,对于命令式函数,如果存在前面提到的必需依赖项,则可观测性将正常工作。ObservationFunctionAroundWrapperspring-doc.cn

反应式函数

响应式函数与命令式函数在本质上不同,因此没有用 .ObservationFunctionAroundWrapperspring-doc.cn

命令式函数是一个消息处理程序函数,每次有消息时框架都会调用它,有点像典型的事件处理程序,其中 N 条消息将有 N 次此类函数的调用。这允许我们包装这样的函数,以使用额外的功能来装饰它,例如错误处理重试,当然还有可观察性spring-doc.cn

响应式函数是初始化函数。它的工作是将用户提供的流处理代码 (Flux) 与 Binder 提供的源流和目标流连接起来。在应用程序启动期间,它只调用一次。一旦流代码与源/目标流连接,我们就无法查看或控制实际的流处理。它掌握在响应式 API 手中。Reactive 函数还带来了一个额外的变量。鉴于该函数为您提供了对整个流链(而不仅仅是单个事件)的可见性,那么默认的观察单位应该是什么? 流链中的单个项目?一系列物品?如果一段时间后没有消息怎么办?等。。.我们想要强调的是,对于响应式函数,我们不能假设任何事情。(有关反应式函数和命令式函数之间差异的更多信息,请参阅 反应式函数)。spring-doc.cn

因此,就像重试错误处理一样,您需要手动处理观察。spring-doc.cn

值得庆幸的是,您可以通过使用反应式 API 的操作来访问流的一段,同时提供 .此类 segment 定义了一个观察单位,它可以是 flux 中的单个项目或范围,或者您可能想要在流中观察的任何其他内容。tapObservationRegistryspring-doc.cn

@SpringBootApplication
public class DemoStreamApplication {

	Logger logger = LoggerFactory.getLogger(DemoStreamApplication.class);

	public static void main(String[] args) {
		Hooks.enableAutomaticContextPropagation();
		SpringApplication.run(DemoStreamApplication.class, args);
	}

	@Bean
	public Function<Flux<String>, Flux<String>> uppercase(ObservationRegistry registry) {
		return flux -> flux.flatMap(item -> {
			return Mono.just(item)
                             .map(value -> value.toUpperCase())
                             .doOnNext(v -> logger.info(v))
                             .tap(Micrometer.observation(registry));
		});
	}
}

上面的示例模拟了将 Observation 附加到单个消息处理(即命令式函数)上,因为在这种情况下,观察单元以 Mono.just(..) 开头,最后一个操作将 the 附加到订阅者。ObservationRegistryspring-doc.cn

如果已经有 Observation 附加到订阅者,它将用于为 上游的链/段创建子 Observation,但是正如我们已经说过的,默认情况下,框架不会将任何 Observation 附加到您返回的流链。tapspring-doc.cn