本节将更详细地介绍如何创建 Streams,它们是 Spring Cloud Stream 应用程序的集合。它涵盖的主题包括 创建和部署 Streams。spring-doc.cn

如果您刚开始使用 Spring Cloud Data Flow,那么在深入研究之前,您可能应该阅读入门指南 本节。spring-doc.cn

17. 简介

Stream 是长期存在的 Spring Cloud Stream 应用程序的集合,它们通过消息传递中间件相互通信。 基于文本的 DSL 定义了应用程序之间的配置和数据流。虽然提供了许多应用程序来实施常见用例,但您通常会创建自定义 Spring Cloud Stream 应用程序来实现自定义业务逻辑。spring-doc.cn

Stream 的一般生命周期为:spring-doc.cn

  1. 注册应用程序。spring-doc.cn

  2. 创建 Stream Definition。spring-doc.cn

  3. 部署 Stream。spring-doc.cn

  4. 取消部署或销毁 Stream。spring-doc.cn

  5. 升级或回滚 Stream 中的应用程序。spring-doc.cn

要部署 Streams,必须将 Data Flow Server 配置为将部署委托给 Spring Cloud 生态系统中名为 Skipper 的新服务器。spring-doc.cn

此外,您还可以将 Skipper 配置为将应用程序部署到一个或多个 Cloud Foundry 组织和空间、Kubernetes 集群上的一个或多个命名空间或本地计算机。 在 Data Flow 中部署流时,您可以指定在部署时使用的平台。 Skipper 还为 Data Flow 提供了对已部署的流执行更新的功能。 更新流中的应用程序的方法有很多种,但最常见的示例之一是使用新的自定义业务逻辑升级处理器应用程序,同时保留现有的 source 和 sink 应用程序。spring-doc.cn

17.1. 流管道 DSL

流是使用受 Unix 启发的 Pipeline 语法定义的。 该语法使用竖线(称为 “管道”) 来连接多个命令。 Unix 中的命令获取进程的输出,并通过管道将其传输到进程的输入。 的输出反过来又发送到进程的输入。 每个符号将左侧命令的标准输出连接到右侧命令的标准输入。 数据从左到右流经管道。ls -l | grep key | lessls -lgrep keygrepless|spring-doc.cn

在 Data Flow 中,Unix 命令被 Spring Cloud Stream 应用程序取代,每个管道符号表示通过消息传递中间件(如 RabbitMQ 或 Apache Kafka)连接应用程序的输入和输出。spring-doc.cn

每个 Spring Cloud Stream 应用程序都注册在一个简单的名称下。 注册过程指定可从何处获取应用程序(例如,在 Maven 存储库或 Docker 注册表中)。您可以在本节中找到有关如何注册 Spring Cloud Stream 应用程序的更多信息。 在 Data Flow 中,我们将 Spring Cloud Stream 应用程序分类为 Sources、Processors 或 Sinks。spring-doc.cn

举个简单的例子,考虑从 HTTP Source 收集数据并写入 File Sink。 使用 DSL 时,流描述为:spring-doc.cn

http | filespring-doc.cn

涉及某些处理的流将表示为:spring-doc.cn

http | filter | transform | filespring-doc.cn

可以使用 shell 的命令创建流定义,如以下示例所示:stream createspring-doc.cn

dataflow:> stream create --name httpIngest --definition "http | file"spring-doc.cn

Stream DSL 将传递到 command 选项。--definitionspring-doc.cn

流定义的部署是通过 Shell 的命令完成的,如下所示:stream deployspring-doc.cn

dataflow:> stream deploy --name ticktockspring-doc.cn

“入门”部分介绍如何启动服务器以及如何启动和使用 Spring Cloud Data Flow shell。spring-doc.cn

请注意,shell 调用 Data Flow Server 的 REST API。有关直接向服务器发出 HTTP 请求的更多信息,请参阅 REST API 指南spring-doc.cn

在命名流定义时,请记住,流中的每个应用程序都将在平台上创建,其名称格式为 .因此,生成的应用程序名称的总长度不能超过 58 个字符。<stream name>-<app name>

17.2. 流应用程序 DSL

您可以使用 Stream 应用程序 DSL 为每个 Spring Cloud Stream 应用程序定义自定义绑定属性。 有关更多信息,请参阅微型网站的 Stream Application DSL 部分。spring-doc.cn

考虑以下 Java 接口,该接口定义了一个 input 方法和两个 output methods:spring-doc.cn

public interface Barista {

    @Input
    SubscribableChannel orders();

    @Output
    MessageChannel hotDrinks();

    @Output
    MessageChannel coldDrinks();
}

进一步考虑以下 Java 接口,这是创建 Kafka Streams 应用程序的典型接口:spring-doc.cn

interface KStreamKTableBinding {

    @Input
    KStream<?, ?> inputStream();

    @Input
    KTable<?, ?> inputTable();
}

在具有多个输入和输出绑定的情况下,Data Flow 无法对从一个应用程序到另一个应用程序的数据流做出任何假设。 因此,您需要设置绑定属性以“连接”应用程序。 Stream Application DSL 使用“双管道”而不是“管道符号”来指示 Data Flow 不应配置应用程序的绑定属性。认为 的意思是 “并行”。 以下示例显示了这样的 “parallel” 定义:||spring-doc.cn

dataflow:> stream create --definition "orderGeneratorApp || baristaApp || hotDrinkDeliveryApp || coldDrinkDeliveryApp" --name myCafeStream
打破变化!SCDF Local、Cloud Foundry 1.7.0 到 1.7.2 和 SCDF Kubernetes 1.7.0 到 1.7.1 的版本使用该字符作为应用程序之间的分隔符。这导致了传统 Stream DSL 的重大变化。虽然并不理想,但更改分隔符被认为是对现有用户影响最小的最佳解决方案。comma

此流有四个应用程序。 有两个输出目标 和 ,分别供 和 使用。 在部署此流时,您需要设置绑定属性,以便将热饮消息发送到目标,将冷饮消息发送到目标。 下面的清单是这样做的:baristaApphotDrinkscoldDrinkshotDrinkDeliveryAppcoldDrinkDeliveryAppbaristaApphotDrinkDeliveryAppcoldDrinkDeliveryAppspring-doc.cn

app.baristaApp.spring.cloud.stream.bindings.hotDrinks.destination=hotDrinksDest
app.baristaApp.spring.cloud.stream.bindings.coldDrinks.destination=coldDrinksDest
app.hotDrinkDeliveryApp.spring.cloud.stream.bindings.input.destination=hotDrinksDest
app.coldDrinkDeliveryApp.spring.cloud.stream.bindings.input.destination=coldDrinksDest

如果要使用消费组,则需要分别在生产者和消费应用上设置 Spring Cloud Stream 应用属性和 。spring.cloud.stream.bindings.<channelName>.producer.requiredGroupsspring.cloud.stream.bindings.<channelName>.groupspring-doc.cn

Stream 应用程序 DSL 的另一个常见用例是部署一个 HTTP 网关应用程序,该应用程序向 Kafka 或 RabbitMQ 应用程序发送同步请求或回复消息。 在这种情况下,HTTP 网关应用程序和 Kafka 或 RabbitMQ 应用程序都可以是不使用 Spring Cloud Stream 库的 Spring 集成应用程序。spring-doc.cn

也可以使用 Stream 应用程序 DSL 仅部署单个应用程序。spring-doc.cn

17.3. 应用程序属性

每个应用程序都使用属性来自定义其行为。例如,源模块公开了一个设置,该设置允许更改数据摄取端口的默认值:httpportspring-doc.cn

dataflow:> stream create --definition "http --port=8090 | log" --name myhttpstream

此属性实际上与标准 Spring Boot 属性相同。 Data Flow 添加了使用速记形式而不是 . 您还可以指定普通版本:portserver.portportserver.portspring-doc.cn

dataflow:> stream create --definition "http --server.port=8000 | log" --name myhttpstream

这种速记行为将在 白名单应用程序属性 一节中详细讨论。 如果已注册应用程序属性元数据,则可以在键入后在 shell 中使用 Tab 键自动补全来获取候选属性名称的列表。--spring-doc.cn

shell 为应用程序属性提供 Tab 键自动补全。shell 命令为所有受支持的属性提供了其他文档。app info --name <appName> --type <appType>spring-doc.cn

支持的 Stream 选项包括:、 和 。<appType>sourceprocessorsink

18. 流生命周期

流的生命周期经历以下阶段:spring-doc.cn

Skipper 是一个服务器,可让您发现 Spring Boot 应用程序并在多个云平台上管理其生命周期。spring-doc.cn

Skipper 中的应用程序捆绑为包含应用程序资源位置、应用程序属性和部署属性的包。 您可以将 Skipper 包视为类似于 或 等工具中的包。apt-getbrewspring-doc.cn

当 Data Flow 部署 Stream 时,它会生成一个包并将其上传到 Skipper,该包表示 Stream 中的应用程序。 用于升级或回滚 Stream 中的应用程序的后续命令将传递给 Skipper。 此外,Stream 定义是从包中逆向工程的,并且 Stream 的状态也委托给 Skipper。spring-doc.cn

18.1. 注册 Stream 应用程序

您可以使用以下命令注册受版本控制的流应用程序。您必须提供唯一名称、应用程序类型和可解析为应用程序构件的 URI。 对于类型,请指定 、 或 。版本从 URI 解析。以下是一些示例:app registersourceprocessorsinkspring-doc.cn

dataflow:>app register --name mysource --type source --uri maven://com.example:mysource:0.0.1
dataflow:>app register --name mysource --type source --uri maven://com.example:mysource:0.0.2
dataflow:>app register --name mysource --type source --uri maven://com.example:mysource:0.0.3

dataflow:>app list --id source:mysource
╔═══╤══════════════════╤═════════╤════╤════╗
║app│      source      │processor│sink│task║
╠═══╪══════════════════╪═════════╪════╪════╣
║   │> mysource-0.0.1 <│         │    │    ║
║   │mysource-0.0.2    │         │    │    ║
║   │mysource-0.0.3    │         │    │    ║
╚═══╧══════════════════╧═════════╧════╧════╝

dataflow:>app register --name myprocessor --type processor --uri file:///Users/example/myprocessor-1.2.3.jar

dataflow:>app register --name mysink --type sink --uri https://example.com/mysink-2.0.1.jar

应用程序 URI 应符合以下架构格式之一:spring-doc.cn

  • Maven 架构:spring-doc.cn

    maven://<groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>
  • HTTP 架构:spring-doc.cn

    http://<web-path>/<artifactName>-<version>.jar
  • 文件架构:spring-doc.cn

    file:///<local-path>/<artifactName>-<version>.jar
  • Docker 架构:spring-doc.cn

    docker:<docker-image-path>/<imageName>:<version>
URI 部分对于版本控制流应用程序是必需的。 Skipper 使用多版本流应用程序,允许使用部署属性在运行时升级或回滚这些应用程序。<version>

如果要注册使用 RabbitMQ Binder 构建的 和 应用程序的快照版本,可以执行以下操作:httplogspring-doc.cn

dataflow:>app register --name http --type source --uri maven://org.springframework.cloud.stream.app:http-source-rabbit:1.2.1.BUILD-SNAPSHOT
dataflow:>app register --name log --type sink --uri maven://org.springframework.cloud.stream.app:log-sink-rabbit:1.2.1.BUILD-SNAPSHOT

如果您想一次注册多个应用程序,您可以将它们存储在一个属性文件中。 其中键的格式为 ,值是 URI。<type>.<name>spring-doc.cn

例如,要注册使用 RabbitMQ Binder 构建的 和 应用程序的快照版本,您可以在属性文件中包含以下内容(例如:httplogstream-apps.propertiesspring-doc.cn

source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:1.2.1.BUILD-SNAPSHOT
sink.log=maven://org.springframework.cloud.stream.app:log-sink-rabbit:1.2.1.BUILD-SNAPSHOT

然后,要批量导入应用程序,请使用命令并在开关中提供属性文件的位置,如下所示:app import--urispring-doc.cn

dataflow:>app import --uri file:///<YOUR_FILE_LOCATION>/stream-apps.properties

使用 注册应用程序与注册 、 或 相同。 这种类型的应用程序只能在 Stream Application DSL 中使用(在 DSL 中使用逗号而不是管道符号),并指示 Data Flow 不配置应用程序的 Spring Cloud Stream 绑定属性。 使用注册的应用程序不必是 Spring Cloud Stream 应用程序。它可以是任何 Spring Boot 应用程序。 有关使用此应用程序类型的更多信息,请参阅 Stream Application DSL 简介--type appsourceprocessorsinkapp--type appspring-doc.cn

您可以注册相同应用程序的多个版本(例如,相同的名称和类型),但只能将一个版本设置为默认版本。 默认版本用于部署 Streams。spring-doc.cn

首次注册应用程序时,它将被标记为 default。可以使用以下命令更改默认应用程序版本:app defaultspring-doc.cn

dataflow:>app default --id source:mysource --version 0.0.2
dataflow:>app list --id source:mysource
╔═══╤══════════════════╤═════════╤════╤════╗
║app│      source      │processor│sink│task║
╠═══╪══════════════════╪═════════╪════╪════╣
║   │mysource-0.0.1    │         │    │    ║
║   │> mysource-0.0.2 <│         │    │    ║
║   │mysource-0.0.3    │         │    │    ║
╚═══╧══════════════════╧═════════╧════╧════╝

该命令列出了给定流应用程序的所有版本。app list --id <type:name>spring-doc.cn

该命令有一个可选参数,用于指定要取消注册的应用程序版本:app unregister--versionspring-doc.cn

dataflow:>app unregister --name mysource --type source --version 0.0.1
dataflow:>app list --id source:mysource
╔═══╤══════════════════╤═════════╤════╤════╗
║app│      source      │processor│sink│task║
╠═══╪══════════════════╪═════════╪════╪════╣
║   │> mysource-0.0.2 <│         │    │    ║
║   │mysource-0.0.3    │         │    │    ║
╚═══╧══════════════════╧═════════╧════╧════╝

如果未指定,则默认版本未注册。--versionspring-doc.cn

流中的所有应用程序都应为要部署的流设置默认版本。 否则,在部署期间,它们将被视为未注册的应用程序。 使用该命令设置默认值。app defaultspring-doc.cn

app default --id source:mysource --version 0.0.3
dataflow:>app list --id source:mysource
╔═══╤══════════════════╤═════════╤════╤════╗
║app│      source      │processor│sink│task║
╠═══╪══════════════════╪═════════╪════╪════╣
║   │mysource-0.0.2    │         │    │    ║
║   │> mysource-0.0.3 <│         │    │    ║
╚═══╧══════════════════╧═════════╧════╧════╝

这需要设置默认应用程序版本。 但是,and 命令可以使用所有 (默认和非默认) 已注册的应用程序版本。stream deploystream updatestream rollbackspring-doc.cn

以下命令创建一个使用默认 mysource 版本 (0.0.3) 的流:spring-doc.cn

dataflow:>stream create foo --definition "mysource | log"

然后我们可以将版本更新到 0.0.2:spring-doc.cn

dataflow:>stream update foo --properties version.mysource=0.0.2
只有预先注册的应用程序才能用于 、 或 Stream。deployupdaterollback

尝试将 更新到版本 (未注册) 失败。mysource0.0.1spring-doc.cn

18.1.1. 注册支持的应用程序和任务

为方便起见,我们提供了带有应用程序 URI(适用于 Maven 和 Docker)的静态文件 适用于所有开箱即用的 Stream 和 Task 或 Batch 应用程序Starters。您可以指向此文件并导入 批量显示所有应用程序 URI。否则,如前所述,您可以单独注册它们或拥有自己的 自定义属性文件,其中仅包含所需的应用程序 URI。但是,我们建议将 “重点” 自定义属性文件中所需的应用程序 URI 的列表。spring-doc.cn

Spring Cloud Stream 应用程序Starters

下表包含指向基于 Spring Cloud Stream 2.1.x 的可用 Stream Application Starters 的链接 和 Spring Boot 2.1.x:dataflow.spring.iospring-doc.cn

Artifact 类型(Artifact Type) 稳定版 SNAPSHOT 版本

RabbitMQ + Mavenspring-doc.cn

dataflow.spring.io/rabbitmq-maven-latestspring-doc.cn

dataflow.spring.io/Einstein-BUILD-SNAPSHOT-stream-applications-rabbit-mavenspring-doc.cn

RabbitMQ + Dockerspring-doc.cn

dataflow.spring.io/rabbitmq-docker-latestspring-doc.cn

dataflow.spring.io/Einstein-BUILD-SNAPSHOT-stream-applications-rabbit-dockerspring-doc.cn

Apache Kafka + Mavenspring-doc.cn

dataflow.spring.io/kafka-maven-latestspring-doc.cn

dataflow.spring.io/Einstein-BUILD-SNAPSHOT-stream-applications-kafka-mavenspring-doc.cn

Apache Kafka + Dockerspring-doc.cn

dataflow.spring.io/kafka-docker-latestspring-doc.cn

dataflow.spring.io/Einstein-BUILD-SNAPSHOT-stream-applications-kafka-dockerspring-doc.cn

默认情况下,App Starter 执行器终端节点是安全的。您可以通过部署具有该属性的流来禁用安全性。 在 Kubernetes 上,请参阅 Liveness and readiness probes 部分以了解如何配置 执行器端点的安全性。app.*.spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration
从 Spring Cloud Stream 2.1 GA 版本开始,我们现在与 Spring Cloud Function 具有强大的互操作性 编程模型。在此基础上,使用 Einstein 发布序列,现在可以选择一些 Stream App Starters 并使用它们组成单个应用程序。查看 “组合函数支持 Spring Cloud Data Flow“博客,通过示例了解有关开发人员和编排体验的更多信息。
Spring Cloud Task App 启动程序

下表包括基于 Spring Cloud Task 2.1.x 和 Spring Boot 2.1.x 的可用 Task Application Starters:spring-doc.cn

Artifact 类型(Artifact Type) 稳定版 SNAPSHOT 版本

Maven 系列spring-doc.cn

dataflow.spring.io/task-maven-latestspring-doc.cn

dataflow.spring.io/Elston-BUILD-SNAPSHOT-task-applications-mavenspring-doc.cn

Jetty工人spring-doc.cn

dataflow.spring.io/task-docker-latestspring-doc.cn

dataflow.spring.io/Elston-BUILD-SNAPSHOT-task-applications-dockerspring-doc.cn

您可以在 Task App Starters 项目页面和 相关参考文档。有关可用 Stream Starters 的更多信息,请查看 Stream App Starters 项目页面和相关参考文档。spring-doc.cn

例如,如果您想批量注册使用 Kafka Binder 构建的所有开箱即用的流应用程序,可以使用以下命令:spring-doc.cn

$ dataflow:>app import --uri https://dataflow.spring.io/kafka-maven-latest

或者,您可以使用 Rabbit Binder 注册所有流应用程序,如下所示:spring-doc.cn

$ dataflow:>app import --uri https://dataflow.spring.io/rabbitmq-maven-latest

您还可以传递选项(默认情况下)来指示 Properties 文件的位置应在 shell 进程本身中解析。如果位置应该 从 Data Flow Server 进程中解析,请指定 .--localtrue--local falsespring-doc.cn

当您使用 或 时,如果应用程序已注册到 提供的 name 和 type 以及 version,默认情况下,它不会被覆盖。如果要覆盖 预先存在的应用程序或坐标,包括该选项。app registerapp importurimetadata-uri--forcespring-doc.cn

但请注意,下载后,应用程序可能会根据资源在本地缓存在 Data Flow 服务器上 位置。如果资源位置没有改变(即使实际资源字节可能不同),则 不会重新下载。另一方面,在使用资源时,使用恒定位置仍可能规避 缓存(如果使用版本)。maven://-SNAPSHOTspring-doc.cn

此外,如果已经部署了流并使用已注册应用程序的某个版本,则(强制)重新注册 不同的应用程序不起作用,直到再次部署流。spring-doc.cn

在某些情况下,资源在服务器端解析。在其他情况下, URI 将传递到运行时容器实例,并在其中对其进行解析。看 有关更多详细信息,请参阅每个 Data Flow Server 的特定文档。

18.1.2. 将应用程序属性列入白名单

Stream 和 Task 应用程序是 Spring Boot 应用程序,它们知道许多常见的应用程序属性,例如 ,但也知道属性系列,例如带有前缀 和 的属性。在创建自己的应用程序时,您应该创建一个允许的属性列表,以便 shell 和 UI 在通过 Tab 键完成或在下拉框中显示选项时,可以首先将它们显示为主要属性。server.portspring.jmxloggingspring-doc.cn

要创建允许的应用程序属性列表,请在资源目录中创建一个名为您可以在此文件中使用两个属性键。第一个键名为 。该值是以逗号分隔的完全限定类名列表。第二个键是 ,其值是以逗号分隔的属性名称列表。这可以包含属性的全名,例如 ,也可以包含部分名称以允许属性名称类别,例如 .spring-configuration-metadata-whitelist.propertiesMETA-INFconfiguration-properties.classes@ConfigurationPropertyconfiguration-properties.namesserver.portspring.jmxspring-doc.cn

Spring Cloud Stream 应用程序Starters是查找使用示例的好地方。以下示例来自文件接收器的文件:spring-configuration-metadata-whitelist.propertiesspring-doc.cn

configuration-properties.classes=org.springframework.cloud.stream.app.file.sink.FileSinkProperties

如果我们也想添加 be allowed,它将变为以下行:server.portspring-doc.cn

configuration-properties.classes=org.springframework.cloud.stream.app.file.sink.FileSinkProperties
configuration-properties.names=server.port

添加'spring-boot-configuration-processor'作为可选依赖项,以生成属性的配置元数据文件。spring-doc.cn

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>

allow 支持仅适用于 uber-jar 应用程序工件。目前,无法直接从 Docker 化应用程序映像中检索元数据属性 - 需要一个专用的配套元数据 JAR。spring-doc.cn

可以允许在用 .不允许使用注释定义的属性。configuration-properties.names@ConfigurationProperties@Valuespring-doc.cn

18.1.3. 创建和使用专用的元数据工件

在描述流或任务应用程序支持的主要属性的过程中,您可以更进一步 创建 Metadata Companion 工件。此 jar 文件仅包含 Spring Boot JSON 文件 配置属性 metadata 和上一节中描述的 allow 文件。spring-doc.cn

以下示例显示了规范接收器的此类项目的内容:logspring-doc.cn

$ jar tvf log-sink-rabbit-1.2.1.BUILD-SNAPSHOT-metadata.jar
373848 META-INF/spring-configuration-metadata.json
   174 META-INF/spring-configuration-metadata-whitelist.properties

请注意,该文件非常大。这是因为它包含所有属性的串联,这些 在运行时可用于 sink (其中一些来自 、一些来自 、一些更多来自 等等)。数据流 始终依赖于所有这些属性,即使 Companion Artifact 不可用,但这里所有属性都已合并 合并到单个文件中。spring-configuration-metadata.jsonlogspring-boot-actuator.jarspring-boot-autoconfigure.jarspring-cloud-starter-stream-sink-log.jarspring-doc.cn

为了帮助解决这个问题(您不想尝试手动制作这个巨大的 JSON 文件),您可以使用 以下插件:spring-doc.cn

<plugin>
 	<groupId>org.springframework.cloud</groupId>
 	<artifactId>spring-cloud-app-starter-metadata-maven-plugin</artifactId>
 	<executions>
 		<execution>
 			<id>aggregate-metadata</id>
 			<phase>compile</phase>
 			<goals>
 				<goal>aggregate-metadata</goal>
 			</goals>
 		</execution>
 	</executions>
 </plugin>
此插件是创建单个 JSON 文件的补充。 请务必同时配置两者。spring-boot-configuration-processor

伴生工件的好处包括:spring-doc.cn

  • 更轻。(配套工件通常为几 KB,而不是实际应用程序的 MB。因此,它们的下载速度更快, 例如,在使用 Dashboard UI 时允许更快地提供反馈。app infospring-doc.cn

  • 由于更轻,它们可用于资源受限的环境(例如 PaaS),其中元数据 唯一需要的信息。spring-doc.cn

  • 对于不直接处理 Spring Boot uber jar 的环境(例如,基于 Docker 的运行时,例如 Kubernetes 或 Cloud Foundry),这是提供有关应用程序支持的属性的元数据的唯一方法。spring-doc.cn

但请记住,在处理 uber jar 时,这完全是可选的。uber jar 本身还包括 元数据。spring-doc.cn

18.1.4. 使用 Companion Artifact

拥有伴生工件后,您需要让系统知道它,以便可以使用它。spring-doc.cn

向 注册单个应用程序时,您可以使用 shell 中的可选选项:app register--metadata-urispring-doc.cn

dataflow:>app register --name log --type sink
    --uri maven://org.springframework.cloud.stream.app:log-sink:2.1.0.RELEASE
    --metadata-uri maven://org.springframework.cloud.stream.app:log-sink:jar:metadata:2.1.0.RELEASE

使用命令注册多个文件时,该文件应包含一行 除了每一行。严格来说,这样做是可选的(如果某些应用程序有它,但有些应用程序没有,它可以工作),但这是最佳实践。app import<type>.<name>.metadata<type>.<name>spring-doc.cn

以下示例显示了一个 Docker 化应用程序,其中元数据构件托管在 Maven 存储库中(检索 它通过或也会起作用)。http://file://spring-doc.cn

...
source.http=docker:springcloudstream/http-source-rabbit:latest
source.http.metadata=maven://org.springframework.cloud.stream.app:http-source-rabbit:jar:metadata:2.1.0.RELEASE
...

18.1.5. 创建自定义应用程序

虽然 Data Flow 包括 source、processor、sink 应用程序,但您可以扩展这些应用程序或编写自定义 Spring Cloud Stream 应用程序。spring-doc.cn

Spring Cloud Stream 文档中详细介绍了使用 Spring Initializr 创建 Spring Cloud Stream 应用程序的过程。 您可以将多个 Binders 包含在应用程序中。 如果这样做,请参阅 [passing_producer_consumer_properties] 中的说明以了解如何配置它们。spring-doc.cn

为了支持允许属性,在 Spring Cloud Data Flow 中运行的 Spring Cloud Stream 应用程序可以将 Spring Boot 作为可选依赖项包括在内,如以下示例所示:configuration-processorspring-doc.cn

<dependencies>
  <!-- other dependencies -->
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
  </dependency>
</dependencies>

注意:确保 包含在 POM 中。 该插件对于创建向 Spring Cloud Data Flow 注册的可执行 jar 是必需的。 Spring Initialzr 在生成的 POM 中包含插件。spring-boot-maven-pluginspring-doc.cn

创建自定义应用程序后,您可以注册它,如注册 Stream 应用程序中所述。spring-doc.cn

18.2. 创建流

Spring Cloud Data Flow Server 公开了一个完整的 RESTful API,用于管理流定义的生命周期,但最简单的使用方法是通过 Spring Cloud Data Flow shell。Getting Started 部分介绍了如何启动 shell。spring-doc.cn

新流是在流定义的帮助下创建的。这些定义是从一个简单的 DSL 构建的。例如,考虑一下如果我们运行以下 shell 命令会发生什么情况:spring-doc.cn

dataflow:> stream create --definition "time | log" --name ticktock

这将定义一个名为 的流,该流基于 DSL 表达式。DSL 使用“管道”符号 () 将源连接到接收器。ticktocktime | log|spring-doc.cn

该命令显示有关流的有用信息,如以下示例中所示(及其输出):stream infospring-doc.cn

dataflow:>stream info ticktock
╔═══════════╤═════════════════╤══════════╗
║Stream Name│Stream Definition│  Status  ║
╠═══════════╪═════════════════╪══════════╣
║ticktock   │time | log       │undeployed║
╚═══════════╧═════════════════╧══════════╝

18.2.1. 应用程序属性

应用程序属性是与流中的每个应用程序关联的属性。部署应用程序时,应用程序属性将通过以下方式应用于应用程序 命令行参数或环境变量,具体取决于底层部署实现。spring-doc.cn

以可以在创建流时定义应用程序属性:spring-doc.cn

dataflow:> stream create --definition "time | log" --name ticktock

shell 命令显示应用程序允许的应用程序属性。 有关属性允许的更多信息,请参阅将应用程序属性列入白名单app info --name <appName> --type <appType>spring-doc.cn

下面的清单显示了应用程序允许的属性:timespring-doc.cn

dataflow:> app info --name time --type source
╔══════════════════════════════╤══════════════════════════════╤══════════════════════════════╤══════════════════════════════╗
║         Option Name          │         Description          │           Default            │             Type             ║
╠══════════════════════════════╪══════════════════════════════╪══════════════════════════════╪══════════════════════════════╣
║trigger.time-unit             │The TimeUnit to apply to delay│<none>                        │java.util.concurrent.TimeUnit ║
║                              │values.                       │                              │                              ║
║trigger.fixed-delay           │Fixed delay for periodic      │1                             │java.lang.Integer             ║
║                              │triggers.                     │                              │                              ║
║trigger.cron                  │Cron expression value for the │<none>                        │java.lang.String              ║
║                              │Cron Trigger.                 │                              │                              ║
║trigger.initial-delay         │Initial delay for periodic    │0                             │java.lang.Integer             ║
║                              │triggers.                     │                              │                              ║
║trigger.max-messages          │Maximum messages per poll, -1 │1                             │java.lang.Long                ║
║                              │means infinity.               │                              │                              ║
║trigger.date-format           │Format for the date value.    │<none>                        │java.lang.String              ║
╚══════════════════════════════╧══════════════════════════════╧══════════════════════════════╧══════════════════════════════╝

下面的清单显示了应用程序允许的属性:logspring-doc.cn

dataflow:> app info --name log --type sink
╔══════════════════════════════╤══════════════════════════════╤══════════════════════════════╤══════════════════════════════╗
║         Option Name          │         Description          │           Default            │             Type             ║
╠══════════════════════════════╪══════════════════════════════╪══════════════════════════════╪══════════════════════════════╣
║log.name                      │The name of the logger to use.│<none>                        │java.lang.String              ║
║log.level                     │The level at which to log     │<none>                        │org.springframework.integratio║
║                              │messages.                     │                              │n.handler.LoggingHandler$Level║
║log.expression                │A SpEL expression (against the│payload                       │java.lang.String              ║
║                              │incoming message) to evaluate │                              │                              ║
║                              │as the logged message.        │                              │                              ║
╚══════════════════════════════╧══════════════════════════════╧══════════════════════════════╧══════════════════════════════╝

您可以在创建 时为 和 应用程序指定应用程序属性,如下所示:timelogstreamspring-doc.cn

dataflow:> stream create --definition "time --fixed-delay=5 | log --level=WARN" --name ticktock

请注意,在前面的示例中,为 和 应用程序定义的 and 属性是 shell 完成提供的“短格式”属性名称。 这些 “short-format” 属性名称仅适用于允许的属性。在所有其他情况下,应仅使用完全限定的属性名称。fixed-delayleveltimelogspring-doc.cn

18.2.2. 通用应用程序属性

除了通过 DSL 进行配置之外, Spring Cloud Data Flow 还提供了一种机制,用于将公共属性设置为所有 它启动的流式处理应用程序。 这可以通过在启动时添加前缀为 的属性来完成 服务器。 执行此操作时,服务器会将所有属性(不带前缀)传递给它启动的实例。spring.cloud.dataflow.applicationProperties.streamspring-doc.cn

例如,所有启动的应用程序都可以配置为使用特定的 Kafka 代理,方法是启动 数据流服务器具有以下选项:spring-doc.cn

--spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.brokers=192.168.1.100:9092
--spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.zkNodes=192.168.1.100:2181

这样做会导致 and 属性 传递给所有启动的应用程序。spring.cloud.stream.kafka.binder.brokersspring.cloud.stream.kafka.binder.zkNodesspring-doc.cn

使用此机制配置的属性的优先级低于流部署属性。 如果在流部署时指定了具有相同键的属性(例如,覆盖公共属性),则会覆盖这些属性。app.http.spring.cloud.stream.kafka.binder.brokers

18.3. 部署 Stream

本节介绍当 Spring Cloud Data Flow 服务器负责部署 Stream 时,如何部署 Stream。它涵盖了使用 Skipper 服务部署 Streams 和升级。有关如何设置部署属性的描述适用于 Stream 部署的两种方法。spring-doc.cn

考虑流定义:ticktockspring-doc.cn

dataflow:> stream create --definition "time | log" --name ticktock

要部署流,请使用以下 shell 命令:spring-doc.cn

dataflow:> stream deploy --name ticktock

数据流服务器将 和 应用程序的解析和部署委托给 Skipper。timelogspring-doc.cn

该命令显示有关流的有用信息,包括部署属性:stream infospring-doc.cn

dataflow:>stream info --name ticktock
╔═══════════╤═════════════════╤═════════╗
║Stream Name│Stream Definition│  Status ║
╠═══════════╪═════════════════╪═════════╣
║ticktock   │time | log       │deploying║
╚═══════════╧═════════════════╧═════════╝

Stream Deployment properties: {
  "log" : {
    "resource" : "maven://org.springframework.cloud.stream.app:log-sink-rabbit",
    "spring.cloud.deployer.group" : "ticktock",
    "version" : "2.0.1.RELEASE"
  },
  "time" : {
    "resource" : "maven://org.springframework.cloud.stream.app:time-source-rabbit",
    "spring.cloud.deployer.group" : "ticktock",
    "version" : "2.0.1.RELEASE"
  }
}

该命令有一个重要的可选 command 参数 (called )。 可以将 Skipper 配置为部署到多个平台。 Skipper 预先配置了一个名为 的平台,该平台将应用程序部署到运行 Skipper 的本地计算机。 命令行参数的默认值为 . 如果您通常部署到一个平台,则在安装 Skipper 时,您可以覆盖平台的配置。 否则,请将 to 指定为命令返回的值之一。--platformNamestream deploydefault--platformNamedefaultdefaultplatformNamestream platform-listspring-doc.cn

在前面的示例中,时间源每秒将当前时间作为消息发送一次,日志接收器使用日志框架输出该时间。 您可以对日志(具有后缀)进行尾部排序。日志文件位于 Data Flow Server 的日志输出中显示的目录中,如下面的清单所示:stdout<instance>spring-doc.cn

$ tail -f /var/folders/wn/8jxm_tbd1vj28c8vj37n900m0000gn/T/spring-cloud-dataflow-912434582726479179/ticktock-1464788481708/ticktock.log/stdout_0.log
2016-06-01 09:45:11.250  INFO 79194 --- [  kafka-binder-] log.sink    : 06/01/16 09:45:11
2016-06-01 09:45:12.250  INFO 79194 --- [  kafka-binder-] log.sink    : 06/01/16 09:45:12
2016-06-01 09:45:13.251  INFO 79194 --- [  kafka-binder-] log.sink    : 06/01/16 09:45:13

您还可以通过在创建流时传递标志,一步创建和部署流,如下所示:--deployspring-doc.cn

dataflow:> stream create --definition "time | log" --name ticktock --deploy

但是,在实际使用案例中,一步创建和部署流并不常见。 原因是,当您使用该命令时,您可以传入定义如何将应用程序映射到平台的属性(例如,要使用的容器的内存大小、要运行的每个应用程序的数量以及是否启用数据分区功能)。 Properties 还可以覆盖在创建流时设置的应用程序属性。 以下部分将详细介绍此功能。stream deployspring-doc.cn

18.3.1. 部署属性

部署流时,您可以指定可以控制应用程序部署和配置方式的属性。有关更多信息,请参阅微型网站的 Deployment Properties 部分。spring-doc.cn

18.4. 销毁 Stream

您可以通过从 shell 发出命令来删除流,如下所示:stream destroyspring-doc.cn

dataflow:> stream destroy --name ticktock

如果已部署流,则会在删除流定义之前取消部署该流。spring-doc.cn

18.5. 取消部署 Stream

通常,您希望停止流,但保留名称和定义以备将来使用。在这种情况下,您可以按名称对流进行描述:undeployspring-doc.cn

dataflow:> stream undeploy --name ticktock
dataflow:> stream deploy --name ticktock

您可以稍后发出该命令以重新启动它:deployspring-doc.cn

dataflow:> stream deploy --name ticktock

18.6. 验证流

有时,流定义中包含的应用程序在其注册中包含无效的 URI。 这可能是由于在应用程序注册时输入了无效的 URI,或者从要从中提取应用程序的存储库中删除了应用程序。 要验证流中包含的所有应用程序都是可解析的,用户可以使用以下命令:validatespring-doc.cn

dataflow:>stream validate ticktock
╔═══════════╤═════════════════╗
║Stream Name│Stream Definition║
╠═══════════╪═════════════════╣
║ticktock   │time | log       ║
╚═══════════╧═════════════════╝


ticktock is a valid stream.
╔═══════════╤═════════════════╗
║ App Name  │Validation Status║
╠═══════════╪═════════════════╣
║source:time│valid            ║
║sink:log   │valid            ║
╚═══════════╧═════════════════╝

在前面的示例中,用户验证了他们的 ticktock 流。和 都是有效的。 现在,我们可以看到如果我们有一个流定义,其中包含一个具有无效 URI 的已注册应用程序,会发生什么情况:source:timesink:logspring-doc.cn

dataflow:>stream validate bad-ticktock
╔════════════╤═════════════════╗
║Stream Name │Stream Definition║
╠════════════╪═════════════════╣
║bad-ticktock│bad-time | log   ║
╚════════════╧═════════════════╝


bad-ticktock is an invalid stream.
╔═══════════════╤═════════════════╗
║   App Name    │Validation Status║
╠═══════════════╪═════════════════╣
║source:bad-time│invalid          ║
║sink:log       │valid            ║
╚═══════════════╧═════════════════╝

在这种情况下,Spring Cloud Data Flow 指出流无效,因为 URI 无效。source:bad-timespring-doc.cn

18.7. 更新流

要更新流,请使用 command,该命令将 or 作为 command 参数。 Skipper 有一个重要的新顶级前缀: . 以下命令 deploy stream (在部署时注册的版本为 ):stream update--properties--propertiesFileversionhttp | loglog1.1.0.RELEASEspring-doc.cn

dataflow:> stream create --name httptest --definition "http --server.port=9000 | log"
dataflow:> stream deploy --name httptest
dataflow:>stream info httptest
╔══════════════════════════════╤══════════════════════════════╤════════════════════════════╗
║             Name             │             DSL              │          Status            ║
╠══════════════════════════════╪══════════════════════════════╪════════════════════════════╣
║httptest                      │http --server.port=9000 | log │deploying                   ║
╚══════════════════════════════╧══════════════════════════════╧════════════════════════════╝

Stream Deployment properties: {
  "log" : {
    "spring.cloud.deployer.indexed" : "true",
    "spring.cloud.deployer.group" : "httptest",
    "maven://org.springframework.cloud.stream.app:log-sink-rabbit" : "1.1.0.RELEASE"
  },
  "http" : {
    "spring.cloud.deployer.group" : "httptest",
    "maven://org.springframework.cloud.stream.app:http-source-rabbit" : "1.1.0.RELEASE"
  }
}

然后,以下命令更新流以使用日志应用程序的版本。 在使用应用程序的特定版本更新流之前,我们需要确保应用程序已注册到该版本:1.2.0.RELEASEspring-doc.cn

dataflow:>app register --name log --type sink --uri maven://org.springframework.cloud.stream.app:log-sink-rabbit:1.2.0.RELEASE
Successfully registered application 'sink:log'

然后我们可以更新应用程序:spring-doc.cn

dataflow:>stream update --name httptest --properties version.log=1.2.0.RELEASE
您只能将预先注册的应用程序版本用于 、 或 流。deployupdaterollback

要验证部署属性和更新版本,我们可以使用 ,如以下示例中所示(及其输出):stream infospring-doc.cn

dataflow:>stream info httptest
╔══════════════════════════════╤══════════════════════════════╤════════════════════════════╗
║             Name             │             DSL              │          Status            ║
╠══════════════════════════════╪══════════════════════════════╪════════════════════════════╣
║httptest                      │http --server.port=9000 | log │deploying                   ║
╚══════════════════════════════╧══════════════════════════════╧════════════════════════════╝

Stream Deployment properties: {
  "log" : {
    "spring.cloud.deployer.indexed" : "true",
    "spring.cloud.deployer.count" : "1",
    "spring.cloud.deployer.group" : "httptest",
    "maven://org.springframework.cloud.stream.app:log-sink-rabbit" : "1.2.0.RELEASE"
  },
  "http" : {
    "spring.cloud.deployer.group" : "httptest",
    "maven://org.springframework.cloud.stream.app:http-source-rabbit" : "1.1.0.RELEASE"
  }
}

18.8. 强制更新 Stream

升级流时,您可以使用该选项来部署当前部署的应用程序的新实例,即使应用程序或部署属性尚未更改。 当应用程序本身在启动时(例如,从 Spring Cloud Config Server)获取配置信息时,需要此行为。 您可以使用该选项指定要强制升级的应用程序。 如果未指定任何应用程序名称,则所有应用程序都将强制升级。 您可以将 and 选项与 or 选项一起指定。--force--app-names--force--app-names--properties--propertiesFilespring-doc.cn

18.9. 流版本

Skipper 保留已部署流的历史记录。 更新 Stream 后,将有 Stream 的第二个版本。 您可以使用以下命令查询版本的历史记录:stream history --name <name-of-stream>spring-doc.cn

dataflow:>stream history --name httptest
╔═══════╤════════════════════════════╤════════╤════════════╤═══════════════╤════════════════╗
║Version│        Last updated        │ Status │Package Name│Package Version│  Description   ║
╠═══════╪════════════════════════════╪════════╪════════════╪═══════════════╪════════════════╣
║2      │Mon Nov 27 22:41:16 EST 2017│DEPLOYED│httptest    │1.0.0          │Upgrade complete║
║1      │Mon Nov 27 22:40:41 EST 2017│DELETED │httptest    │1.0.0          │Delete complete ║
╚═══════╧════════════════════════════╧════════╧════════════╧═══════════════╧════════════════╝

18.10. 流清单

替换所有值后,Skipper 会保留所有应用程序、其应用程序属性及其部署属性的“清单”。 这表示部署到平台的内容的最终状态。 您可以使用以下命令查看 Stream 的任何版本的清单:spring-doc.cn

stream manifest --name <name-of-stream> --releaseVersion <optional-version>

如果未指定 the,则返回最后一个版本的清单。--releaseVersionspring-doc.cn

以下示例显示了清单的用法:spring-doc.cn

dataflow:>stream manifest --name httptest

使用该命令将产生以下输出:spring-doc.cn

# Source: log.yml
apiVersion: skipper.spring.io/v1
kind: SpringCloudDeployerApplication
metadata:
  name: log
spec:
  resource: maven://org.springframework.cloud.stream.app:log-sink-rabbit
  version: 1.2.0.RELEASE
  applicationProperties:
    spring.metrics.export.triggers.application.includes: integration**
    spring.cloud.dataflow.stream.app.label: log
    spring.cloud.stream.metrics.key: httptest.log.${spring.cloud.application.guid}
    spring.cloud.stream.bindings.input.group: httptest
    spring.cloud.stream.metrics.properties: spring.application.name,spring.application.index,spring.cloud.application.*,spring.cloud.dataflow.*
    spring.cloud.dataflow.stream.name: httptest
    spring.cloud.dataflow.stream.app.type: sink
    spring.cloud.stream.bindings.input.destination: httptest.http
  deploymentProperties:
    spring.cloud.deployer.indexed: true
    spring.cloud.deployer.group: httptest
    spring.cloud.deployer.count: 1

---
# Source: http.yml
apiVersion: skipper.spring.io/v1
kind: SpringCloudDeployerApplication
metadata:
  name: http
spec:
  resource: maven://org.springframework.cloud.stream.app:http-source-rabbit
  version: 1.2.0.RELEASE
  applicationProperties:
    spring.metrics.export.triggers.application.includes: integration**
    spring.cloud.dataflow.stream.app.label: http
    spring.cloud.stream.metrics.key: httptest.http.${spring.cloud.application.guid}
    spring.cloud.stream.bindings.output.producer.requiredGroups: httptest
    spring.cloud.stream.metrics.properties: spring.application.name,spring.application.index,spring.cloud.application.*,spring.cloud.dataflow.*
    server.port: 9000
    spring.cloud.stream.bindings.output.destination: httptest.http
    spring.cloud.dataflow.stream.name: httptest
    spring.cloud.dataflow.stream.app.type: source
  deploymentProperties:
    spring.cloud.deployer.group: httptest

大多数部署和应用程序属性由 Data Flow 设置,以使应用程序能够相互通信并发送带有标识标签的应用程序指标。spring-doc.cn

18.11. 回滚流

您可以使用以下命令回滚到流的先前版本:stream rollbackspring-doc.cn

dataflow:>stream rollback --name httptest

可选的 command 参数添加流的版本。 如果未指定,则回滚操作将转到以前的流版本。--releaseVersionspring-doc.cn

18.12. 应用程序计数

应用程序计数是系统的动态属性,用于指定应用程序的实例数。有关更多信息,请参阅微型网站的 Application Count 部分。spring-doc.cn

18.13. Skipper 的升级策略

Skipper 有一个简单的 “红/黑” 升级策略。它使用与当前正在运行的版本一样多的实例来部署新版本的应用程序,并检查应用程序的终端节点。 如果新应用程序的运行状况良好,则会取消部署以前的应用程序。 如果新应用程序的运行状况不佳,则会取消部署所有新应用程序,并且认为升级不成功。/healthspring-doc.cn

升级策略不是滚动升级,因此,如果应用程序的 5 个实例正在运行,那么,在晴天的情况下,在取消部署旧版本之前,其中 5 个新应用程序也在运行。spring-doc.cn

19. 流 DSL

本部分介绍流 DSL 简介中未涵盖的流 DSL 的其他功能。spring-doc.cn

19.1. 点击流

可以在流中的各种创建者终端节点创建 Tap。有关更多信息,请参阅微型网站的 Tapping a Stream 部分。spring-doc.cn

19.2. 在 Stream 中使用 Labels

当流由多个同名应用程序组成时,必须使用标签对它们进行限定。 有关更多信息,请参阅微型网站的 Labeling Applications 部分。spring-doc.cn

19.3. 命名目的地

您可以使用命名目标,而不是引用源或接收器应用程序。 有关更多信息,请参阅微型网站的 Named Destinations 部分。spring-doc.cn

19.4. Fan-in 和 Fan-out

通过使用命名目标,您可以支持扇入和扇出使用案例。 有关更多信息,请参阅微型网站的 Fan-in 和 Fan-out 部分。spring-doc.cn

20. 流式传输 Java DSL

您可以使用模块提供的基于 Java 的 DSL,而不是使用 shell 来创建和部署流。 有关更多信息,请参阅微型网站的 Java DSL 部分。spring-cloud-dataflow-rest-clientspring-doc.cn

21. 使用多个 Binder 配置流式传输应用程序

在某些情况下,当需要连接到不同的消息传递时,流可以将其应用程序绑定到多个 Spring Cloud 流 Binders 中间件配置。在这些情况下,您应该确保使用其 Binder 正确配置应用程序 配置。例如,支持 Kafka 和 Rabbit Binders 的多 Binder 转换器是以中的处理器:spring-doc.cn

http | multibindertransform --expression=payload.toUpperCase() | log
在前面的示例中,您将编写自己的应用程序。multibindertransform

在此流中,每个应用程序都按以下方式连接到消息中间件:spring-doc.cn

  1. HTTP 源将事件发送到 RabbitMQ ()。rabbit1spring-doc.cn

  2. Multi-Binder 转换处理器从 RabbitMQ () 接收事件,并将处理后的事件发送到 Kafka () 中。rabbit1kafka1spring-doc.cn

  3. 日志接收器从 Kafka () 接收事件。kafka1spring-doc.cn

这里,和 是 Spring Cloud Stream 应用程序属性中给出的 Binder 名称。 基于此设置,应用程序的 Classpaths 中具有以下 Binders 并具有适当的配置:rabbit1kafka1spring-doc.cn

可以在应用程序本身中设置配置属性。 如果没有,则可以在部署流时通过属性传递它们:spring-cloud-streambinderdeploymentspring-doc.cn

dataflow:>stream create --definition "http | multibindertransform --expression=payload.toUpperCase() | log" --name mystream

dataflow:>stream deploy mystream --properties "app.http.spring.cloud.stream.bindings.output.binder=rabbit1,app.multibindertransform.spring.cloud.stream.bindings.input.binder=rabbit1,
app.multibindertransform.spring.cloud.stream.bindings.output.binder=kafka1,app.log.spring.cloud.stream.bindings.input.binder=kafka1"

您可以通过部署属性指定任何 Binder 配置属性来覆盖这些属性。spring-doc.cn

22. 功能组成

函数组合允许您将函数逻辑动态地附加到现有事件流应用程序。有关更多详细信息,请参阅微型网站的函数组成部分。spring-doc.cn

23. 功能应用

使用 Spring Cloud Stream 3.x 添加功能支持,您只需分别实现 Java Util 的 、 和 接口即可构建 和 应用程序。 有关此功能的更多信息,请参阅 SCDF 站点的 Functional Application RecipeSourceSinkProcessorSupplierConsumerFunctionspring-doc.cn

24. 示例

本章包括以下示例:spring-doc.cn

您可以在 “[dataflow-samples]” 一章中找到更多示例的链接。spring-doc.cn

24.1. 简单的流处理

作为简单处理步骤的示例,我们可以使用以定义将 HTTP 发布的数据的有效负载转换为大写:spring-doc.cn

http | transform --expression=payload.toUpperCase() | log

要创建此流,请在 shell 中输入以下命令:spring-doc.cn

dataflow:> stream create --definition "http --server.port=9000 | transform --expression=payload.toUpperCase() | log" --name mystream --deploy

以下示例使用 shell 命令发布一些数据:spring-doc.cn

dataflow:> http post --target http://localhost:9000 --data "hello"

前面的示例在日志中生成大写,如下所示:HELLOspring-doc.cn

2016-06-01 09:54:37.749  INFO 80083 --- [  kafka-binder-] log.sink    : HELLO

24.2. 有状态流处理

为了演示数据分区功能,下面的清单部署了一个以 Kafka 作为 Binder 的流:spring-doc.cn

dataflow:>stream create --name words --definition "http --server.port=9900 | splitter --expression=payload.split(' ') | log"
Created new stream 'words'

dataflow:>stream deploy words --properties "app.splitter.producer.partitionKeyExpression=payload,deployer.log.count=2"
Deployed stream 'words'

dataflow:>http post --target http://localhost:9900 --data "How much wood would a woodchuck chuck if a woodchuck could chuck wood"
> POST (text/plain;Charset=UTF-8) http://localhost:9900 How much wood would a woodchuck chuck if a woodchuck could chuck wood
> 202 ACCEPTED


dataflow:>runtime apps
╔════════════════════╤═══════════╤═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╗
║App Id / Instance Id│Unit Status│                                                               No. of Instances / Attributes                                                               ║
╠════════════════════╪═══════════╪═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╣
║words.log-v1        │ deployed  │                                                                             2                                                                             ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║                    │           │       guid = 24166                                                                                                                                        ║
║                    │           │        pid = 33097                                                                                                                                        ║
║                    │           │       port = 24166                                                                                                                                        ║
║words.log-v1-0      │ deployed  │     stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1/stderr_0.log     ║
║                    │           │     stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1/stdout_0.log     ║
║                    │           │        url = https://192.168.0.102:24166                                                                                                                   ║
║                    │           │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1                  ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║                    │           │       guid = 41269                                                                                                                                        ║
║                    │           │        pid = 33098                                                                                                                                        ║
║                    │           │       port = 41269                                                                                                                                        ║
║words.log-v1-1      │ deployed  │     stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1/stderr_1.log     ║
║                    │           │     stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1/stdout_1.log     ║
║                    │           │        url = https://192.168.0.102:41269                                                                                                                   ║
║                    │           │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1                  ║
╟────────────────────┼───────────┼───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║words.http-v1       │ deployed  │                                                                             1                                                                             ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║                    │           │       guid = 9900                                                                                                                                         ║
║                    │           │        pid = 33094                                                                                                                                        ║
║                    │           │       port = 9900                                                                                                                                         ║
║words.http-v1-0     │ deployed  │     stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461054/words.http-v1/stderr_0.log    ║
║                    │           │     stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461054/words.http-v1/stdout_0.log    ║
║                    │           │        url = https://192.168.0.102:9900                                                                                                                    ║
║                    │           │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461054/words.http-v1                 ║
╟────────────────────┼───────────┼───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║words.splitter-v1   │ deployed  │                                                                             1                                                                             ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║                    │           │       guid = 33963                                                                                                                                        ║
║                    │           │        pid = 33093                                                                                                                                        ║
║                    │           │       port = 33963                                                                                                                                        ║
║words.splitter-v1-0 │ deployed  │     stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803437542/words.splitter-v1/stderr_0.log║
║                    │           │     stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803437542/words.splitter-v1/stdout_0.log║
║                    │           │        url = https://192.168.0.102:33963                                                                                                                   ║
║                    │           │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803437542/words.splitter-v1             ║
╚════════════════════╧═══════════╧═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝

当您查看日志时,您应该会看到以下内容:words.log-v1-0spring-doc.cn

2016-06-05 18:35:47.047  INFO 58638 --- [  kafka-binder-] log.sink                                 : How
2016-06-05 18:35:47.066  INFO 58638 --- [  kafka-binder-] log.sink                                 : chuck
2016-06-05 18:35:47.066  INFO 58638 --- [  kafka-binder-] log.sink                                 : chuck

当您查看日志时,您应该会看到以下内容:words.log-v1-1spring-doc.cn

2016-06-05 18:35:47.047  INFO 58639 --- [  kafka-binder-] log.sink                                 : much
2016-06-05 18:35:47.066  INFO 58639 --- [  kafka-binder-] log.sink                                 : wood
2016-06-05 18:35:47.066  INFO 58639 --- [  kafka-binder-] log.sink                                 : would
2016-06-05 18:35:47.066  INFO 58639 --- [  kafka-binder-] log.sink                                 : a
2016-06-05 18:35:47.066  INFO 58639 --- [  kafka-binder-] log.sink                                 : woodchuck
2016-06-05 18:35:47.067  INFO 58639 --- [  kafka-binder-] log.sink                                 : if
2016-06-05 18:35:47.067  INFO 58639 --- [  kafka-binder-] log.sink                                 : a
2016-06-05 18:35:47.067  INFO 58639 --- [  kafka-binder-] log.sink                                 : woodchuck
2016-06-05 18:35:47.067  INFO 58639 --- [  kafka-binder-] log.sink                                 : could
2016-06-05 18:35:47.067  INFO 58639 --- [  kafka-binder-] log.sink                                 : wood

此示例显示,包含相同单词的有效负载拆分将路由到同一应用程序实例。spring-doc.cn

24.3. 其他 source 和 sink 应用程序类型

此示例显示了一些更复杂的内容:将源替换为其他内容。另一种受支持的源类型是 ,它接受通过 HTTP POST 请求摄取的数据。请注意,该源接受与数据流服务器不同的端口(默认为 8080)上的数据。默认情况下,端口是随机分配的。timehttphttpspring-doc.cn

要创建使用源但仍使用相同 sink 的流,我们将 Simple Stream Processing 示例中的原始命令更改为以下内容:httplogspring-doc.cn

dataflow:> stream create --definition "http | log" --name myhttpstream --deploy

请注意,这一次,在我们实际发布一些数据(通过使用 shell 命令)之前,我们不会看到任何其他输出。要查看源正在侦听的随机分配的端口,请运行以下命令:httpspring-doc.cn

dataflow:>runtime apps

╔══════════════════════╤═══════════╤═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╗
║ App Id / Instance Id │Unit Status│                                                                    No. of Instances / Attributes                                                                    ║
╠══════════════════════╪═══════════╪═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╣
║myhttpstream.log-v1   │ deploying │                                                                                  1                                                                                  ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║                      │           │       guid = 39628                                                                                                                                                  ║
║                      │           │        pid = 34403                                                                                                                                                  ║
║                      │           │       port = 39628                                                                                                                                                  ║
║myhttpstream.log-v1-0 │ deploying │     stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803867070/myhttpstream.log-v1/stderr_0.log ║
║                      │           │     stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803867070/myhttpstream.log-v1/stdout_0.log ║
║                      │           │        url = https://192.168.0.102:39628                                                                                                                             ║
║                      │           │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803867070/myhttpstream.log-v1              ║
╟──────────────────────┼───────────┼─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║myhttpstream.http-v1  │ deploying │                                                                                  1                                                                                  ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║                      │           │       guid = 52143                                                                                                                                                  ║
║                      │           │        pid = 34401                                                                                                                                                  ║
║                      │           │       port = 52143                                                                                                                                                  ║
║myhttpstream.http-v1-0│ deploying │     stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803866800/myhttpstream.http-v1/stderr_0.log║
║                      │           │     stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803866800/myhttpstream.http-v1/stdout_0.log║
║                      │           │        url = https://192.168.0.102:52143                                                                                                                             ║
║                      │           │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803866800/myhttpstream.http-v1             ║
╚══════════════════════╧═══════════╧═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝

您应该看到相应的源具有一个属性,该属性包含它正在侦听的主机和端口信息。现在,您可以发布到该 URL,如以下示例所示:httpurlspring-doc.cn

dataflow:> http post --target http://localhost:1234 --data "hello"
dataflow:> http post --target http://localhost:1234 --data "goodbye"

然后,流将数据从源汇集到接收器实现的输出日志,产生类似于以下内容的输出:httplogspring-doc.cn

2016-06-01 09:50:22.121  INFO 79654 --- [  kafka-binder-] log.sink    : hello
2016-06-01 09:50:26.810  INFO 79654 --- [  kafka-binder-] log.sink    : goodbye

我们还可以更改 sink 实现。您可以将输出通过管道传输到文件 (),hadoop () 或任何其他可用的 sink 应用程序。您还可以定义自己的应用程序。filehdfsspring-doc.cn