Streams

This section goes into more detail about how you can create Streams, which are collections of Spring Cloud Stream applications. It covers topics such as creating and deploying Streams.

If you are just starting out with Spring Cloud Data Flow, you should probably read the Getting Started guide before diving into this section.

17. Introduction

A Stream is a collection of long-lived Spring Cloud Stream applications that communicate with each other over messaging middleware. A text-based DSL defines the configuration and data flow between the applications. While many applications are provided for you to implement common use-cases, you typically create a custom Spring Cloud Stream application to implement custom business logic.

The general lifecycle of a Stream is:

  1. Register applications.

  2. Create a Stream Definition.

  3. Deploy the Stream.

  4. Undeploy or destroy the Stream.

  5. Upgrade or roll back applications in the Stream.

For deploying Streams, the Data Flow Server has to be configured to delegate the deployment to a new server in the Spring Cloud ecosystem named Skipper.

Furthermore, you can configure Skipper to deploy applications to one or more Cloud Foundry orgs and spaces, one or more namespaces on a Kubernetes cluster, or to the local machine. When deploying a stream in Data Flow, you can specify which platform to use at deployment time. Skipper also provides Data Flow with the ability to perform updates to deployed streams. There are many ways the applications in a stream can be updated, but one of the most common examples is to upgrade a processor application with new custom business logic while leaving the existing source and sink applications alone.

17.1. Stream Pipeline DSL

A stream is defined by using a Unix-inspired Pipeline syntax. The syntax uses vertical bars, known as “pipes”, to connect multiple commands. The command ls -l | grep key | less in Unix takes the output of the ls -l process and pipes it to the input of the grep key process. The output of grep is, in turn, sent to the input of the less process. Each | symbol connects the standard output of the command on the left to the standard input of the command on the right. Data flows through the pipeline from left to right.

In Data Flow, the Unix command is replaced by a Spring Cloud Stream application and each pipe symbol represents connecting the input and output of applications over messaging middleware, such as RabbitMQ or Apache Kafka.

Each Spring Cloud Stream application is registered under a simple name. The registration process specifies where the application can be obtained (for example, in a Maven Repository or a Docker registry). You can find out more about how to register Spring Cloud Stream applications in this section. In Data Flow, we classify the Spring Cloud Stream applications as Sources, Processors, or Sinks.

As a simple example, consider collecting data from an HTTP Source and writing it to a File Sink. Using the DSL, the stream description is:

http | file

A stream that involves some processing would be expressed as:

http | filter | transform | file

Stream definitions can be created by using the shell’s stream create command, as shown in the following example:

dataflow:> stream create --name httpIngest --definition "http | file"

The Stream DSL is passed in to the --definition command option.

The deployment of stream definitions is done through the Shell’s stream deploy command, as follows:

dataflow:> stream deploy --name httpIngest

The Getting Started section shows you how to start the server and how to start and use the Spring Cloud Data Flow shell.

Note that the shell calls the Data Flow Server’s REST API. For more information on making HTTP requests directly to the server, see the REST API Guide.

When naming a stream definition, keep in mind that each application in the stream will be created on the platform with the name in the format of <stream name>-<app name>. Thus, the total length of the generated application name can’t exceed 58 characters.

17.2. Stream Application DSL

You can use the Stream Application DSL to define custom binding properties for each of the Spring Cloud Stream applications. See the Stream Application DSL section of the microsite for more information.

Consider the following Java interface, which defines an input method and two output methods:

public interface Barista {

    @Input
    SubscribableChannel orders();

    @Output
    MessageChannel hotDrinks();

    @Output
    MessageChannel coldDrinks();
}

Further consider the following Java interface, which is typical for creating a Kafka Streams application:

interface KStreamKTableBinding {

    @Input
    KStream<?, ?> inputStream();

    @Input
    KTable<?, ?> inputTable();
}

In these cases with multiple input and output bindings, Data Flow cannot make any assumptions about the flow of data from one application to another. Therefore, you need to set the binding properties to “wire up” the application. The Stream Application DSL uses a “double pipe”, instead of the “pipe symbol”, to indicate that Data Flow should not configure the binding properties of the application. Think of || as meaning “in parallel”. The following example shows such a “parallel” definition:

dataflow:> stream create --definition "orderGeneratorApp || baristaApp || hotDrinkDeliveryApp || coldDrinkDeliveryApp" --name myCafeStream

Breaking Change! Versions of SCDF Local, Cloud Foundry 1.7.0 to 1.7.2 and SCDF Kubernetes 1.7.0 to 1.7.1 used the comma character as the separator between applications. This caused breaking changes in the traditional Stream DSL. While not ideal, changing the separator character was felt to be the best solution with the least impact on existing users.

This stream has four applications. baristaApp has two output destinations, hotDrinks and coldDrinks, intended to be consumed by the hotDrinkDeliveryApp and coldDrinkDeliveryApp, respectively. When deploying this stream, you need to set the binding properties so that the baristaApp sends hot drink messages to the hotDrinkDeliveryApp destination and cold drink messages to the coldDrinkDeliveryApp destination. The following listing does so:

app.baristaApp.spring.cloud.stream.bindings.hotDrinks.destination=hotDrinksDest
app.baristaApp.spring.cloud.stream.bindings.coldDrinks.destination=coldDrinksDest
app.hotDrinkDeliveryApp.spring.cloud.stream.bindings.input.destination=hotDrinksDest
app.coldDrinkDeliveryApp.spring.cloud.stream.bindings.input.destination=coldDrinksDest

If you want to use consumer groups, you need to set the Spring Cloud Stream application properties, spring.cloud.stream.bindings.<channelName>.producer.requiredGroups and spring.cloud.stream.bindings.<channelName>.group, on the producer and consumer applications respectively.
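Continuing the barista example, a hedged sketch of how those consumer-group properties might be set at deployment time (the group name hotDrinksGroup is illustrative, not from the original text):

```
app.baristaApp.spring.cloud.stream.bindings.hotDrinks.producer.requiredGroups=hotDrinksGroup
app.hotDrinkDeliveryApp.spring.cloud.stream.bindings.input.group=hotDrinksGroup
```

With matching requiredGroups and group values, the broker retains hot drink messages for the hotDrinkDeliveryApp consumer group even if no consumer is connected when they are produced.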

Another common use case for the Stream Application DSL is to deploy an HTTP gateway application that sends a synchronous request/reply message to a Kafka or RabbitMQ application. In this case, both the HTTP gateway application and the Kafka or RabbitMQ application can be Spring Integration applications that do not use the Spring Cloud Stream library.

It is also possible to deploy only a single application by using the Stream Application DSL.

17.3. Application Properties

Each application takes properties to customize its behavior. As an example, the http source module exposes a port setting that lets the data ingestion port be changed from the default value:

dataflow:> stream create --definition "http --port=8090 | log" --name myhttpstream

This port property is actually the same as the standard Spring Boot server.port property. Data Flow adds the ability to use the shorthand form port instead of server.port. You can also specify the longhand version:

dataflow:> stream create --definition "http --server.port=8000 | log" --name myhttpstream

This shorthand behavior is discussed more in the section on Stream Application Properties. If you have registered application property metadata, you can use tab completion in the shell after typing -- to get a list of candidate property names.

The shell provides tab completion for application properties. The app info --name <appName> --type <appType> shell command provides additional documentation for all the supported properties.

Supported Stream <appType> possibilities are: source, processor, and sink.

18. Stream Lifecycle

The lifecycle of a stream goes through the following stages:

  • Register a Stream Application

  • Creating a Stream

  • Deploying a Stream

  • Destroying a Stream

  • Undeploying a Stream

Skipper is a server that lets you discover Spring Boot applications and manage their lifecycle on multiple Cloud Platforms.

Applications in Skipper are bundled as packages that contain the application’s resource location, application properties, and deployment properties. You can think of Skipper packages as being analogous to packages found in tools such as apt-get or brew.

When Data Flow deploys a Stream, it generates and uploads a package to Skipper that represents the applications in the Stream. Subsequent commands to upgrade or roll back the applications within the Stream are passed through to Skipper. In addition, the Stream definition is reverse-engineered from the package, and the status of the Stream is also delegated to Skipper.

18.1. Register a Stream Application

You can register a versioned stream application by using the app register command. You must provide a unique name, an application type, and a URI that can be resolved to the application artifact. For the type, specify source, processor, or sink. The version is resolved from the URI. Here are a few examples:

dataflow:>app register --name mysource --type source --uri maven://com.example:mysource:0.0.1
dataflow:>app register --name mysource --type source --uri maven://com.example:mysource:0.0.2
dataflow:>app register --name mysource --type source --uri maven://com.example:mysource:0.0.3

dataflow:>app list --id source:mysource
╔═══╤══════════════════╤═════════╤════╤════╗
║app│      source      │processor│sink│task║
╠═══╪══════════════════╪═════════╪════╪════╣
║   │> mysource-0.0.1 <│         │    │    ║
║   │mysource-0.0.2    │         │    │    ║
║   │mysource-0.0.3    │         │    │    ║
╚═══╧══════════════════╧═════════╧════╧════╝

dataflow:>app register --name myprocessor --type processor --uri file:///Users/example/myprocessor-1.2.3.jar

dataflow:>app register --name mysink --type sink --uri https://example.com/mysink-2.0.1.jar

The application URI should conform to one of the following schema formats:

  • Maven schema:

    maven://<groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>

  • HTTP schema:

    http://<web-path>/<artifactName>-<version>.jar

  • File schema:

    file:///<local-path>/<artifactName>-<version>.jar

  • Docker schema:

    docker:<docker-image-path>/<imageName>:<version>

The URI <version> part is compulsory for versioned stream applications. Skipper uses multi-versioned stream applications to allow upgrading or rolling back those applications at runtime by using deployment properties.

If you would like to register the snapshot versions of the http and log applications built with the RabbitMQ binder, you could do the following:

dataflow:>app register --name http --type source --uri maven://org.springframework.cloud.stream.app:http-source-rabbit:1.2.1.BUILD-SNAPSHOT
dataflow:>app register --name log --type sink --uri maven://org.springframework.cloud.stream.app:log-sink-rabbit:1.2.1.BUILD-SNAPSHOT

If you would like to register multiple applications at one time, you can store them in a properties file, where the keys are formatted as <type>.<name> and the values are the URIs.

For example, to register the snapshot versions of the http and log applications built with the RabbitMQ binder, you could have the following in a properties file (for example, stream-apps.properties):

source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:1.2.1.BUILD-SNAPSHOT
sink.log=maven://org.springframework.cloud.stream.app:log-sink-rabbit:1.2.1.BUILD-SNAPSHOT

Then, to import the applications in bulk, use the app import command and provide the location of the properties file with the --uri switch, as follows:

dataflow:>app import --uri file:///<YOUR_FILE_LOCATION>/stream-apps.properties

Registering an application by using --type app is the same as registering a source, processor, or sink. Applications of the type app can be used only in the Stream Application DSL (which uses double pipes || instead of single pipes | in the DSL) and instructs Data Flow not to configure the Spring Cloud Stream binding properties of the application. The application that is registered using --type app does not have to be a Spring Cloud Stream application. It can be any Spring Boot application. See the Stream Application DSL introduction for more about using this application type.

You can register multiple versions of the same application (that is, the same name and type), but you can set only one as the default. The default version is used for deploying Streams.

The first time an application is registered, it is marked as default. The default application version can be altered with the app default command:

dataflow:>app default --id source:mysource --version 0.0.2
dataflow:>app list --id source:mysource
╔═══╤══════════════════╤═════════╤════╤════╗
║app│      source      │processor│sink│task║
╠═══╪══════════════════╪═════════╪════╪════╣
║   │mysource-0.0.1    │         │    │    ║
║   │> mysource-0.0.2 <│         │    │    ║
║   │mysource-0.0.3    │         │    │    ║
╚═══╧══════════════════╧═════════╧════╧════╝

The app list --id <type:name> command lists all versions for a given stream application.

The app unregister command has an optional --version parameter to specify the application version to unregister:

dataflow:>app unregister --name mysource --type source --version 0.0.1
dataflow:>app list --id source:mysource
╔═══╤══════════════════╤═════════╤════╤════╗
║app│      source      │processor│sink│task║
╠═══╪══════════════════╪═════════╪════╪════╣
║   │> mysource-0.0.2 <│         │    │    ║
║   │mysource-0.0.3    │         │    │    ║
╚═══╧══════════════════╧═════════╧════╧════╝

If --version is not specified, the default version is unregistered.

All applications in a stream should have a default version set for the stream to be deployed. Otherwise, they are treated as unregistered applications during deployment. Use the app default command to set the defaults.

dataflow:>app default --id source:mysource --version 0.0.3
dataflow:>app list --id source:mysource
╔═══╤══════════════════╤═════════╤════╤════╗
║app│      source      │processor│sink│task║
╠═══╪══════════════════╪═════════╪════╪════╣
║   │mysource-0.0.2    │         │    │    ║
║   │> mysource-0.0.3 <│         │    │    ║
╚═══╧══════════════════╧═════════╧════╧════╝

The stream deploy command requires that default application versions be set. The stream update and stream rollback commands, however, can use any registered application version (default or non-default).

The following command creates a stream that uses the default mysource version (0.0.3):

dataflow:>stream create foo --definition "mysource | log"

Then we can update the version to 0.0.2:

dataflow:>stream update foo --properties version.mysource=0.0.2

Only pre-registered applications can be used to deploy, update, or roll back a Stream.

An attempt to update mysource to version 0.0.1 (which is not registered) fails.
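If an update turns out to be faulty, the stream rollback command mentioned earlier can restore the previous version. A hedged sketch of the command form (the exact options may vary by shell version):

```
dataflow:>stream rollback --name foo
```

Because Skipper keeps the package history for the stream, rolling back does not require re-specifying the version properties used before the update.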

18.1.1. Register Supported Applications and Tasks

For convenience, static files with application URIs (for both Maven and Docker) are available for all the out-of-the-box stream and task or batch app-starters. You can point to these files and import all the application URIs in bulk. Otherwise, as explained previously, you can register the applications individually or have your own custom property file with only the required application URIs in it. We recommend, however, having a “focused” list of desired application URIs in a custom property file.

Spring Cloud Stream App Starters

The following list includes the dataflow.spring.io links to the available Stream Application Starters based on Spring Cloud Stream 2.1.x and Spring Boot 2.1.x:

  • RabbitMQ + Maven

    Stable: dataflow.spring.io/rabbitmq-maven-latest

    Snapshot: dataflow.spring.io/Einstein-BUILD-SNAPSHOT-stream-applications-rabbit-maven

  • RabbitMQ + Docker

    Stable: dataflow.spring.io/rabbitmq-docker-latest

    Snapshot: dataflow.spring.io/Einstein-BUILD-SNAPSHOT-stream-applications-rabbit-docker

  • Apache Kafka + Maven

    Stable: dataflow.spring.io/kafka-maven-latest

    Snapshot: dataflow.spring.io/Einstein-BUILD-SNAPSHOT-stream-applications-kafka-maven

  • Apache Kafka + Docker

    Stable: dataflow.spring.io/kafka-docker-latest

    Snapshot: dataflow.spring.io/Einstein-BUILD-SNAPSHOT-stream-applications-kafka-docker

By default, App Starter actuator endpoints are secured. You can disable security by deploying streams with the app.*.spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration property. On Kubernetes, see the Liveness and readiness probes section for how to configure security for actuator endpoints.

Starting with the Spring Cloud Stream 2.1 GA release, we now have robust interoperability with the Spring Cloud Function programming model. Building on that, with the Einstein release-train, it is now possible to pick a few Stream App Starters and compose them into a single application by using the functional-style programming model. Check out the "Composed Function Support in Spring Cloud Data Flow" blog to learn more about the developer and orchestration experience with an example.

Spring Cloud Task App Starters

The following list includes the available Task Application Starters based on Spring Cloud Task 2.1.x and Spring Boot 2.1.x:

  • Maven

    Stable: dataflow.spring.io/task-maven-latest

    Snapshot: dataflow.spring.io/Elston-BUILD-SNAPSHOT-task-applications-maven

  • Docker

    Stable: dataflow.spring.io/task-docker-latest

    Snapshot: dataflow.spring.io/Elston-BUILD-SNAPSHOT-task-applications-docker

You can find more information about the available task starters in the Task App Starters Project Page and related reference documentation. For more information about the available stream starters, look at the Stream App Starters Project Page and related reference documentation.

As an example, if you would like to register all out-of-the-box stream applications built with the Kafka binder in bulk, you can use the following command:

dataflow:>app import --uri https://dataflow.spring.io/kafka-maven-latest

Alternatively, you can register all the stream applications with the Rabbit binder, as follows:

dataflow:>app import --uri https://dataflow.spring.io/rabbitmq-maven-latest

You can also pass the --local option (which is true by default) to indicate whether the properties file location should be resolved within the shell process itself. If the location should be resolved from the Data Flow Server process, specify --local false.

When you use either app register or app import, if an application is already registered with the provided name and type and version, it is, by default, not overridden. If you would like to override the pre-existing application uri or metadata-uri coordinates, include the --force option.

Note, however, that, once downloaded, applications may be cached locally on the Data Flow server, based on the resource location. If the resource location does not change (even though the actual resource bytes may be different), it is not re-downloaded. When using maven:// resources, on the other hand, using a constant location may still circumvent caching (if using -SNAPSHOT versions).

Moreover, if a stream is already deployed and uses some version of a registered app, then (forcibly) re-registering a different application has no effect until the stream is deployed again.
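For example, a hedged sketch of forcibly re-registering one of the earlier versions under new coordinates (the URI shown is illustrative):

```
dataflow:>app register --name mysource --type source --uri maven://com.example:mysource:0.0.3 --force
```

Without --force, this command would be rejected because source:mysource at version 0.0.3 is already registered.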

In some cases, the resource is resolved on the server side. In others, the URI is passed to a runtime container instance, where it is resolved. See the specific documentation of each Data Flow Server for more detail.

18.1.2. Creating Custom Applications

While Data Flow includes source, processor, and sink applications, you can extend these applications or write a custom Spring Cloud Stream application.

The process of creating Spring Cloud Stream applications with Spring Initializr is detailed in the Spring Cloud Stream documentation. You can include multiple binders in an application. If you do so, see the instructions in [passing_producer_consumer_properties] for how to configure them.
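As an illustrative sketch (not taken from the starters), the core of a custom processor can be written as a plain java.util.function.Function. In a Spring Boot application, Spring Cloud Stream can bind such a function bean to the messaging middleware; the class and method names here are hypothetical:

```java
import java.util.function.Function;

// Hypothetical processor logic: upper-cases each incoming payload.
// In a Spring Cloud Stream application, this Function would be declared
// as a @Bean and bound to input/output destinations by the framework;
// here only the plain transformation logic is shown.
public class UppercaseProcessor {

    public static Function<String, String> uppercase() {
        return payload -> payload.toUpperCase();
    }

    public static void main(String[] args) {
        // Standalone check of the transformation logic.
        System.out.println(uppercase().apply("ticktock"));
    }
}
```

Keeping the business logic in a plain function like this makes it testable without any messaging infrastructure.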

To let Data Flow discover their configuration properties, Spring Cloud Stream applications running in Spring Cloud Data Flow can include the Spring Boot configuration-processor as an optional dependency, as shown in the following example:

<dependencies>
  <!-- other dependencies -->
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
  </dependency>
</dependencies>

NOTE: Make sure that the spring-boot-maven-plugin is included in the POM. The plugin is necessary for creating the executable jar that is registered with Spring Cloud Data Flow. Spring Initializr includes the plugin in the generated POM.

Once you have created a custom application, you can register it, as described in Register a Stream Application.

18.2. Creating a Stream

The Spring Cloud Data Flow Server exposes a full RESTful API for managing the lifecycle of stream definitions, but the easiest way to use it is through the Spring Cloud Data Flow shell. The Getting Started section describes how to start the shell.

New streams are created with the help of stream definitions. The definitions are built from a simple DSL. For example, consider what happens if we run the following shell command:

dataflow:> stream create --definition "time | log" --name ticktock

This defines a stream named ticktock that is based on the DSL expression time | log. The DSL uses the “pipe” symbol (|) to connect a source to a sink.

The stream info command shows useful information about the stream, as shown (with its output) in the following example:

dataflow:>stream info ticktock
╔═══════════╤═════════════════╤══════════╗
║Stream Name│Stream Definition│  Status  ║
╠═══════════╪═════════════════╪══════════╣
║ticktock   │time | log       │undeployed║
╚═══════════╧═════════════════╧══════════╝

18.2.1. Stream Application Properties

Application properties are the properties associated with each application in the stream. When the application is deployed, the application properties are applied to the application through command-line arguments or environment variables, depending on the underlying deployment implementation.

The following stream can have application properties defined at the time of stream creation:

dataflow:> stream create --definition "time | log" --name ticktock

The app info --name <appName> --type <appType> shell command displays the exposed application properties for the application. For more about exposed properties, see Application Metadata.

The following listing shows the exposed properties for the time application:

dataflow:> app info --name time --type source
╔══════════════════════════════╤══════════════════════════════╤══════════════════════════════╤══════════════════════════════╗
║         Option Name          │         Description          │           Default            │             Type             ║
╠══════════════════════════════╪══════════════════════════════╪══════════════════════════════╪══════════════════════════════╣
║trigger.time-unit             │The TimeUnit to apply to delay│<none>                        │java.util.concurrent.TimeUnit ║
║                              │values.                       │                              │                              ║
║trigger.fixed-delay           │Fixed delay for periodic      │1                             │java.lang.Integer             ║
║                              │triggers.                     │                              │                              ║
║trigger.cron                  │Cron expression value for the │<none>                        │java.lang.String              ║
║                              │Cron Trigger.                 │                              │                              ║
║trigger.initial-delay         │Initial delay for periodic    │0                             │java.lang.Integer             ║
║                              │triggers.                     │                              │                              ║
║trigger.max-messages          │Maximum messages per poll, -1 │1                             │java.lang.Long                ║
║                              │means infinity.               │                              │                              ║
║trigger.date-format           │Format for the date value.    │<none>                        │java.lang.String              ║
╚══════════════════════════════╧══════════════════════════════╧══════════════════════════════╧══════════════════════════════╝

The following listing shows the exposed properties for the log application:

dataflow:> app info --name log --type sink
╔══════════════════════════════╤══════════════════════════════╤══════════════════════════════╤══════════════════════════════╗
║         Option Name          │         Description          │           Default            │             Type             ║
╠══════════════════════════════╪══════════════════════════════╪══════════════════════════════╪══════════════════════════════╣
║log.name                      │The name of the logger to use.│<none>                        │java.lang.String              ║
║log.level                     │The level at which to log     │<none>                        │org.springframework.integratio║
║                              │messages.                     │                              │n.handler.LoggingHandler$Level║
║log.expression                │A SpEL expression (against the│payload                       │java.lang.String              ║
║                              │incoming message) to evaluate │                              │                              ║
║                              │as the logged message.        │                              │                              ║
╚══════════════════════════════╧══════════════════════════════╧══════════════════════════════╧══════════════════════════════╝

You can specify the application properties for the time and log apps at the time of stream creation, as follows:

dataflow:> stream create --definition "time --fixed-delay=5 | log --level=WARN" --name ticktock

Note that, in the preceding example, the fixed-delay and level properties defined for the time and log applications are the “short-form” property names provided by the shell completion. These “short-form” property names are applicable only for the exposed properties. In all other cases, you should use only fully qualified property names.
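For reference, the same stream expressed with the fully qualified property names from the app info listings above would look like this (a sketch; the behavior is identical):

```
dataflow:> stream create --definition "time --trigger.fixed-delay=5 | log --log.level=WARN" --name ticktock
```

Here fixed-delay expands to trigger.fixed-delay and level expands to log.level, matching the Option Name columns shown earlier.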

18.2.2. Common Application Properties

In addition to configuration through the DSL, Spring Cloud Data Flow provides a mechanism for setting properties that are common to all the streaming applications it launches. You can do so by adding properties prefixed with spring.cloud.dataflow.applicationProperties.stream when starting the server. The server then passes all the properties, without the prefix, to the instances it launches.

For example, all the launched applications can be configured to use a specific Kafka broker by launching the Data Flow server with the following options:

--spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.brokers=192.168.1.100:9092
--spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.zkNodes=192.168.1.100:2181

Doing so causes the spring.cloud.stream.kafka.binder.brokers and spring.cloud.stream.kafka.binder.zkNodes properties to be passed to all the launched applications.

Properties configured with this mechanism have lower precedence than stream deployment properties. They are overridden if a property with the same key is specified at stream deployment time (for example, app.http.spring.cloud.stream.kafka.binder.brokers overrides the common property).
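For example, a hedged sketch of overriding the common broker property for only the http application at deployment time (the stream name and broker address are illustrative):

```
dataflow:> stream deploy --name myhttpstream --properties "app.http.spring.cloud.stream.kafka.binder.brokers=192.168.1.100:9093"
```

The http application then uses the broker given at deployment time, while the other applications in the stream keep the server-wide common value.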

18.3. Deploying a Stream

This section describes how to deploy a Stream when the Spring Cloud Data Flow server is responsible for deploying the stream. It covers the deployment and upgrade of Streams by using the Skipper service. The description of how to set deployment properties applies to both approaches of Stream deployment.

Consider the ticktock stream definition:

dataflow:> stream create --definition "time | log" --name ticktock

To deploy the stream, use the following shell command:

dataflow:> stream deploy --name ticktock

The Data Flow Server delegates to Skipper the resolution and deployment of the time and log applications.

The stream info command shows useful information about the stream, including the deployment properties:

dataflow:>stream info --name ticktock
╔═══════════╤═════════════════╤═════════╗
║Stream Name│Stream Definition│  Status ║
╠═══════════╪═════════════════╪═════════╣
║ticktock   │time | log       │deploying║
╚═══════════╧═════════════════╧═════════╝

Stream Deployment properties: {
  "log" : {
    "resource" : "maven://org.springframework.cloud.stream.app:log-sink-rabbit",
    "spring.cloud.deployer.group" : "ticktock",
    "version" : "2.0.1.RELEASE"
  },
  "time" : {
    "resource" : "maven://org.springframework.cloud.stream.app:time-source-rabbit",
    "spring.cloud.deployer.group" : "ticktock",
    "version" : "2.0.1.RELEASE"
  }
}

There is an important optional command argument (called --platformName) to the stream deploy command. Skipper can be configured to deploy to multiple platforms. Skipper is pre-configured with a platform named default, which deploys applications to the local machine where Skipper is running. The default value of the --platformName command line argument is default. If you commonly deploy to one platform, when installing Skipper, you can override the configuration of the default platform. Otherwise, specify the platformName to be one of the values returned by the stream platform-list command.

In the preceding example, the time source sends the current time as a message each second, and the log sink outputs it by using the logging framework. You can tail the stdout log (which has an <instance> suffix). The log files are located within the directory displayed in the Data Flow Server’s log output, as shown in the following listing:

$ tail -f /var/folders/wn/8jxm_tbd1vj28c8vj37n900m0000gn/T/spring-cloud-dataflow-912434582726479179/ticktock-1464788481708/ticktock.log/stdout_0.log
2016-06-01 09:45:11.250  INFO 79194 --- [  kafka-binder-] log.sink    : 06/01/16 09:45:11
2016-06-01 09:45:12.250  INFO 79194 --- [  kafka-binder-] log.sink    : 06/01/16 09:45:12
2016-06-01 09:45:13.251  INFO 79194 --- [  kafka-binder-] log.sink    : 06/01/16 09:45:13

You can also create and deploy the stream in one step by passing the --deploy flag when creating the stream, as follows:

dataflow:> stream create --definition "time | log" --name ticktock --deploy

However, it is not common in real-world use cases to create and deploy the stream in one step. The reason is that, when you use the stream deploy command, you can pass in properties that define how to map the applications onto the platform (for example, the memory size of the container to use, the number of instances of each application to run, and whether to enable data partitioning features). Properties can also override application properties that were set when creating the stream. The next sections cover this feature in detail.
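For example (the property values here are illustrative, not recommendations), deployer properties can size and scale the applications at deployment time:

```
dataflow:> stream deploy --name ticktock --properties "deployer.time.memory=2048m,deployer.log.count=2"
```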

18.3.1. Deployment Properties

When deploying a stream, you can specify properties that can control how applications are deployed and configured. See the Deployment Properties section of the microsite for more information.

18.4. Destroying a Stream

You can delete a stream by issuing the stream destroy command from the shell, as follows:

dataflow:> stream destroy --name ticktock

If the stream was deployed, it is undeployed before the stream definition is deleted.

18.5. Undeploying a Stream

Often, you want to stop a stream but retain the name and definition for future use. In that case, you can undeploy the stream by name:

dataflow:> stream undeploy --name ticktock

You can issue the deploy command at a later time to restart it:

dataflow:> stream deploy --name ticktock

18.6. Validating a Stream

Sometimes, an application contained within a stream definition has an invalid URI in its registration. This can be caused by an invalid URI being entered at application registration time or by the application being removed from the repository from which it was to be drawn. To verify that all the applications contained in a stream are resolvable, you can use the stream validate command:

dataflow:>stream validate ticktock
╔═══════════╤═════════════════╗
║Stream Name│Stream Definition║
╠═══════════╪═════════════════╣
║ticktock   │time | log       ║
╚═══════════╧═════════════════╝


ticktock is a valid stream.
╔═══════════╤═════════════════╗
║ App Name  │Validation Status║
╠═══════════╪═════════════════╣
║source:time│valid            ║
║sink:log   │valid            ║
╚═══════════╧═════════════════╝

In the preceding example, the ticktock stream was validated. Both the source:time and sink:log applications are valid. Now we can see what happens if we have a stream definition with a registered application that has an invalid URI:

dataflow:>stream validate bad-ticktock
╔════════════╤═════════════════╗
║Stream Name │Stream Definition║
╠════════════╪═════════════════╣
║bad-ticktock│bad-time | log   ║
╚════════════╧═════════════════╝


bad-ticktock is an invalid stream.
╔═══════════════╤═════════════════╗
║   App Name    │Validation Status║
╠═══════════════╪═════════════════╣
║source:bad-time│invalid          ║
║sink:log       │valid            ║
╚═══════════════╧═════════════════╝

In this case, Spring Cloud Data Flow states that the stream is invalid because source:bad-time has an invalid URI.

18.7. Updating a Stream

To update the stream, use the stream update command, which takes either --properties or --propertiesFile as a command argument. Skipper has an important new top-level prefix: version. The following commands deploy the http | log stream (the version of log registered at the time of deployment was 1.1.0.RELEASE):

dataflow:> stream create --name httptest --definition "http --server.port=9000 | log"
dataflow:> stream deploy --name httptest
dataflow:>stream info httptest
╔══════════════════════════════╤══════════════════════════════╤════════════════════════════╗
║             Name             │             DSL              │          Status            ║
╠══════════════════════════════╪══════════════════════════════╪════════════════════════════╣
║httptest                      │http --server.port=9000 | log │deploying                   ║
╚══════════════════════════════╧══════════════════════════════╧════════════════════════════╝

Stream Deployment properties: {
  "log" : {
    "spring.cloud.deployer.indexed" : "true",
    "spring.cloud.deployer.group" : "httptest",
    "maven://org.springframework.cloud.stream.app:log-sink-rabbit" : "1.1.0.RELEASE"
  },
  "http" : {
    "spring.cloud.deployer.group" : "httptest",
    "maven://org.springframework.cloud.stream.app:http-source-rabbit" : "1.1.0.RELEASE"
  }
}

The following commands update the stream to use the 1.2.0.RELEASE version of the log application. Before updating the stream with a specific version of an application, we need to make sure that the application is registered with that version:

dataflow:>app register --name log --type sink --uri maven://org.springframework.cloud.stream.app:log-sink-rabbit:1.2.0.RELEASE
Successfully registered application 'sink:log'

Then we can update the application:

dataflow:>stream update --name httptest --properties version.log=1.2.0.RELEASE
You can use only pre-registered application versions to deploy, update, or roll back a stream.

To verify the deployment properties and the updated version, we can use stream info, as shown (with its output) in the following example:

dataflow:>stream info httptest
╔══════════════════════════════╤══════════════════════════════╤════════════════════════════╗
║             Name             │             DSL              │          Status            ║
╠══════════════════════════════╪══════════════════════════════╪════════════════════════════╣
║httptest                      │http --server.port=9000 | log │deploying                   ║
╚══════════════════════════════╧══════════════════════════════╧════════════════════════════╝

Stream Deployment properties: {
  "log" : {
    "spring.cloud.deployer.indexed" : "true",
    "spring.cloud.deployer.count" : "1",
    "spring.cloud.deployer.group" : "httptest",
    "maven://org.springframework.cloud.stream.app:log-sink-rabbit" : "1.2.0.RELEASE"
  },
  "http" : {
    "spring.cloud.deployer.group" : "httptest",
    "maven://org.springframework.cloud.stream.app:http-source-rabbit" : "1.1.0.RELEASE"
  }
}

18.8. Forcing an Update of a Stream

When upgrading a stream, you can use the --force option to deploy new instances of currently deployed applications even if no application or deployment properties have changed. This behavior is needed when configuration information is obtained by the application itself at startup time (for example, from Spring Cloud Config Server). You can specify the applications for which to force an upgrade by using the --app-names option. If you do not specify any application names, all the applications are forced to upgrade. You can specify the --force and --app-names options together with the --properties or --propertiesFile options.
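For example (a sketch based on the options described above, applied to the httptest stream), to force new instances of only the http application:

```
dataflow:>stream update --name httptest --force --app-names http
```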

18.9. Stream Versions

Skipper keeps a history of the streams that were deployed. After updating a Stream, there is a second version of the stream. You can query for the history of the versions by using the stream history --name <name-of-stream> command:

dataflow:>stream history --name httptest
╔═══════╤════════════════════════════╤════════╤════════════╤═══════════════╤════════════════╗
║Version│        Last updated        │ Status │Package Name│Package Version│  Description   ║
╠═══════╪════════════════════════════╪════════╪════════════╪═══════════════╪════════════════╣
║2      │Mon Nov 27 22:41:16 EST 2017│DEPLOYED│httptest    │1.0.0          │Upgrade complete║
║1      │Mon Nov 27 22:40:41 EST 2017│DELETED │httptest    │1.0.0          │Delete complete ║
╚═══════╧════════════════════════════╧════════╧════════════╧═══════════════╧════════════════╝

18.10. Stream Manifests

Skipper keeps a “manifest” of all of the applications, their application properties, and their deployment properties after all values have been substituted. This represents the final state of what was deployed to the platform. You can view the manifest for any of the versions of a Stream by using the following command:

stream manifest --name <name-of-stream> --releaseVersion <optional-version>

If the --releaseVersion is not specified, the manifest for the last version is returned.

The following example shows the use of the manifest:

dataflow:>stream manifest --name httptest

Using the command results in the following output:

# Source: log.yml
apiVersion: skipper.spring.io/v1
kind: SpringCloudDeployerApplication
metadata:
  name: log
spec:
  resource: maven://org.springframework.cloud.stream.app:log-sink-rabbit
  version: 1.2.0.RELEASE
  applicationProperties:
    spring.cloud.dataflow.stream.app.label: log
    spring.cloud.stream.bindings.input.group: httptest
    spring.cloud.dataflow.stream.name: httptest
    spring.cloud.dataflow.stream.app.type: sink
    spring.cloud.stream.bindings.input.destination: httptest.http
  deploymentProperties:
    spring.cloud.deployer.indexed: true
    spring.cloud.deployer.group: httptest
    spring.cloud.deployer.count: 1

---
# Source: http.yml
apiVersion: skipper.spring.io/v1
kind: SpringCloudDeployerApplication
metadata:
  name: http
spec:
  resource: maven://org.springframework.cloud.stream.app:http-source-rabbit
  version: 1.2.0.RELEASE
  applicationProperties:
    spring.cloud.dataflow.stream.app.label: http
    spring.cloud.stream.bindings.output.producer.requiredGroups: httptest
    server.port: 9000
    spring.cloud.stream.bindings.output.destination: httptest.http
    spring.cloud.dataflow.stream.name: httptest
    spring.cloud.dataflow.stream.app.type: source
  deploymentProperties:
    spring.cloud.deployer.group: httptest

The majority of the deployment and application properties were set by Data Flow to enable the applications to talk to each other and to send application metrics with identifying labels.

18.11. Rollback a Stream

You can roll back to a previous version of the stream by using the stream rollback command:

dataflow:>stream rollback --name httptest

The optional --releaseVersion command argument specifies the version of the stream to roll back to. If it is not specified, the rollback operation goes to the previous stream version.
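For example, to roll back to a specific version rather than the previous one:

```
dataflow:>stream rollback --name httptest --releaseVersion 1
```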

18.12. Application Count

The application count is a dynamic property of the system used to specify the number of instances of applications. See the Application Count section of the microsite for more information.

18.13. Skipper’s Upgrade Strategy

Skipper has a simple “red/black” upgrade strategy. It deploys the new version of the applications, using as many instances as the currently running version, and checks the /health endpoint of the application. If the health of the new application is good, the previous application is undeployed. If the health of the new application is bad, all new applications are undeployed, and the upgrade is considered unsuccessful.

The upgrade strategy is not a rolling upgrade. If five instances of the application are running, then, in a sunny-day scenario, five instances of the new application are also running before the older version is undeployed.
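The decision logic can be sketched as follows (a simplified model with assumed names, not Skipper's actual code):

```java
import java.util.List;
import java.util.function.Predicate;

public class RedBlackUpgrade {

    // Deploy as many new instances as the current version has, check the
    // health of each new instance, then either undeploy the old version
    // (success) or undeploy all the new instances (failure).
    public static boolean upgrade(List<String> oldInstances,
                                  List<String> newInstances,
                                  Predicate<String> healthy) {
        if (newInstances.stream().allMatch(healthy)) {
            oldInstances.clear();   // previous application is undeployed
            return true;
        }
        newInstances.clear();       // all new applications are undeployed
        return false;
    }

    public static void main(String[] args) {
        List<String> oldApps = new java.util.ArrayList<>(List.of("log-v1-0", "log-v1-1"));
        List<String> newApps = new java.util.ArrayList<>(List.of("log-v2-0", "log-v2-1"));
        System.out.println("upgrade successful: " + upgrade(oldApps, newApps, i -> true));
    }
}
```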

19. Stream DSL

This section covers additional features of the Stream DSL not covered in the Stream DSL introduction.

19.1. Tap a Stream

Taps can be created at various producer endpoints in a stream. See the Tapping a Stream section of the microsite for more information.

19.2. Using Labels in a Stream

When a stream is made up of multiple applications with the same name, they must be qualified with labels. See the Labeling Applications section of the microsite for more information.

19.3. Named Destinations

Instead of referencing a source or sink application, you can use a named destination. See the Named Destinations section of the microsite for more information.

19.4. Fan-in and Fan-out

By using named destinations, you can support fan-in and fan-out use cases. See the Fan-in and Fan-out section of the microsite for more information.

20. Stream Java DSL

Instead of using the shell to create and deploy streams, you can use the Java-based DSL provided by the spring-cloud-dataflow-rest-client module. See the Java DSL section of the microsite for more information.

21. Stream Applications with Multiple Binder Configurations

In some cases, a stream can have its applications bound to multiple Spring Cloud Stream binders when they are required to connect to different messaging middleware configurations. In those cases, you should make sure the applications are configured appropriately with their binder configurations. For example, a multi-binder transformer that supports both Kafka and Rabbit binders is the processor in the following stream:

http | multibindertransform --expression=payload.toUpperCase() | log

In the preceding example, you would write your own multibindertransform application.

In this stream, each application connects to messaging middleware in the following way:

  1. The HTTP source sends events to RabbitMQ (rabbit1).

  2. The Multi-Binder Transform processor receives events from RabbitMQ (rabbit1) and sends the processed events into Kafka (kafka1).

  3. The log sink receives events from Kafka (kafka1).

Here, rabbit1 and kafka1 are the binder names given in the Spring Cloud Stream application properties. Based on this setup, each application must have the appropriate binders in its classpath with the appropriate configuration: the http source needs the Rabbit binder, the transform processor needs both the Rabbit and Kafka binders, and the log sink needs the Kafka binder.

The spring-cloud-stream binder configuration properties can be set within the applications themselves. If not, they can be passed through deployment properties when the stream is deployed:

dataflow:>stream create --definition "http | multibindertransform --expression=payload.toUpperCase() | log" --name mystream

dataflow:>stream deploy mystream --properties "app.http.spring.cloud.stream.bindings.output.binder=rabbit1,app.multibindertransform.spring.cloud.stream.bindings.input.binder=rabbit1,
app.multibindertransform.spring.cloud.stream.bindings.output.binder=kafka1,app.log.spring.cloud.stream.bindings.input.binder=kafka1"

You can override any of the binder configuration properties by specifying them through deployment properties.
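As a sketch of setting the binder configurations within the applications themselves (binder names match the stream above; the middleware hosts and ports are assumptions), an application.yml for the transform processor might look like this:

```yaml
spring:
  cloud:
    stream:
      bindings:
        input:
          binder: rabbit1
        output:
          binder: kafka1
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
        kafka1:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092
```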

22. Function Composition

Function composition lets you attach a functional logic dynamically to an existing event streaming application. See the Function Composition section of the microsite for more details.

23. Functional Applications

With Spring Cloud Stream 3.x adding functional support, you can build Source, Sink, and Processor applications merely by implementing the java.util.function Supplier, Consumer, and Function interfaces, respectively. See the Functional Application Recipe of the SCDF site for more about this feature.
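A minimal sketch of the three functional styles (class and method names here are illustrative; real applications would declare these as Spring beans in a Spring Cloud Stream project):

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class FunctionalApps {

    // Source: supplies a payload (here, the current time as a String).
    public static Supplier<String> timeSupplier() {
        return () -> java.time.LocalDateTime.now().toString();
    }

    // Processor: transforms each payload to upper case.
    public static Function<String, String> upperCase() {
        return payload -> payload.toUpperCase();
    }

    // Sink: consumes each payload (here, by logging it).
    public static Consumer<String> logConsumer() {
        return payload -> System.out.println("log.sink : " + payload);
    }

    public static void main(String[] args) {
        // Wiring the three together mimics a "time | transform | log" pipeline.
        String payload = timeSupplier().get();
        logConsumer().accept(upperCase().apply(payload));
    }
}
```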

24. Examples

This chapter includes the following examples: Simple Stream Processing, Stateful Stream Processing, and Other Source and Sink Application Types.

You can find links to more samples in the “Samples” chapter.

24.1. Simple Stream Processing

As an example of a simple processing step, we can transform the payload of the HTTP-posted data to upper case by using the following stream definition:

http | transform --expression=payload.toUpperCase() | log

To create this stream, enter the following command in the shell:

dataflow:> stream create --definition "http --server.port=9000 | transform --expression=payload.toUpperCase() | log" --name mystream --deploy

The following example uses a shell command to post some data:

dataflow:> http post --target http://localhost:9000 --data "hello"

The preceding example results in an upper-case HELLO in the log, as follows:

2016-06-01 09:54:37.749  INFO 80083 --- [  kafka-binder-] log.sink    : HELLO

24.2. Stateful Stream Processing

To demonstrate the data partitioning functionality, the following listing deploys a stream with Kafka as the binder:

dataflow:>stream create --name words --definition "http --server.port=9900 | splitter --expression=payload.split(' ') | log"
Created new stream 'words'

dataflow:>stream deploy words --properties "app.splitter.producer.partitionKeyExpression=payload,deployer.log.count=2"
Deployed stream 'words'

dataflow:>http post --target http://localhost:9900 --data "How much wood would a woodchuck chuck if a woodchuck could chuck wood"
> POST (text/plain;Charset=UTF-8) http://localhost:9900 How much wood would a woodchuck chuck if a woodchuck could chuck wood
> 202 ACCEPTED


dataflow:>runtime apps
╔════════════════════╤═══════════╤═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╗
║App Id / Instance Id│Unit Status│                                                               No. of Instances / Attributes                                                               ║
╠════════════════════╪═══════════╪═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╣
║words.log-v1        │ deployed  │                                                                             2                                                                             ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║                    │           │       guid = 24166                                                                                                                                        ║
║                    │           │        pid = 33097                                                                                                                                        ║
║                    │           │       port = 24166                                                                                                                                        ║
║words.log-v1-0      │ deployed  │     stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1/stderr_0.log     ║
║                    │           │     stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1/stdout_0.log     ║
║                    │           │        url = https://192.168.0.102:24166                                                                                                                   ║
║                    │           │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1                  ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║                    │           │       guid = 41269                                                                                                                                        ║
║                    │           │        pid = 33098                                                                                                                                        ║
║                    │           │       port = 41269                                                                                                                                        ║
║words.log-v1-1      │ deployed  │     stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1/stderr_1.log     ║
║                    │           │     stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1/stdout_1.log     ║
║                    │           │        url = https://192.168.0.102:41269                                                                                                                   ║
║                    │           │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1                  ║
╟────────────────────┼───────────┼───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║words.http-v1       │ deployed  │                                                                             1                                                                             ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║                    │           │       guid = 9900                                                                                                                                         ║
║                    │           │        pid = 33094                                                                                                                                        ║
║                    │           │       port = 9900                                                                                                                                         ║
║words.http-v1-0     │ deployed  │     stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461054/words.http-v1/stderr_0.log    ║
║                    │           │     stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461054/words.http-v1/stdout_0.log    ║
║                    │           │        url = https://192.168.0.102:9900                                                                                                                    ║
║                    │           │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461054/words.http-v1                 ║
╟────────────────────┼───────────┼───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║words.splitter-v1   │ deployed  │                                                                             1                                                                             ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║                    │           │       guid = 33963                                                                                                                                        ║
║                    │           │        pid = 33093                                                                                                                                        ║
║                    │           │       port = 33963                                                                                                                                        ║
║words.splitter-v1-0 │ deployed  │     stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803437542/words.splitter-v1/stderr_0.log║
║                    │           │     stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803437542/words.splitter-v1/stdout_0.log║
║                    │           │        url = https://192.168.0.102:33963                                                                                                                   ║
║                    │           │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803437542/words.splitter-v1             ║
╚════════════════════╧═══════════╧═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝

When you review the words.log-v1-0 logs, you should see the following:

2016-06-05 18:35:47.047  INFO 58638 --- [  kafka-binder-] log.sink                                 : How
2016-06-05 18:35:47.066  INFO 58638 --- [  kafka-binder-] log.sink                                 : chuck
2016-06-05 18:35:47.066  INFO 58638 --- [  kafka-binder-] log.sink                                 : chuck

When you review the words.log-v1-1 logs, you should see the following:

2016-06-05 18:35:47.047  INFO 58639 --- [  kafka-binder-] log.sink                                 : much
2016-06-05 18:35:47.066  INFO 58639 --- [  kafka-binder-] log.sink                                 : wood
2016-06-05 18:35:47.066  INFO 58639 --- [  kafka-binder-] log.sink                                 : would
2016-06-05 18:35:47.066  INFO 58639 --- [  kafka-binder-] log.sink                                 : a
2016-06-05 18:35:47.066  INFO 58639 --- [  kafka-binder-] log.sink                                 : woodchuck
2016-06-05 18:35:47.067  INFO 58639 --- [  kafka-binder-] log.sink                                 : if
2016-06-05 18:35:47.067  INFO 58639 --- [  kafka-binder-] log.sink                                 : a
2016-06-05 18:35:47.067  INFO 58639 --- [  kafka-binder-] log.sink                                 : woodchuck
2016-06-05 18:35:47.067  INFO 58639 --- [  kafka-binder-] log.sink                                 : could
2016-06-05 18:35:47.067  INFO 58639 --- [  kafka-binder-] log.sink                                 : wood

This example shows that payload splits containing the same word are routed to the same application instance.
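The routing above can be sketched as follows (a simplified model, not the binder's actual implementation; the real partition selector is configurable):

```java
public class PartitionSketch {

    // With app.splitter.producer.partitionKeyExpression=payload, the partition
    // key is the payload itself; a hash of the key modulo the instance count
    // picks the target instance, so equal payloads land on the same instance.
    public static int partition(String payload, int instanceCount) {
        return Math.floorMod(payload.hashCode(), instanceCount);
    }

    public static void main(String[] args) {
        int instances = 2; // matches deployer.log.count=2 in the example
        for (String word : "How much wood would a woodchuck chuck".split(" ")) {
            System.out.println(word + " -> words.log-v1-" + partition(word, instances));
        }
    }
}
```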

24.3. Other Source and Sink Application Types

This example shows something a bit more complicated: swapping out the time source for something else. Another supported source type is http, which accepts data for ingestion over HTTP POST requests. Note that the http source accepts data on a different port from the Data Flow Server (which defaults to 8080). By default, the port for the http source is randomly assigned.

To create a stream that uses an http source but still uses the same log sink, we would change the original command in the Simple Stream Processing example to the following:

dataflow:> stream create --definition "http | log" --name myhttpstream --deploy

Note that, this time, we do not see any other output until we actually post some data (by using a shell command). To see the randomly assigned port on which the http source is listening, run the following command:

dataflow:>runtime apps

╔══════════════════════╤═══════════╤═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╗
║ App Id / Instance Id │Unit Status│                                                                    No. of Instances / Attributes                                                                    ║
╠══════════════════════╪═══════════╪═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╣
║myhttpstream.log-v1   │ deploying │                                                                                  1                                                                                  ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║                      │           │       guid = 39628                                                                                                                                                  ║
║                      │           │        pid = 34403                                                                                                                                                  ║
║                      │           │       port = 39628                                                                                                                                                  ║
║myhttpstream.log-v1-0 │ deploying │     stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803867070/myhttpstream.log-v1/stderr_0.log ║
║                      │           │     stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803867070/myhttpstream.log-v1/stdout_0.log ║
║                      │           │        url = https://192.168.0.102:39628                                                                                                                             ║
║                      │           │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803867070/myhttpstream.log-v1              ║
╟──────────────────────┼───────────┼─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║myhttpstream.http-v1  │ deploying │                                                                                  1                                                                                  ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║                      │           │       guid = 52143                                                                                                                                                  ║
║                      │           │        pid = 34401                                                                                                                                                  ║
║                      │           │       port = 52143                                                                                                                                                  ║
║myhttpstream.http-v1-0│ deploying │     stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803866800/myhttpstream.http-v1/stderr_0.log║
║                      │           │     stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803866800/myhttpstream.http-v1/stdout_0.log║
║                      │           │        url = https://192.168.0.102:52143                                                                                                                             ║
║                      │           │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803866800/myhttpstream.http-v1             ║
╚══════════════════════╧═══════════╧═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝

You should see that the corresponding http source has a url property that contains the host and port information on which it is listening. You are now ready to post to that URL, as shown in the following example:

dataflow:> http post --target http://localhost:1234 --data "hello"
dataflow:> http post --target http://localhost:1234 --data "goodbye"

The stream then funnels the data from the http source to the output log implemented by the log sink, yielding output similar to the following:

2016-06-01 09:50:22.121  INFO 79654 --- [  kafka-binder-] log.sink    : hello
2016-06-01 09:50:26.810  INFO 79654 --- [  kafka-binder-] log.sink    : goodbye

We could also change the sink implementation. You could pipe the output to a file (file), to Hadoop (hdfs), or to any of the other sink applications that are available. You can also define your own applications.