Spring 的数据集成之旅始于 Spring Integration。借助其编程模型,它提供了一致的开发人员体验,以构建可以采用企业集成模式的应用程序,以连接外部系统,例如数据库、消息代理等。

快进到云时代,微服务在企业环境中变得突出。Spring Boot 改变了开发人员构建应用程序的方式。借助 Spring 的编程模型和 Spring Boot 处理的运行时责任,开发基于 Spring 的独立生产级微服务变得无缝。

为了将其扩展到 Data Integration 工作负载,Spring Integration 和 Spring Boot 被整合到一个新项目中。Spring Cloud Stream 诞生了。

借助 Spring Cloud Stream,开发人员可以:

  • 隔离构建、测试和部署以数据为中心的应用程序。

  • 应用现代微服务架构模式,包括通过消息传递进行组合。

  • 通过以事件为中心的思维将应用程序职责解耦。事件可以表示及时发生的事情,下游使用者应用程序可以在不知道其来源或生产者身份的情况下做出反应。

  • 将业务逻辑移植到消息代理(例如 RabbitMQ、Apache Kafka、Amazon Kinesis)上。

  • 依靠框架对常见用例的自动内容类型支持。可以扩展到不同的数据转换类型。

  • 以及更多......

快速开始

您可以按照此三步指南在不到 5 分钟的时间内试用 Spring Cloud Stream,甚至在进入任何细节之前。

我们将向您展示如何创建一个 Spring Cloud Stream 应用程序,该应用程序接收来自您选择的消息传递中间件的消息(稍后将详细介绍),并将收到的消息记录到控制台。 我们称之为 。 虽然不是很实用,但它很好地介绍了一些主要概念 和 abstractions,以便更轻松地消化本用户指南的其余部分。LoggingConsumer

这三个步骤如下:

使用 Spring initializr 创建示例应用程序

要开始使用,请访问 Spring Initializr。从那里,您可以生成我们的应用程序。为此,请执行以下操作:LoggingConsumer

  1. Dependencies (依赖项) 部分中,开始键入 。 当“Cloud Stream”选项出现时,选择它。stream

  2. 开始键入 'kafka' 或 'rabbit'。

  3. 选择 “Kafka” 或 “RabbitMQ”。

    基本上,您可以选择应用程序绑定到的消息中间件。 我们建议使用您已经安装的那个,或者觉得安装和运行更舒服。 此外,正如您从 Initilaizer 屏幕中看到的那样,您还可以选择一些其他选项。 例如,您可以选择 Gradle 作为构建工具,而不是 Maven(默认)。

  4. Artifact 字段中,键入 'logging-consumer'。

    Artifact 字段的值将成为应用程序名称。 如果您选择了 RabbitMQ 作为中间件,则您的 Spring Initializr 现在应如下所示:

Spring initializr
  1. 单击 Generate Project 按钮。

    这样做会将生成的工程的压缩版本下载到您的硬盘驱动器。

  2. 将文件解压缩到要用作项目目录的文件夹中。

我们鼓励您探索 Spring Initializr 中提供的许多可能性。 它允许您创建许多不同类型的 Spring 应用程序。
我们鼓励您探索 Spring Initializr 中提供的许多可能性。 它允许您创建许多不同类型的 Spring 应用程序。

将项目导入 IDE

现在,您可以将项目导入到 IDE 中。 请记住,根据 IDE,您可能需要遵循特定的导入过程。 例如,根据项目的生成方式(Maven 或 Gradle),您可能需要遵循特定的导入过程(例如,在 Eclipse 或 STS 中,您需要使用 File → Import → Maven →现有 Maven 项目)。

导入后,项目必须没有任何类型的错误。此外,应包含 .src/main/javacom.example.loggingconsumer.LoggingConsumerApplication

从技术上讲,此时,您可以运行应用程序的 main 类。 它已经是一个有效的 Spring Boot 应用程序。 但是,它什么都不做,所以我们想添加一些代码。

添加消息处理程序、构建和运行

将类修改为如下所示:com.example.loggingconsumer.LoggingConsumerApplication

@SpringBootApplication
public class LoggingConsumerApplication {

	public static void main(String[] args) {
		SpringApplication.run(LoggingConsumerApplication.class, args);
	}

	@Bean
	public Consumer<Person> log() {
	    return person -> {
	        System.out.println("Received: " + person);
	    };
	}

	public static class Person {
		private String name;
		public String getName() {
			return name;
		}
		public void setName(String name) {
			this.name = name;
		}
		public String toString() {
			return this.name;
		}
	}
}

从前面的清单中可以看出:

这样做还可以让您看到框架的核心功能之一:它尝试自动将传入的消息有效负载转换为类型 。Person

现在,您拥有了一个功能齐全的 Spring Cloud Stream 应用程序,该应用程序可以侦听消息。 从这里开始,为简单起见,我们假设您在第 1 步中选择了 RabbitMQ。 假设您已安装并运行 RabbitMQ,则可以通过在 IDE 中运行其方法来启动应用程序。main

您应该看到以下输出:

	--- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg, bound to: input
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#2a3a299:0/SimpleConnection@66c83fc8. . .
	. . .
	--- [ main] o.s.i.a.i.AmqpInboundChannelAdapter      : started inbound.input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg
	. . .
	--- [ main] c.e.l.LoggingConsumerApplication         : Started LoggingConsumerApplication in 2.531 seconds (JVM running for 2.897)

转到 RabbitMQ 管理控制台或任何其他 RabbitMQ 客户端,并向 发送消息。 该部分表示组名称并且是生成的,因此在您的环境中必然会有所不同。 对于更可预测的内容,您可以通过 set (或您喜欢的任何名称) 来使用显式组名称。input.anonymous.CbMIwdkJSBO1ZoPDOtHtCganonymous.CbMIwdkJSBO1ZoPDOtHtCgspring.cloud.stream.bindings.input.group=hello

消息的内容应为类的 JSON 表示形式,如下所示:Person

{"name":"Sam Spade"}

然后,在您的控制台中,您应该会看到:

Received: Sam Spade

您还可以构建应用程序并将其打包到引导 jar 中(使用 ),然后使用命令运行构建的 JAR。./mvnw clean installjava -jar

现在你有一个可以工作(尽管非常基本)的 Spring Cloud Stream 应用程序。

Streaming data 上下文中的 Spring 表达式语言 (SpEL)

在本参考手册中,您将遇到许多可以使用 Spring 表达式语言 (SpEL) 的功能和示例。在使用它时了解某些限制很重要。

SPEL 允许您访问当前消息以及您正在运行的应用程序上下文。 但是,了解 SPEL 可以看到什么类型的数据类型很重要,尤其是在传入Message的上下文中。 消息从 broker 以 byte[] 的形式到达。然后,它被 Binders 转换为 a,而你可以看到消息的有效负载保持其原始形式。消息的标头是 ,其中值通常是另一个基元或基元的集合/数组,因此是 Object。 这是因为 binder 不知道所需的 input 类型,因为它无权访问用户代码(函数)。因此,Binder 有效地以消息标头的形式传递了一个带有有效负载和一些可读元数据的信封,就像通过邮件发送的信件一样。 这意味着,虽然可以访问消息的有效负载,但您只能以原始数据(即 byte[]) 的形式访问它。虽然开发人员要求能够让 SpEL 访问作为具体类型(例如,Foo、Bar 等)的有效负载对象的字段的能力可能非常普遍,但您可以看到实现它有多么困难甚至不可能。 下面是一个演示该问题的示例;假设您有一个路由表达式,用于根据有效负载类型路由到不同的函数。此要求意味着将有效负载从 byte[] 转换为特定类型,然后应用 SPEL。但是,为了执行此类转换,我们需要知道要传递给 converter 的实际类型,而该类型来自函数的签名,我们不知道是哪一个。解决此要求的更好方法是将类型信息作为消息头传递(例如,)。您将获得一个清晰可读的 String 值,可以在一年内访问和评估该值,并且易于阅读的 SPEL 表达式。Message<byte[]><String, Object>application/json;type=foo.bar.Baz

此外,使用 payload 进行路由决策被认为是非常糟糕的做法,因为 payload 被认为是特权数据 - 数据只能由其最终接收者读取。同样,使用邮件投递的类比,您不希望邮递员打开您的信封并阅读信件的内容来做出一些投递决定。同样的概念在这里也适用,尤其是在生成 Message 时相对容易包含此类信息时。它强制执行与要通过网络传输的数据的设计以及哪些数据可以被视为公共数据以及哪些数据是特权数据相关的规则。


APP信息