Spring 的数据集成之旅始于 Spring Integration。凭借其编程模型,它提供了一致的开发人员体验来构建可以采用企业集成模式的应用程序,以连接外部系统(如数据库、消息代理等)。Spring中文文档

快进到云时代,微服务在企业环境中变得突出。Spring Boot 改变了开发人员构建应用程序的方式。借助 Spring 的编程模型和 Spring Boot 处理的运行时职责,开发独立的、基于 Spring 的生产级微服务变得无缝。Spring中文文档

为了将其扩展到数据集成工作负载,Spring Integration 和 Spring Boot 被组合到一个新项目中。春云流诞生了。Spring中文文档

借助 Spring Cloud Stream,开发人员可以:Spring中文文档

  • 隔离构建、测试和部署以数据为中心的应用程序。Spring中文文档

  • 应用现代微服务架构模式,包括通过消息传递进行组合。Spring中文文档

  • 将应用程序责任与以事件为中心的思维脱钩。事件可以表示及时发生的事情,下游消费者应用程序可以在不知道其来源或生产者身份的情况下做出反应。Spring中文文档

  • 将业务逻辑移植到消息代理(如 RabbitMQ、Apache Kafka、Amazon Kinesis)上。Spring中文文档

  • 依靠框架对常见用例的自动内容类型支持。可以扩展到不同的数据转换类型。Spring中文文档

  • 还有很多。。。Spring中文文档

快速上手

您可以在不到 5 分钟的时间内试用 Spring Cloud Stream,甚至在您按照此三步指南进入任何细节之前。Spring中文文档

我们将向您展示如何创建一个Spring Cloud Stream应用程序,该应用程序接收来自您选择的消息传递中间件的消息(稍后会详细介绍),并将收到的消息记录到控制台。 我们称之为 . 虽然不是很实用,但它很好地介绍了一些主要概念 和抽象,使其更容易消化本用户指南的其余部分。LoggingConsumerSpring中文文档

这三个步骤如下:Spring中文文档

使用 Spring Initializr 创建示例应用程序

要开始使用,请访问 Spring Initializr。从那里,您可以生成我们的应用程序。为此,请执行以下操作:LoggingConsumerSpring中文文档

  1. “依赖项”部分中,开始键入 。 当出现“Cloud Stream”选项时,选择它。streamSpring中文文档

  2. 开始键入“kafka”或“rabbit”。Spring中文文档

  3. 选择“Kafka”或“RabbitMQ”。Spring中文文档

    基本上,您可以选择应用程序绑定到的消息传递中间件。 我们建议您使用您已经安装的那个,或者对安装和运行感到更舒服。 此外,正如您从 Initilaizer 屏幕中看到的那样,您还可以选择其他一些选项。 例如,您可以选择 Gradle 作为构建工具,而不是 Maven(默认设置)。Spring中文文档

  4. Artifact 字段中,键入 'logging-consumer'。Spring中文文档

    “工件”字段的值将成为应用程序名称。 如果选择 RabbitMQ 作为中间件,则 Spring Initializr 现在应如下所示:Spring中文文档

Spring initializr
  1. 单击“生成项目”按钮。Spring中文文档

    这样做会将生成的项目的压缩版本下载到您的硬盘。Spring中文文档

  2. 将文件解压缩到要用作项目目录的文件夹中。Spring中文文档

我们鼓励您探索 Spring Initializr 中提供的许多可能性。 它允许您创建许多不同类型的 Spring 应用程序。
我们鼓励您探索 Spring Initializr 中提供的许多可能性。 它允许您创建许多不同类型的 Spring 应用程序。

将项目导入到 IDE 中

现在,您可以将项目导入到 IDE 中。 请记住,根据 IDE,您可能需要遵循特定的导入过程。 例如,根据项目的生成方式(Maven 或 Gradle),您可能需要遵循特定的导入过程(例如,在 Eclipse 或 STS 中,您需要使用 Maven →现有 Maven 项目→文件→导入)。Spring中文文档

导入后,项目不得有任何类型的错误。此外,还应包含 .src/main/javacom.example.loggingconsumer.LoggingConsumerApplicationSpring中文文档

从技术上讲,此时,您可以运行应用程序的主类。 它已经是一个有效的 Spring Boot 应用程序。 但是,它不做任何事情,所以我们想添加一些代码。Spring中文文档

添加消息处理程序、生成和运行

修改类,如下所示:com.example.loggingconsumer.LoggingConsumerApplicationSpring中文文档

@SpringBootApplication
public class LoggingConsumerApplication {

	public static void main(String[] args) {
		SpringApplication.run(LoggingConsumerApplication.class, args);
	}

	@Bean
	public Consumer<Person> log() {
	    return person -> {
	        System.out.println("Received: " + person);
	    };
	}

	public static class Person {
		private String name;
		public String getName() {
			return name;
		}
		public void setName(String name) {
			this.name = name;
		}
		public String toString() {
			return this.name;
		}
	}
}

从前面的列表中可以看出:Spring中文文档

这样做还可以让您看到框架的核心功能之一:它尝试自动将传入的消息有效负载转换为类型。PersonSpring中文文档

现在,您拥有一个功能齐全的 Spring Cloud Stream 应用程序,该应用程序可以侦听消息。 从这里开始,为简单起见,我们假设您在第一中选择了 RabbitMQ。 假设您已安装并运行 RabbitMQ,则可以通过在 IDE 中运行其方法来启动应用程序。mainSpring中文文档

您应该看到以下输出:Spring中文文档

	--- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg, bound to: input
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#2a3a299:0/SimpleConnection@66c83fc8. . .
	. . .
	--- [ main] o.s.i.a.i.AmqpInboundChannelAdapter      : started inbound.input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg
	. . .
	--- [ main] c.e.l.LoggingConsumerApplication         : Started LoggingConsumerApplication in 2.531 seconds (JVM running for 2.897)

转到 RabbitMQ 管理控制台或任何其他 RabbitMQ 客户端,然后向 发送消息。 该部件表示组名称并生成,因此在您的环境中必然会有所不同。 对于更可预测的内容,您可以通过设置(或您喜欢的任何名称)使用显式组名称。input.anonymous.CbMIwdkJSBO1ZoPDOtHtCganonymous.CbMIwdkJSBO1ZoPDOtHtCgspring.cloud.stream.bindings.input.group=helloSpring中文文档

消息的内容应为类的 JSON 表示形式,如下所示:PersonSpring中文文档

{"name":"Sam Spade"}

然后,在您的主机中,您应该看到:Spring中文文档

Received: Sam SpadeSpring中文文档

您还可以构建应用程序并将其打包到引导 jar 中(使用 ),并使用命令运行构建的 JAR。./mvnw clean installjava -jarSpring中文文档

现在你有一个工作(尽管非常基础)的Spring Cloud Stream应用程序。Spring中文文档

流数据上下文中的 Spring Expression Language (SpEL) in the context of Streaming data (SpEL)

在本参考手册中,您将遇到许多可以使用 Spring 表达式语言 (SpEL) 的功能和示例。在使用它时,了解某些限制非常重要。Spring中文文档

SpEL 允许您访问当前消息以及您正在运行的应用程序上下文。 但是,了解 SpEL 可以看到哪些类型的数据非常重要,尤其是在传入消息的上下文中。 从代理中,消息以 byte[] 的形式到达。然后,它被绑定器转换为 a,而您可以看到消息的有效负载保持其原始形式。消息的标头是 ,其中值通常是另一个基元或基元的集合/数组,因此是 Object。 这是因为 binder 不知道所需的输入类型,因为它无法访问用户代码(函数)。因此,活页夹有效地以邮件标题的形式传递了一个带有有效载荷和一些可读元数据的信封,就像通过邮件传递的信件一样。 这意味着,虽然可以访问消息的有效负载,但您只能将其作为原始数据(即 byte[])访问。虽然开发人员要求能够让 SpEL 访问有效载荷对象的字段作为具体类型(例如,Foo、Bar 等)可能很常见,但您可以看到实现它是多么困难甚至不可能。 下面是一个示例来演示该问题;假设您有一个路由表达式,用于根据有效负载类型路由到不同的函数。此要求意味着将有效负载从 byte[] 转换为特定类型,然后应用 SpEL。但是,为了执行这种转换,我们需要知道要传递给转换器的实际类型,这来自函数的签名,我们不知道是哪一个。解决此要求的更好方法是将类型信息作为消息头传递(例如,)。您将获得一个清晰可读的 String 值,该值可以在一年内访问和计算,并且易于阅读 SpEL 表达式。Message<byte[]><String, Object>application/json;type=foo.bar.BazSpring中文文档

此外,使用有效负载进行路由决策被认为是非常糟糕的做法,因为有效负载被视为特权数据 - 数据只能由其最终接收者读取。同样,使用邮件投递类比,您不希望邮递员打开您的信封并阅读信件的内容来做出一些投递决定。同样的概念也适用于这里,特别是当在生成消息时包含此类信息相对容易时。它强制执行与要通过网络传输的数据的设计相关的一定程度的纪律,以及这些数据中的哪些部分可以被视为公共数据,哪些数据是特权数据。Spring中文文档