本节将更详细地介绍如何创建 Streams,它们是 Spring Cloud Stream 应用程序的集合。它涵盖的主题包括 创建和部署 Streams。spring-doc.cadn.net.cn

如果您刚开始使用 Spring Cloud Data Flow,那么在深入研究之前,您可能应该阅读入门指南 本节。spring-doc.cadn.net.cn

17. 简介

Stream 是长期存在的 Spring Cloud Stream 应用程序的集合,它们通过消息传递中间件相互通信。 基于文本的 DSL 定义了应用程序之间的配置和数据流。虽然提供了许多应用程序来实施常见用例,但您通常会创建自定义 Spring Cloud Stream 应用程序来实现自定义业务逻辑。spring-doc.cadn.net.cn

Stream 的一般生命周期为:spring-doc.cadn.net.cn

  1. 注册应用程序。spring-doc.cadn.net.cn

  2. 创建 Stream Definition。spring-doc.cadn.net.cn

  3. 部署 Stream。spring-doc.cadn.net.cn

  4. 取消部署或销毁 Stream。spring-doc.cadn.net.cn

  5. 升级或回滚 Stream 中的应用程序。spring-doc.cadn.net.cn

要部署 Streams,必须将 Data Flow Server 配置为将部署委托给 Spring Cloud 生态系统中名为 Skipper 的新服务器。spring-doc.cadn.net.cn

此外,您还可以将 Skipper 配置为将应用程序部署到一个或多个 Cloud Foundry 组织和空间、Kubernetes 集群上的一个或多个命名空间或本地计算机。 在 Data Flow 中部署流时,您可以指定在部署时使用的平台。 Skipper 还为 Data Flow 提供了对已部署流执行更新的功能。 更新流中的应用程序的方法有很多种,但最常见的示例之一是使用新的自定义业务逻辑升级处理器应用程序,同时保留现有的 source 和 sink 应用程序。spring-doc.cadn.net.cn

17.1. 流管道 DSL

流是使用受 Unix 启发的 Pipeline 语法定义的。 该语法使用竖线(称为 “管道”) 来连接多个命令。 命令ls -l | grep key | less在 Unix 中获取ls -lprocess 并将其通过管道传输到grep key过程。 的grep依次发送到less过程。 每|symbol 将左侧命令的标准输出连接到右侧命令的标准输入。 数据从左到右流经管道。spring-doc.cadn.net.cn

在 Data Flow 中,Unix 命令被 Spring Cloud Stream 应用程序取代,每个管道符号表示通过消息传递中间件(如 RabbitMQ 或 Apache Kafka)连接应用程序的输入和输出。spring-doc.cadn.net.cn

每个 Spring Cloud Stream 应用程序都注册在一个简单的名称下。 注册过程指定可从何处获取应用程序(例如,在 Maven 存储库或 Docker 注册表中)。 在 Data Flow 中,我们将 Spring Cloud Stream 应用程序分类为 Sources、Processors 或 Sinks。spring-doc.cadn.net.cn

举个简单的例子,考虑从 HTTP Source 收集数据并写入 File Sink。 使用 DSL 时,流描述为:spring-doc.cadn.net.cn

涉及某些处理的流将表示为:spring-doc.cadn.net.cn

http | filter | transform | filespring-doc.cadn.net.cn

可以使用 shell 的stream create命令,如以下示例所示:spring-doc.cadn.net.cn

dataflow:> stream create --name httpIngest --definition "http | file"spring-doc.cadn.net.cn

Stream DSL 将传递到--definition命令选项。spring-doc.cadn.net.cn

流定义的部署是通过 Shell 的stream deploy命令,如下所示:spring-doc.cadn.net.cn

dataflow:> stream deploy --name ticktockspring-doc.cadn.net.cn

“入门”部分介绍如何启动服务器以及如何启动和使用 Spring Cloud Data Flow shell。spring-doc.cadn.net.cn

请注意,shell 调用 Data Flow Server 的 REST API。有关直接向服务器发出 HTTP 请求的更多信息,请参阅 REST API 指南spring-doc.cadn.net.cn

在命名流定义时,请记住,流中的每个应用程序都将在平台上创建,其名称格式为<stream name>-<app name>.因此,生成的应用程序名称的总长度不能超过 58 个字符。

17.2. 流应用程序 DSL

您可以使用 Stream 应用程序 DSL 为每个 Spring Cloud Stream 应用程序定义自定义绑定属性。 有关更多信息,请参阅微型网站的 Stream Application DSL 部分。spring-doc.cadn.net.cn

17.3. 应用程序属性

每个应用程序都使用属性来自定义其行为。例如,httpsource 模块公开了一个port设置,该设置允许更改数据摄取端口的默认值:spring-doc.cadn.net.cn

dataflow:> stream create --definition "http --port=8090 | log" --name myhttpstream

portproperty 实际上与标准的 Spring Boot 相同server.port财产。 Data Flow 添加了使用速记形式的功能port而不是server.port. 您还可以指定普通版本:spring-doc.cadn.net.cn

dataflow:> stream create --definition "http --server.port=8000 | log" --name myhttpstream

这种速记行为将在 Stream Application Properties 一节中详细讨论。 如果已注册应用程序属性元数据,则可以在键入后在 shell 中使用 Tab 键自动补全来获取候选属性名称的列表。--spring-doc.cadn.net.cn

shell 为应用程序属性提供 Tab 键自动补全。这app info --name <appName> --type <appType>shell 命令提供了所有受支持属性的其他文档。spring-doc.cadn.net.cn

支持的流<appType>可能性包括:source,processorsink.

18. 流生命周期

流的生命周期经历以下阶段:spring-doc.cadn.net.cn

  1. 寄存器流定义spring-doc.cadn.net.cn

  2. 使用定义创建流spring-doc.cadn.net.cn

  3. 部署流spring-doc.cadn.net.cn

  4. 销毁或取消部署流spring-doc.cadn.net.cn

  5. 升级或回滚信息流中的应用spring-doc.cadn.net.cn

Skipper 是一个服务器,可让您发现 Spring Boot 应用程序并在多个云平台上管理其生命周期。spring-doc.cadn.net.cn

Skipper 中的应用程序捆绑为包含应用程序资源位置、应用程序属性和部署属性的包。 您可以将 Skipper 包视为类似于工具中的包,例如apt-getbrew.spring-doc.cadn.net.cn

当 Data Flow 部署 Stream 时,它会生成一个包并将其上传到 Skipper,该包表示 Stream 中的应用程序。 用于升级或回滚 Stream 中的应用程序的后续命令将传递给 Skipper。 此外,Stream 定义是从包中逆向工程的,并且 Stream 的状态也委托给 Skipper。spring-doc.cadn.net.cn

18.1. 注册 Stream 应用程序

您可以使用app register命令。您必须提供唯一名称、应用程序类型和可解析为应用程序构件的 URI。 对于类型,请指定source,processorsink.版本从 URI 解析。以下是一些示例:spring-doc.cadn.net.cn

dataflow:>app register --name mysource --type source --uri maven://com.example:mysource:0.0.1
dataflow:>app register --name mysource --type source --uri maven://com.example:mysource:0.0.2
dataflow:>app register --name mysource --type source --uri maven://com.example:mysource:0.0.3

dataflow:>app list --id source:mysource
╔═══╤══════════════════╤═════════╤════╤════╗
║app│      source      │processor│sink│task║
╠═══╪══════════════════╪═════════╪════╪════╣
║   │> mysource-0.0.1 <│         │    │    ║
║   │mysource-0.0.2    │         │    │    ║
║   │mysource-0.0.3    │         │    │    ║
╚═══╧══════════════════╧═════════╧════╧════╝

dataflow:>app register --name myprocessor --type processor --uri file:///Users/example/myprocessor-1.2.3.jar

dataflow:>app register --name mysink --type sink --uri https://example.com/mysink-2.0.1.jar

应用程序 URI 应符合以下架构格式之一:spring-doc.cadn.net.cn

The URI<version>part 对于版本控制流应用程序是必需的。 Skipper 使用多版本流应用程序,允许使用部署属性在运行时升级或回滚这些应用程序。

如果要注册httplog使用 RabbitMQ Binder 构建的应用程序,您可以执行以下作:spring-doc.cadn.net.cn

dataflow:>app register --name http --type source --uri maven://org.springframework.cloud.stream.app:http-source-rabbit:3.2.1
dataflow:>app register --name log --type sink --uri maven://org.springframework.cloud.stream.app:log-sink-rabbit:3.2.1

如果要一次注册多个应用程序,可以将它们存储在属性文件中,其中键的格式为<type>.<name>值是 URI。spring-doc.cadn.net.cn

例如,要注册httplog使用 RabbitMQ Binder 构建的应用程序,您可以在属性文件中包含以下内容(例如,stream-apps.properties):spring-doc.cadn.net.cn

source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:3.2.1
sink.log=maven://org.springframework.cloud.stream.app:log-sink-rabbit:3.2.1

然后,要批量导入应用程序,请使用app import命令,并使用--uriswitch 中,如下所示:spring-doc.cadn.net.cn

dataflow:>app import --uri file:///<YOUR_FILE_LOCATION>/stream-apps.properties

使用 注册应用程序--type app与注册source,processorsink. 类型app只能在 Stream Application DSL(使用双管道||而不是单个管道|在 DSL 中),并指示 Data Flow 不配置应用程序的 Spring Cloud Stream 绑定属性。 使用--type app不必是 Spring Cloud Stream 应用程序。它可以是任何 Spring Boot 应用程序。 有关使用此应用程序类型的更多信息,请参阅 Stream Application DSL 简介spring-doc.cadn.net.cn

您可以注册相同应用程序的多个版本(例如,相同的名称和类型),但只能将一个版本设置为默认版本。 默认版本用于部署 Streams。spring-doc.cadn.net.cn

首次注册应用程序时,它将被标记为 default。默认应用程序版本可以使用app default命令:spring-doc.cadn.net.cn

dataflow:>app default --id source:mysource --version 0.0.2
dataflow:>app list --id source:mysource
╔═══╤══════════════════╤═════════╤════╤════╗
║app│      source      │processor│sink│task║
╠═══╪══════════════════╪═════════╪════╪════╣
║   │mysource-0.0.1    │         │    │    ║
║   │> mysource-0.0.2 <│         │    │    ║
║   │mysource-0.0.3    │         │    │    ║
╚═══╧══════════════════╧═════════╧════╧════╝

app list --id <type:name>命令列出给定流应用程序的所有版本。spring-doc.cadn.net.cn

app unregistercommand 具有可选的--version参数指定要取消注册的应用程序版本:spring-doc.cadn.net.cn

dataflow:>app unregister --name mysource --type source --version 0.0.1
dataflow:>app list --id source:mysource
╔═══╤══════════════════╤═════════╤════╤════╗
║app│      source      │processor│sink│task║
╠═══╪══════════════════╪═════════╪════╪════╣
║   │> mysource-0.0.2 <│         │    │    ║
║   │mysource-0.0.3    │         │    │    ║
╚═══╧══════════════════╧═════════╧════╧════╝

如果--version未指定,则默认版本为 unregistered。spring-doc.cadn.net.cn

流中的所有应用程序都应为要部署的流设置默认版本。 否则,在部署期间,它们将被视为未注册的应用程序。 使用app default命令设置默认值。spring-doc.cadn.net.cn

app default --id source:mysource --version 0.0.3
dataflow:>app list --id source:mysource
╔═══╤══════════════════╤═════════╤════╤════╗
║app│      source      │processor│sink│task║
╠═══╪══════════════════╪═════════╪════╪════╣
║   │mysource-0.0.2    │         │    │    ║
║   │> mysource-0.0.3 <│         │    │    ║
╚═══╧══════════════════╧═════════╧════╧════╝

stream deploy需要设置默认应用程序版本。 这stream updatestream rollback但是,命令可以使用所有 (默认和非默认) 已注册的应用程序版本。spring-doc.cadn.net.cn

以下命令创建一个使用默认 mysource 版本 (0.0.3) 的流:spring-doc.cadn.net.cn

dataflow:>stream create foo --definition "mysource | log"

然后我们可以将版本更新到 0.0.2:spring-doc.cadn.net.cn

dataflow:>stream update foo --properties version.mysource=0.0.2
只有预先注册的应用程序才能用于deploy,updaterollback一个 Stream。

尝试更新mysource目标版本0.0.1(未注册) 失败。spring-doc.cadn.net.cn

18.1.1. 注册开箱即用的应用程序和任务

为方便起见,我们为所有开箱即用的流和任务应用程序提供了带有应用程序 URI(适用于 Maven 和 Docker)的静态文件。 您可以指向此文件并批量导入所有应用程序 URI。 否则,如前所述,您可以单独注册它们,也可以拥有自己的自定义属性文件,其中仅包含所需的应用程序 URI。 但是,我们建议在自定义属性文件中有一个所需应用程序 URI 的“重点”列表。spring-doc.cadn.net.cn

开箱即用的 Stream 应用程序

下表包括dataflow.spring.io指向基于 Spring Cloud Stream 的 Stream 应用程序的链接3.2.x和 Spring Boot2.7.x.spring-doc.cadn.net.cn

Artifact 类型(Artifact Type) 稳定版 SNAPSHOT 版本

RabbitMQ + Mavenspring-doc.cadn.net.cn

dataflow.spring.io/rabbitmq-maven-latestspring-doc.cadn.net.cn

dataflow.spring.io/rabbitmq-maven-latest-snapshotspring-doc.cadn.net.cn

RabbitMQ + Dockerspring-doc.cadn.net.cn

dataflow.spring.io/rabbitmq-docker-latestspring-doc.cadn.net.cn

dataflow.spring.io/rabbitmq-docker-latest-snapshotspring-doc.cadn.net.cn

Apache Kafka + Mavenspring-doc.cadn.net.cn

dataflow.spring.io/kafka-maven-latestspring-doc.cadn.net.cn

dataflow.spring.io/kafka-maven-latest-snapshotspring-doc.cadn.net.cn

Apache Kafka + Dockerspring-doc.cadn.net.cn

dataflow.spring.io/kafka-docker-latestspring-doc.cadn.net.cn

dataflow.spring.io/kafka-docker-latest-snapshotspring-doc.cadn.net.cn

默认情况下,开箱即用的应用程序的 actuator endpoints 是安全的。您可以通过设置以下属性来部署流来禁用安全性:app.*.spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration

在 Kubernetes 上,请参阅 Liveness and readiness probes 部分以了解如何配置 执行器端点的安全性。spring-doc.cadn.net.cn

开箱即用的任务应用程序

下表包括dataflow.spring.io指向基于 Spring Cloud Task 的任务应用程序的链接2.4.x和 Spring Boot2.7.x.spring-doc.cadn.net.cn

Artifact 类型(Artifact Type) 稳定版 SNAPSHOT 版本

Maven 系列spring-doc.cadn.net.cn

dataflow.spring.io/task-maven-latestspring-doc.cadn.net.cn

dataflow.spring.io/task-maven-latest-snapshotspring-doc.cadn.net.cn

Jetty工人spring-doc.cadn.net.cn

dataflow.spring.io/task-docker-latestspring-doc.cadn.net.cn

dataflow.spring.io/task-docker-latest-snapshotspring-doc.cadn.net.cn

有关可用的开箱即用流应用程序的更多信息,请参阅 Spring Cloud Stream 应用程序项目页面。spring-doc.cadn.net.cn

有关可用的开箱即用任务应用程序的更多信息,请参阅 timestamp-tasktimestamp-batch 文档。spring-doc.cadn.net.cn

例如,如果您想批量注册使用 Kafka Binder 构建的所有开箱即用的流应用程序,可以使用以下命令:spring-doc.cadn.net.cn

$ dataflow:>app import --uri https://dataflow.spring.io/kafka-maven-latest

或者,您可以使用 Rabbit Binder 注册所有流应用程序,如下所示:spring-doc.cadn.net.cn

$ dataflow:>app import --uri https://dataflow.spring.io/rabbitmq-maven-latest

您还可以将--local选项(即true)来指示 Properties 文件的位置应在 shell 进程本身中解析。如果位置应该 从 Data Flow Server 进程中解析,请指定--local false.spring-doc.cadn.net.cn

当您使用app registerapp import(如果应用程序已注册 提供的 name 和 type 以及 version,默认情况下,它不会被覆盖。如果要覆盖 预先存在的应用程序urimetadata-uri坐标,包括--force选择。spring-doc.cadn.net.cn

但请注意,下载后,应用程序可能会根据资源在本地缓存在 Data Flow 服务器上 位置。如果资源位置没有改变(即使实际资源字节可能不同),则 不会重新下载。使用maven://另一方面,使用常量位置仍可能规避 缓存(如果使用-SNAPSHOT版本)。spring-doc.cadn.net.cn

此外,如果已经部署了流并使用已注册应用程序的某个版本,则(强制)重新注册 不同的应用程序不起作用,直到再次部署流。spring-doc.cadn.net.cn

在某些情况下,资源在服务器端解析。在其他情况下, URI 将传递到运行时容器实例,并在其中对其进行解析。看 有关更多详细信息,请参阅每个 Data Flow Server 的特定文档。

18.1.2. 注册自定义应用程序

虽然 Data Flow 包括 source、processor、sink 应用程序,但您可以扩展这些应用程序或编写自定义 Spring Cloud Stream 应用程序。 您可以按照 Microsite 上的 Stream Development 指南创建自己的自定义应用程序。 创建自定义应用程序后,您可以注册它,如注册 Stream 应用程序中所述。spring-doc.cadn.net.cn

18.2. 创建流

Spring Cloud Data Flow Server 公开了一个完整的 RESTful API,用于管理流定义的生命周期,但最简单的使用方法是通过 Spring Cloud Data Flow shell。Getting Started 部分介绍了如何启动 shell。spring-doc.cadn.net.cn

新流是在流定义的帮助下创建的。这些定义是从一个简单的 DSL 构建的。例如,考虑一下如果我们运行以下 shell 命令会发生什么情况:spring-doc.cadn.net.cn

dataflow:> stream create --definition "time | log" --name ticktock

这将定义一个名为ticktock基于 DSL 表达式time | log.DSL 使用“管道”符号 (|) 将源连接到接收器。spring-doc.cadn.net.cn

stream info命令显示有关流的有用信息,如以下示例所示(及其输出):spring-doc.cadn.net.cn

dataflow:>stream info ticktock
╔═══════════╤═════════════════╤═══════════╤══════════╗
║Stream Name│Stream Definition│Description│  Status  ║
╠═══════════╪═════════════════╪═══════════╪══════════╣
║ticktock   │time | log       │           │undeployed║
╚═══════════╧═════════════════╧═══════════╧══════════╝

18.2.1. Stream 应用程序属性

应用程序属性是与流中的每个应用程序关联的属性。部署应用程序时,应用程序属性将通过以下方式应用于应用程序 命令行参数或环境变量,具体取决于底层部署实现。spring-doc.cadn.net.cn

以可以在创建流时定义应用程序属性:spring-doc.cadn.net.cn

dataflow:> stream create --definition "time | log" --name ticktock

app info --name <appName> --type <appType>shell 命令显示应用程序的公开应用程序属性。 有关公开属性的更多信息,请参阅应用程序元数据spring-doc.cadn.net.cn

以下清单显示了time应用:spring-doc.cadn.net.cn

dataflow:> app info --name time --type source
Information about source application 'time':
Version: '3.2.1':
Default application version: 'true':
Resource URI: maven://org.springframework.cloud.stream.app:time-source-rabbit:3.2.1
╔══════════════════════════════╤══════════════════════════════╤══════════════════════════════╤══════════════════════════════╗
║         Option Name          │         Description          │           Default            │             Type             ║
╠══════════════════════════════╪══════════════════════════════╪══════════════════════════════╪══════════════════════════════╣
║spring.integration.poller.max-│Maximum number of messages to │<none>                        │java.lang.Integer             ║
║messages-per-poll             │poll per polling cycle.       │                              │                              ║
║spring.integration.poller.fixe│Polling rate period. Mutually │<none>                        │java.time.Duration            ║
║d-rate                        │exclusive with 'fixedDelay'   │                              │                              ║
║                              │and 'cron'.                   │                              │                              ║
║spring.integration.poller.fixe│Polling delay period. Mutually│<none>                        │java.time.Duration            ║
║d-delay                       │exclusive with 'cron' and     │                              │                              ║
║                              │'fixedRate'.                  │                              │                              ║
║spring.integration.poller.rece│How long to wait for messages │1s                            │java.time.Duration            ║
║ive-timeout                   │on poll.                      │                              │                              ║
║spring.integration.poller.cron│Cron expression for polling.  │<none>                        │java.lang.String              ║
║                              │Mutually exclusive with       │                              │                              ║
║                              │'fixedDelay' and 'fixedRate'. │                              │                              ║
║spring.integration.poller.init│Polling initial delay. Applied│<none>                        │java.time.Duration            ║
║ial-delay                     │for 'fixedDelay' and          │                              │                              ║
║                              │'fixedRate'; ignored for      │                              │                              ║
║                              │'cron'.                       │                              │                              ║
║time.date-format              │Format for the date value.    │MM/dd/yy HH:mm:ss             │java.lang.String              ║
╚══════════════════════════════╧══════════════════════════════╧══════════════════════════════╧══════════════════════════════╝

以下清单显示了log应用:spring-doc.cadn.net.cn

dataflow:> app info --name log --type sink
Information about sink application 'log':
Version: '3.2.1':
Default application version: 'true':
Resource URI: maven://org.springframework.cloud.stream.app:log-sink-rabbit:3.2.1
╔══════════════════════════════╤══════════════════════════════╤══════════════════════════════╤══════════════════════════════╗
║         Option Name          │         Description          │           Default            │             Type             ║
╠══════════════════════════════╪══════════════════════════════╪══════════════════════════════╪══════════════════════════════╣
║log.name                      │The name of the logger to use.│<none>                        │java.lang.String              ║
║log.level                     │The level at which to log     │<none>                        │org.springframework.integratio║
║                              │messages.                     │                              │n.handler.LoggingHandler$Level║
║log.expression                │A SpEL expression (against the│payload                       │java.lang.String              ║
║                              │incoming message) to evaluate │                              │                              ║
║                              │as the logged message.        │                              │                              ║
╚══════════════════════════════╧══════════════════════════════╧══════════════════════════════╧══════════════════════════════╝

您可以为timelog应用程序stream创建,如下所示:spring-doc.cadn.net.cn

dataflow:> stream create --definition "time --fixed-delay=5 | log --level=WARN" --name ticktock

请注意,在前面的示例中,fixed-delayleveltimelogapplications 是 shell 补全提供的 “short-form” 属性名称。 这些 “short-form” 属性名称仅适用于公开的属性。在所有其他情况下,应仅使用完全限定的属性名称。spring-doc.cadn.net.cn

18.2.2. 通用应用程序属性

除了通过 DSL 进行配置之外, Spring Cloud Data Flow 还提供了一种机制,用于将公共属性设置为所有 它启动的流式处理应用程序。 这可以通过添加前缀为spring.cloud.dataflow.applicationProperties.stream启动时 服务器。 执行此作时,服务器会将所有属性(不带前缀)传递给它启动的实例。spring-doc.cadn.net.cn

例如,所有启动的应用程序都可以配置为使用特定的 Kafka 代理,方法是启动 数据流服务器具有以下选项:spring-doc.cadn.net.cn

--spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.brokers=192.168.1.100:9092
--spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.zkNodes=192.168.1.100:2181

这样做会导致spring.cloud.stream.kafka.binder.brokersspring.cloud.stream.kafka.binder.zkNodes性能 传递给所有启动的应用程序。spring-doc.cadn.net.cn

使用此机制配置的属性的优先级低于流部署属性。 如果在流部署时指定了具有相同键的属性(例如,app.http.spring.cloud.stream.kafka.binder.brokers覆盖 common 属性)。

18.3. 部署 Stream

本节介绍当 Spring Cloud Data Flow 服务器负责部署 Stream 时,如何部署 Stream。它涵盖了使用 Skipper 服务部署 Streams 和升级。有关如何设置部署属性的描述适用于 Stream 部署的两种方法。spring-doc.cadn.net.cn

考虑一下ticktock流定义:spring-doc.cadn.net.cn

dataflow:> stream create --definition "time | log" --name ticktock

要部署流,请使用以下 shell 命令:spring-doc.cadn.net.cn

dataflow:> stream deploy --name ticktock

数据流服务器将timelog应用。spring-doc.cadn.net.cn

stream info命令显示有关流的有用信息,包括部署属性:spring-doc.cadn.net.cn

dataflow:>stream info --name ticktock
╔═══════════╤═════════════════╤═════════╗
║Stream Name│Stream Definition│  Status ║
╠═══════════╪═════════════════╪═════════╣
║ticktock   │time | log       │deploying║
╚═══════════╧═════════════════╧═════════╝

Stream Deployment properties: {
  "log" : {
    "resource" : "maven://org.springframework.cloud.stream.app:log-sink-rabbit",
    "spring.cloud.deployer.group" : "ticktock",
    "version" : "2.0.1.RELEASE"
  },
  "time" : {
    "resource" : "maven://org.springframework.cloud.stream.app:time-source-rabbit",
    "spring.cloud.deployer.group" : "ticktock",
    "version" : "2.0.1.RELEASE"
  }
}

有一个重要的可选 command 参数(称为--platformName) 到stream deploy命令。 可以将 Skipper 配置为部署到多个平台。 Skipper 预先配置了一个名为default,它将应用程序部署到运行 Skipper 的本地计算机。 的--platformName命令行参数是default. 如果您通常部署到一个平台,那么在安装 Skipper 时,您可以覆盖default平台。 否则,请指定platformName设置为stream platform-list命令。spring-doc.cadn.net.cn

在前面的示例中,时间源每秒将当前时间作为消息发送一次,日志接收器使用日志框架输出该时间。 您可以跟踪stdoutlog 中(具有<instance>suffix) 的 Package。日志文件位于 Data Flow Server 的日志输出中显示的目录中,如下面的清单所示:spring-doc.cadn.net.cn

$ tail -f /var/folders/wn/8jxm_tbd1vj28c8vj37n900m0000gn/T/spring-cloud-dataflow-912434582726479179/ticktock-1464788481708/ticktock.log/stdout_0.log
2016-06-01 09:45:11.250  INFO 79194 --- [  kafka-binder-] log.sink    : 06/01/16 09:45:11
2016-06-01 09:45:12.250  INFO 79194 --- [  kafka-binder-] log.sink    : 06/01/16 09:45:12
2016-06-01 09:45:13.251  INFO 79194 --- [  kafka-binder-] log.sink    : 06/01/16 09:45:13

您还可以通过传递--deploy标志,如下所示:spring-doc.cadn.net.cn

dataflow:> stream create --definition "time | log" --name ticktock --deploy

但是,在实际使用案例中,一步创建和部署流并不常见。 原因是,当您使用stream deploy命令中,您可以传入定义如何将应用程序映射到平台的属性(例如,要使用的容器的内存大小、要运行的每个应用程序的数量以及是否启用数据分区功能)。 Properties 还可以覆盖在创建流时设置的应用程序属性。 以下部分将详细介绍此功能。spring-doc.cadn.net.cn

18.3.1. 部署属性

部署流时,您可以指定可以控制应用程序部署和配置方式的属性。有关更多信息,请参阅微型网站的 Deployment Properties 部分。spring-doc.cadn.net.cn

18.4. 销毁 Stream

您可以通过发出stream destroy命令,如下所示:spring-doc.cadn.net.cn

dataflow:> stream destroy --name ticktock

如果已部署流,则会在删除流定义之前取消部署该流。spring-doc.cadn.net.cn

18.5. 取消部署 Stream

通常,您希望停止流,但保留名称和定义以备将来使用。在这种情况下,您可以undeploy按名称划分的流:spring-doc.cadn.net.cn

dataflow:> stream undeploy --name ticktock
dataflow:> stream deploy --name ticktock

您可以发出deploy命令重新启动它:spring-doc.cadn.net.cn

dataflow:> stream deploy --name ticktock

18.6. 验证流

有时,流定义中包含的应用程序在其注册中包含无效的 URI。 这可能是由于在应用程序注册时输入了无效的 URI,或者从要从中提取 URI 的存储库中删除了应用程序。 要验证流中包含的所有应用程序都是可解析的,用户可以使用validate命令:spring-doc.cadn.net.cn

dataflow:>stream validate ticktock
╔═══════════╤═════════════════╗
║Stream Name│Stream Definition║
╠═══════════╪═════════════════╣
║ticktock   │time | log       ║
╚═══════════╧═════════════════╝


ticktock is a valid stream.
╔═══════════╤═════════════════╗
║ App Name  │Validation Status║
╠═══════════╪═════════════════╣
║source:time│valid            ║
║sink:log   │valid            ║
╚═══════════╧═════════════════╝

在前面的示例中,用户验证了他们的 ticktock 流。这source:timesink:log有效。 现在,我们可以看到如果我们有一个流定义,其中包含一个具有无效 URI 的已注册应用程序,会发生什么情况:spring-doc.cadn.net.cn

dataflow:>stream validate bad-ticktock
╔════════════╤═════════════════╗
║Stream Name │Stream Definition║
╠════════════╪═════════════════╣
║bad-ticktock│bad-time | log   ║
╚════════════╧═════════════════╝


bad-ticktock is an invalid stream.
╔═══════════════╤═════════════════╗
║   App Name    │Validation Status║
╠═══════════════╪═════════════════╣
║source:bad-time│invalid          ║
║sink:log       │valid            ║
╚═══════════════╧═════════════════╝

在这种情况下,Spring Cloud Data Flow 声明流无效,因为source:bad-time具有无效的 URI。spring-doc.cadn.net.cn

18.7. 更新流

要更新流,请使用stream update命令,该命令采用--properties--propertiesFile作为命令参数。 Skipper 有一个重要的新顶级前缀:version. 以下命令部署http | logstream(以及log在部署时注册的3.2.0):spring-doc.cadn.net.cn

dataflow:> stream create --name httptest --definition "http --server.port=9000 | log"
dataflow:> stream deploy --name httptest
dataflow:>stream info httptest
╔══════════════════════════════╤══════════════════════════════╤════════════════════════════╗
║             Name             │             DSL              │          Status            ║
╠══════════════════════════════╪══════════════════════════════╪════════════════════════════╣
║httptest                      │http --server.port=9000 | log │deploying                   ║
╚══════════════════════════════╧══════════════════════════════╧════════════════════════════╝

Stream Deployment properties: {
  "log" : {
    "spring.cloud.deployer.indexed" : "true",
    "spring.cloud.deployer.group" : "httptest",
    "maven://org.springframework.cloud.stream.app:log-sink-rabbit" : "3.2.0"
  },
  "http" : {
    "spring.cloud.deployer.group" : "httptest",
    "maven://org.springframework.cloud.stream.app:http-source-rabbit" : "3.2.0"
  }
}

然后,以下命令更新流以使用3.2.1log 应用程序的版本。 在使用应用程序的特定版本更新流之前,我们需要确保应用程序已注册到该版本:spring-doc.cadn.net.cn

dataflow:>app register --name log --type sink --uri maven://org.springframework.cloud.stream.app:log-sink-rabbit:3.2.1
Successfully registered application 'sink:log'

然后我们可以更新应用程序:spring-doc.cadn.net.cn

dataflow:>stream update --name httptest --properties version.log=3.2.1
您只能使用预先注册的应用程序版本来deploy,updaterollback一条溪流。

要验证部署属性和更新版本,我们可以使用stream info,如以下示例中所示(及其输出):spring-doc.cadn.net.cn

dataflow:>stream info httptest
╔══════════════════════════════╤══════════════════════════════╤════════════════════════════╗
║             Name             │             DSL              │          Status            ║
╠══════════════════════════════╪══════════════════════════════╪════════════════════════════╣
║httptest                      │http --server.port=9000 | log │deploying                   ║
╚══════════════════════════════╧══════════════════════════════╧════════════════════════════╝

Stream Deployment properties: {
  "log" : {
    "spring.cloud.deployer.indexed" : "true",
    "spring.cloud.deployer.count" : "1",
    "spring.cloud.deployer.group" : "httptest",
    "maven://org.springframework.cloud.stream.app:log-sink-rabbit" : "3.2.1"
  },
  "http" : {
    "spring.cloud.deployer.group" : "httptest",
    "maven://org.springframework.cloud.stream.app:http-source-rabbit" : "3.2.1"
  }
}

18.8. 强制更新 Stream

升级流时,您可以使用--force选项来部署当前部署的应用程序的新实例,即使应用程序或部署属性未发生更改。 当应用程序本身在启动时(例如,从 Spring Cloud Config Server)获取配置信息时,需要此行为。 您可以使用--app-names选择。 如果未指定任何应用程序名称,则所有应用程序都将强制升级。 您可以指定--force--app-namesoptions 和--properties--propertiesFile选项。spring-doc.cadn.net.cn

18.9. 流版本

Skipper 保留已部署流的历史记录。 更新 Stream 后,将有 Stream 的第二个版本。 您可以使用stream history --name <name-of-stream>命令:spring-doc.cadn.net.cn

dataflow:>stream history --name httptest
╔═══════╤════════════════════════════╤════════╤════════════╤═══════════════╤════════════════╗
║Version│        Last updated        │ Status │Package Name│Package Version│  Description   ║
╠═══════╪════════════════════════════╪════════╪════════════╪═══════════════╪════════════════╣
║2      │Mon Nov 27 22:41:16 EST 2017│DEPLOYED│httptest    │1.0.0          │Upgrade complete║
║1      │Mon Nov 27 22:40:41 EST 2017│DELETED │httptest    │1.0.0          │Delete complete ║
╚═══════╧════════════════════════════╧════════╧════════════╧═══════════════╧════════════════╝

18.10. 流清单

替换所有值后,Skipper 会保留所有应用程序、其应用程序属性及其部署属性的“清单”。 这表示部署到平台的内容的最终状态。 您可以使用以下命令查看 Stream 的任何版本的清单:spring-doc.cadn.net.cn

stream manifest --name <name-of-stream> --releaseVersion <optional-version>

如果--releaseVersion未指定,则返回最后一个版本的清单。spring-doc.cadn.net.cn

以下示例显示了清单的用法:spring-doc.cadn.net.cn

dataflow:>stream manifest --name httptest

使用该命令将产生以下输出:spring-doc.cadn.net.cn

# Source: log.yml
apiVersion: skipper.spring.io/v1
kind: SpringCloudDeployerApplication
metadata:
  name: log
spec:
  resource: maven://org.springframework.cloud.stream.app:log-sink-rabbit
  version: 3.2.0
  applicationProperties:
    spring.cloud.dataflow.stream.app.label: log
    spring.cloud.stream.bindings.input.group: httptest
    spring.cloud.dataflow.stream.name: httptest
    spring.cloud.dataflow.stream.app.type: sink
    spring.cloud.stream.bindings.input.destination: httptest.http
  deploymentProperties:
    spring.cloud.deployer.indexed: true
    spring.cloud.deployer.group: httptest
    spring.cloud.deployer.count: 1

---
# Source: http.yml
apiVersion: skipper.spring.io/v1
kind: SpringCloudDeployerApplication
metadata:
  name: http
spec:
  resource: maven://org.springframework.cloud.stream.app:http-source-rabbit
  version: 3.2.0
  applicationProperties:
    spring.cloud.dataflow.stream.app.label: http
    spring.cloud.stream.bindings.output.producer.requiredGroups: httptest
    server.port: 9000
    spring.cloud.stream.bindings.output.destination: httptest.http
    spring.cloud.dataflow.stream.name: httptest
    spring.cloud.dataflow.stream.app.type: source
  deploymentProperties:
    spring.cloud.deployer.group: httptest

大多数部署和应用程序属性由 Data Flow 设置,以使应用程序能够相互通信并发送带有标识标签的应用程序指标。spring-doc.cadn.net.cn

18.11. 回滚流

您可以使用stream rollback命令:spring-doc.cadn.net.cn

dataflow:>stream rollback --name httptest

可选的--releaseVersioncommand 参数添加流的版本。 如果未指定,则回滚作将转到以前的流版本。spring-doc.cadn.net.cn

18.12. 应用程序计数

应用程序计数是系统的动态属性,用于指定应用程序的实例数。有关更多信息,请参阅微型网站的 Application Count 部分。spring-doc.cadn.net.cn

18.13. Skipper 的升级策略

Skipper 有一个简单的 “红/黑” 升级策略。它使用与当前正在运行的版本一样多的实例来部署新版本的应用程序,并检查/health端点。 如果新应用程序的运行状况良好,则会取消部署以前的应用程序。 如果新应用程序的运行状况不佳,则会取消部署所有新应用程序,并且认为升级不成功。spring-doc.cadn.net.cn

升级策略不是滚动升级,因此,如果应用程序的 5 个实例正在运行,那么,在晴天的情况下,在取消部署旧版本之前,其中 5 个新应用程序也在运行。spring-doc.cadn.net.cn

19. 流 DSL

本部分介绍流 DSL 简介中未涵盖的流 DSL 的其他功能。spring-doc.cadn.net.cn

19.1. 点击流

可以在流中的各种创建者终端节点创建 Tap。有关更多信息,请参阅微型网站的 Tapping a Stream 部分。spring-doc.cadn.net.cn

19.2. 在 Stream 中使用 Labels

当流由多个同名应用程序组成时,必须使用标签对它们进行限定。 有关更多信息,请参阅微型网站的 Labeling Applications 部分。spring-doc.cadn.net.cn

19.3. 命名目的地

您可以使用命名目标,而不是引用源或接收器应用程序。 有关更多信息,请参阅微型网站的 Named Destinations 部分。spring-doc.cadn.net.cn

19.4. 扇入和扇出

通过使用命名目标,您可以支持扇入和扇出使用案例。 有关更多信息,请参阅微型网站的 Fan-in 和 Fan-out 部分。spring-doc.cadn.net.cn

20. 流式传输 Java DSL

您可以使用 Java 提供的基于 Java 的 DSL,而不是使用 shell 来创建和部署流spring-cloud-dataflow-rest-client模块。 有关更多信息,请参阅微型网站的 Java DSL 部分。spring-doc.cadn.net.cn

21. 使用多个 Binder 配置流式传输应用程序

在某些情况下,当需要连接到不同的消息传递时,流可以将其应用程序绑定到多个 Spring Cloud 流 Binders 中间件配置。在这些情况下,您应该确保使用其 Binder 正确配置应用程序 配置。例如,支持 Kafka 和 Rabbit Binders 的多 Binder 转换器是以中的处理器:spring-doc.cadn.net.cn

http | multibindertransform --expression=payload.toUpperCase() | log
在前面的示例中,您将编写自己的multibindertransform应用。

在此流中,每个应用程序都按以下方式连接到消息中间件:spring-doc.cadn.net.cn

  1. HTTP 源将事件发送到 RabbitMQ (rabbit1).spring-doc.cadn.net.cn

  2. Multi-Binder 转换处理器从 RabbitMQ (rabbit1),并将处理后的事件发送到 Kafka (kafka1).spring-doc.cadn.net.cn

  3. 日志接收器从 Kafka (kafka1).spring-doc.cadn.net.cn

这里rabbit1kafka1是 Spring Cloud Stream 应用程序属性中给出的 Binder 名称。 基于此设置,应用程序的 Classpaths 中具有以下 Binders 并具有适当的配置:spring-doc.cadn.net.cn

spring-cloud-stream binder可以在应用程序本身中设置配置属性。 如果没有,则可以传递deployment属性:spring-doc.cadn.net.cn

dataflow:>stream create --definition "http | multibindertransform --expression=payload.toUpperCase() | log" --name mystream

dataflow:>stream deploy mystream --properties "app.http.spring.cloud.stream.bindings.output.binder=rabbit1,app.multibindertransform.spring.cloud.stream.bindings.input.binder=rabbit1,
app.multibindertransform.spring.cloud.stream.bindings.output.binder=kafka1,app.log.spring.cloud.stream.bindings.input.binder=kafka1"

您可以通过部署属性指定任何 Binder 配置属性来覆盖这些属性。spring-doc.cadn.net.cn

22. 功能组成

函数组合允许您将函数逻辑动态地附加到现有事件流应用程序。有关更多详细信息,请参阅微型网站的函数组成部分。spring-doc.cadn.net.cn

23. 功能应用

使用 Spring Cloud Stream 3.x 添加功能支持,您可以构建Source,SinkProcessor应用程序,只需实现 Java Util 的Supplier,ConsumerFunction接口。 有关此功能的更多信息,请参阅 SCDF 站点的 Functional Application Recipespring-doc.cadn.net.cn

24. 示例

本章包括以下示例:spring-doc.cadn.net.cn

24.1. 简单的流处理

作为简单处理步骤的示例,我们可以使用以定义将 HTTP 发布的数据的有效负载转换为大写:spring-doc.cadn.net.cn

http | transform --expression=payload.toUpperCase() | log

要创建此流,请在 shell 中输入以下命令:spring-doc.cadn.net.cn

dataflow:> stream create --definition "http --server.port=9000 | transform --expression=payload.toUpperCase() | log" --name mystream --deploy

以下示例使用 shell 命令发布一些数据:spring-doc.cadn.net.cn

dataflow:> http post --target http://localhost:9000 --data "hello"

前面的示例导致大写HELLO在日志中,如下所示:spring-doc.cadn.net.cn

2016-06-01 09:54:37.749  INFO 80083 --- [  kafka-binder-] log.sink    : HELLO

24.2. 有状态流处理

为了演示数据分区功能,下面的清单部署了一个以 Kafka 作为 Binder 的流:spring-doc.cadn.net.cn

dataflow:>stream create --name words --definition "http --server.port=9900 | splitter --expression=payload.split(' ') | log"
Created new stream 'words'

dataflow:>stream deploy words --properties "app.splitter.producer.partitionKeyExpression=payload,deployer.log.count=2"
Deployed stream 'words'

dataflow:>http post --target http://localhost:9900 --data "How much wood would a woodchuck chuck if a woodchuck could chuck wood"
> POST (text/plain;Charset=UTF-8) http://localhost:9900 How much wood would a woodchuck chuck if a woodchuck could chuck wood
> 202 ACCEPTED


dataflow:>runtime apps
╔════════════════════╤═══════════╤═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╗
║App Id / Instance Id│Unit Status│                                                               No. of Instances / Attributes                                                               ║
╠════════════════════╪═══════════╪═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╣
║words.log-v1        │ deployed  │                                                                             2                                                                             ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║                    │           │       guid = 24166                                                                                                                                        ║
║                    │           │        pid = 33097                                                                                                                                        ║
║                    │           │       port = 24166                                                                                                                                        ║
║words.log-v1-0      │ deployed  │     stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1/stderr_0.log     ║
║                    │           │     stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1/stdout_0.log     ║
║                    │           │        url = https://192.168.0.102:24166                                                                                                                   ║
║                    │           │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1                  ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║                    │           │       guid = 41269                                                                                                                                        ║
║                    │           │        pid = 33098                                                                                                                                        ║
║                    │           │       port = 41269                                                                                                                                        ║
║words.log-v1-1      │ deployed  │     stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1/stderr_1.log     ║
║                    │           │     stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1/stdout_1.log     ║
║                    │           │        url = https://192.168.0.102:41269                                                                                                                   ║
║                    │           │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1                  ║
╟────────────────────┼───────────┼───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║words.http-v1       │ deployed  │                                                                             1                                                                             ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║                    │           │       guid = 9900                                                                                                                                         ║
║                    │           │        pid = 33094                                                                                                                                        ║
║                    │           │       port = 9900                                                                                                                                         ║
║words.http-v1-0     │ deployed  │     stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461054/words.http-v1/stderr_0.log    ║
║                    │           │     stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461054/words.http-v1/stdout_0.log    ║
║                    │           │        url = https://192.168.0.102:9900                                                                                                                    ║
║                    │           │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461054/words.http-v1                 ║
╟────────────────────┼───────────┼───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║words.splitter-v1   │ deployed  │                                                                             1                                                                             ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║                    │           │       guid = 33963                                                                                                                                        ║
║                    │           │        pid = 33093                                                                                                                                        ║
║                    │           │       port = 33963                                                                                                                                        ║
║words.splitter-v1-0 │ deployed  │     stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803437542/words.splitter-v1/stderr_0.log║
║                    │           │     stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803437542/words.splitter-v1/stdout_0.log║
║                    │           │        url = https://192.168.0.102:33963                                                                                                                   ║
║                    │           │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803437542/words.splitter-v1             ║
╚════════════════════╧═══════════╧═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝

当您查看words.log-v1-0logs,您应该会看到以下内容:spring-doc.cadn.net.cn

2016-06-05 18:35:47.047  INFO 58638 --- [  kafka-binder-] log.sink                                 : How
2016-06-05 18:35:47.066  INFO 58638 --- [  kafka-binder-] log.sink                                 : chuck
2016-06-05 18:35:47.066  INFO 58638 --- [  kafka-binder-] log.sink                                 : chuck

当您查看words.log-v1-1logs,您应该会看到以下内容:spring-doc.cadn.net.cn

2016-06-05 18:35:47.047  INFO 58639 --- [  kafka-binder-] log.sink                                 : much
2016-06-05 18:35:47.066  INFO 58639 --- [  kafka-binder-] log.sink                                 : wood
2016-06-05 18:35:47.066  INFO 58639 --- [  kafka-binder-] log.sink                                 : would
2016-06-05 18:35:47.066  INFO 58639 --- [  kafka-binder-] log.sink                                 : a
2016-06-05 18:35:47.066  INFO 58639 --- [  kafka-binder-] log.sink                                 : woodchuck
2016-06-05 18:35:47.067  INFO 58639 --- [  kafka-binder-] log.sink                                 : if
2016-06-05 18:35:47.067  INFO 58639 --- [  kafka-binder-] log.sink                                 : a
2016-06-05 18:35:47.067  INFO 58639 --- [  kafka-binder-] log.sink                                 : woodchuck
2016-06-05 18:35:47.067  INFO 58639 --- [  kafka-binder-] log.sink                                 : could
2016-06-05 18:35:47.067  INFO 58639 --- [  kafka-binder-] log.sink                                 : wood

此示例显示,包含相同单词的有效负载拆分将路由到同一应用程序实例。spring-doc.cadn.net.cn

24.3. 其他 source 和 sink 应用程序类型

此示例显示了一些更复杂的内容:将timesource for something else 的。另一种受支持的源类型是http,它接受通过 HTTP POST 请求摄取的数据。请注意,httpsource 接受与数据流服务器不同的端口(默认为 8080)上的数据。默认情况下,端口是随机分配的。spring-doc.cadn.net.cn

要创建使用httpsource 的 intent 语句,但仍然使用相同的logsink 中,我们会将 Simple Stream Processing 示例中的原始命令更改为以下内容:spring-doc.cadn.net.cn

dataflow:> stream create --definition "http | log" --name myhttpstream --deploy

请注意,这一次,在我们实际发布一些数据(通过使用 shell 命令)之前,我们不会看到任何其他输出。要查看随机分配的端口,其中httpsource 正在侦听,请运行以下命令:spring-doc.cadn.net.cn

dataflow:>runtime apps

╔══════════════════════╤═══════════╤═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╗
║ App Id / Instance Id │Unit Status│                                                                    No. of Instances / Attributes                                                                    ║
╠══════════════════════╪═══════════╪═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╣
║myhttpstream.log-v1   │ deploying │                                                                                  1                                                                                  ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║                      │           │       guid = 39628                                                                                                                                                  ║
║                      │           │        pid = 34403                                                                                                                                                  ║
║                      │           │       port = 39628                                                                                                                                                  ║
║myhttpstream.log-v1-0 │ deploying │     stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803867070/myhttpstream.log-v1/stderr_0.log ║
║                      │           │     stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803867070/myhttpstream.log-v1/stdout_0.log ║
║                      │           │        url = https://192.168.0.102:39628                                                                                                                             ║
║                      │           │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803867070/myhttpstream.log-v1              ║
╟──────────────────────┼───────────┼─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║myhttpstream.http-v1  │ deploying │                                                                                  1                                                                                  ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║                      │           │       guid = 52143                                                                                                                                                  ║
║                      │           │        pid = 34401                                                                                                                                                  ║
║                      │           │       port = 52143                                                                                                                                                  ║
║myhttpstream.http-v1-0│ deploying │     stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803866800/myhttpstream.http-v1/stderr_0.log║
║                      │           │     stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803866800/myhttpstream.http-v1/stdout_0.log║
║                      │           │        url = https://192.168.0.102:52143                                                                                                                             ║
║                      │           │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803866800/myhttpstream.http-v1             ║
╚══════════════════════╧═══════════╧═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝

您应该会看到相应的httpsource 的url属性,其中包含它正在侦听的主机和端口信息。现在,您可以发布到该 URL,如以下示例所示:spring-doc.cadn.net.cn

dataflow:> http post --target http://localhost:1234 --data "hello"
dataflow:> http post --target http://localhost:1234 --data "goodbye"

然后,流将来自httpsource 添加到由logsink 的 URL 中,生成类似于以下内容的输出:spring-doc.cadn.net.cn

2016-06-01 09:50:22.121  INFO 79654 --- [  kafka-binder-] log.sink    : hello
2016-06-01 09:50:26.810  INFO 79654 --- [  kafka-binder-] log.sink    : goodbye

我们还可以更改 sink 实现。您可以通过管道将输出传递给文件 (file)、到 Hadoop (hdfs) 或任何其他可用的 sink 应用程序。您还可以定义自己的应用程序。spring-doc.cadn.net.cn