Streams
This section goes into more detail about how you can create Streams, which are collections of Spring Cloud Stream applications. It covers topics such as creating and deploying Streams.
If you are just starting out with Spring Cloud Data Flow, you should probably read the Getting Started guide before diving into this section.
17. Introduction
A Stream is a collection of long-lived Spring Cloud Stream applications that communicate with each other over messaging middleware. A text-based DSL defines the configuration and data flow between the applications. While many applications are provided for you to implement common use-cases, you typically create a custom Spring Cloud Stream application to implement custom business logic.
The general lifecycle of a Stream is:
- Register applications.
- Create a Stream Definition.
- Deploy the Stream.
- Undeploy or destroy the Stream.
- Upgrade or roll back applications in the Stream.
To deploy Streams, the Data Flow Server must be configured to delegate the deployment to Skipper, a server in the Spring Cloud ecosystem.
Furthermore, you can configure Skipper to deploy applications to one or more Cloud Foundry orgs and spaces, one or more namespaces on a Kubernetes cluster, or to the local machine. When deploying a stream in Data Flow, you can specify which platform to use at deployment time. Skipper also provides Data Flow with the ability to perform updates to deployed streams. There are many ways the applications in a stream can be updated, but one of the most common examples is to upgrade a processor application with new custom business logic while leaving the existing source and sink applications alone.
17.1. Stream Pipeline DSL
A stream is defined by using a Unix-inspired Pipeline syntax. The syntax uses vertical bars, known as “pipes”, to connect multiple commands. The command ls -l | grep key | less in Unix takes the output of the ls -l process and pipes it to the input of the grep key process. The output of grep is, in turn, sent to the input of the less process. Each | symbol connects the standard output of the command on the left to the standard input of the command on the right. Data flows through the pipeline from left to right.
In Data Flow, the Unix command is replaced by a Spring Cloud Stream application and each pipe symbol represents connecting the input and output of applications over messaging middleware, such as RabbitMQ or Apache Kafka.
Each Spring Cloud Stream application is registered under a simple name. The registration process specifies where the application can be obtained (for example, in a Maven Repository or a Docker registry). You can find out more about how to register Spring Cloud Stream applications in the Register a Stream Application section. In Data Flow, we classify the Spring Cloud Stream applications as Sources, Processors, or Sinks.
As a simple example, consider the collection of data from an HTTP Source and writing to a File Sink. Using the DSL, the stream description is:
http | file
A stream that involves some processing would be expressed as:
http | filter | transform | file
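To make the left-to-right reading of a pipeline definition concrete, the following sketch splits a definition on the pipe symbol and labels the first application as the source, the last as the sink, and everything in between as a processor. It is only an illustration under simplifying assumptions: the PipelineSketch class and its roles method are hypothetical, are not part of Data Flow, and ignore syntax (such as quoted option values that contain a pipe) that the real DSL parser must handle.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; this is not Data Flow's DSL parser.
class PipelineSketch {

    // Returns "<appName>:<role>" for each application in a simple "a | b | c" definition.
    static List<String> roles(String definition) {
        String[] segments = definition.split("\\|");
        List<String> result = new ArrayList<>();
        for (int i = 0; i < segments.length; i++) {
            // The application name is the first token; "--option=value" tokens are ignored.
            String appName = segments[i].trim().split("\\s+")[0];
            String role;
            if (i == 0) {
                role = "source";
            } else if (i == segments.length - 1) {
                role = "sink";
            } else {
                role = "processor";
            }
            result.add(appName + ":" + role);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(roles("http | filter | transform | file"));
    }
}
```

Running main on the second definition above labels http as the source, filter and transform as processors, and file as the sink.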
Stream definitions can be created by using the shell’s stream create command, as shown in the following example:
dataflow:> stream create --name httpIngest --definition "http | file"
The Stream DSL is passed in to the --definition command option.
The deployment of stream definitions is done through the shell’s stream deploy command, as follows:
dataflow:> stream deploy --name httpIngest
The Getting Started section shows you how to start the server and how to start and use the Spring Cloud Data Flow shell.
Note that the shell calls the Data Flow Server’s REST API. For more information on making HTTP requests directly to the server, see the REST API Guide.
When naming a stream definition, keep in mind that each application in the stream will be created on the platform with the name in the format of <stream name>-<app name> . Thus, the total length of the generated application name can’t exceed 58 characters.
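The naming constraint can be checked before you create a stream. The following sketch builds the generated name in the <stream name>-<app name> format and compares its length against the 58-character limit stated above. The StreamNameCheck class and its methods are hypothetical helpers written for this illustration; they are not part of Data Flow or its shell.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; not part of Data Flow.
class StreamNameCheck {

    // Limit on the generated application name, taken from the note above.
    static final int MAX_GENERATED_NAME_LENGTH = 58;

    // Builds the platform-side name in the "<stream name>-<app name>" format.
    static String generatedName(String streamName, String appName) {
        return streamName + "-" + appName;
    }

    // Returns true when every application in the stream yields a name within the limit.
    static boolean fitsLimit(String streamName, List<String> appNames) {
        for (String appName : appNames) {
            if (generatedName(streamName, appName).length() > MAX_GENERATED_NAME_LENGTH) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(generatedName("httpIngest", "http"));
        System.out.println(fitsLimit("httpIngest", Arrays.asList("http", "file")));
    }
}
```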
17.2. Stream Application DSL
You can use the Stream Application DSL to define custom binding properties for each of the Spring Cloud Stream applications. See the Stream Application DSL section of the microsite for more information.
Consider the following Java interface, which defines an input method and two output methods:
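import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

// One inbound channel (orders) and two outbound channels (hotDrinks, coldDrinks).
public interface Barista {
    @Input
    SubscribableChannel orders();

    @Output
    MessageChannel hotDrinks();

    @Output
    MessageChannel coldDrinks();
}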
Further consider the following Java interface, which is typical for creating a Kafka Streams application:
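import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.springframework.cloud.stream.annotation.Input;

interface KStreamKTableBinding {
    @Input
    KStream<?, ?> inputStream();

    @Input
    KTable<?, ?> inputTable();
}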
In these cases with multiple input and output bindings, Data Flow cannot make any assumptions about the flow of data from one application to another.
Therefore, you need to set the binding properties to “wire up” the application.
The Stream Application DSL uses a “double pipe”, instead of the “pipe symbol”, to indicate that Data Flow should not configure the binding properties of the application. Think of || as meaning “in parallel”.
The following example shows such a “parallel” definition:
dataflow:> stream create --definition "orderGeneratorApp || baristaApp || hotDrinkDeliveryApp || coldDrinkDeliveryApp" --name myCafeStream
Breaking Change! SCDF Local and Cloud Foundry versions 1.7.0 to 1.7.2 and SCDF Kubernetes versions 1.7.0 to 1.7.1 used the comma character as the separator between applications. This caused breaking changes in the traditional Stream DSL. While not ideal, changing the separator character was felt to be the best solution with the least impact on existing users.
This stream has four applications. baristaApp has two output destinations, hotDrinks and coldDrinks, intended to be consumed by the hotDrinkDeliveryApp and coldDrinkDeliveryApp, respectively. When deploying this stream, you need to set the binding properties so that the baristaApp sends hot drink messages to the hotDrinkDeliveryApp destination and cold drink messages to the coldDrinkDeliveryApp destination. The following listing does so:
app.baristaApp.spring.cloud.stream.bindings.hotDrinks.destination=hotDrinksDest
app.baristaApp.spring.cloud.stream.bindings.coldDrinks.destination=coldDrinksDest
app.hotDrinkDeliveryApp.spring.cloud.stream.bindings.input.destination=hotDrinksDest
app.coldDrinkDeliveryApp.spring.cloud.stream.bindings.input.destination=coldDrinksDest
If you want to use consumer groups, you need to set the Spring Cloud Stream application properties, spring.cloud.stream.bindings.<channelName>.producer.requiredGroups and spring.cloud.stream.bindings.<channelName>.group, on the producer and consumer applications respectively.
Another common use case for the Stream Application DSL is to deploy an HTTP gateway application that sends a synchronous request-reply message to a Kafka or RabbitMQ application. In this case, both the HTTP gateway application and the Kafka or RabbitMQ application can be Spring Integration applications that do not make use of the Spring Cloud Stream library.
It is also possible to deploy only a single application by using the Stream Application DSL.
17.3. Application Properties
Each application takes properties to customize its behavior. As an example, the http source module exposes a port setting that lets the data ingestion port be changed from the default value:
dataflow:> stream create --definition "http --port=8090 | log" --name myhttpstream
This port property is actually the same as the standard Spring Boot server.port property. Data Flow adds the ability to use the shorthand form port instead of server.port. You can also specify the longhand version:
dataflow:> stream create --definition "http --server.port=8000 | log" --name myhttpstream
This shorthand behavior is discussed more in the section on Whitelisting Application Properties.
If you have registered application property metadata, you can use tab completion in the shell after typing -- to get a list of candidate property names.
The app info --name <appName> --type <appType> shell command provides additional documentation for all the supported properties.
Supported Stream <appType> possibilities are: source, processor, and sink.
18. Stream Lifecycle
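The lifecycle of a stream goes through the stages listed in the Introduction: registering applications, creating a stream definition, deploying the stream, undeploying or destroying the stream, and upgrading or rolling back applications in the stream.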
Skipper is a server that lets you discover Spring Boot applications and manage their lifecycle on multiple Cloud Platforms.
Applications in Skipper are bundled as packages that contain the application’s resource location, application properties, and deployment properties. You can think of Skipper packages as being analogous to packages found in tools such as apt-get or brew.
When Data Flow deploys a Stream, it generates and uploads a package to Skipper that represents the applications in the Stream. Subsequent commands to upgrade or roll back the applications within the Stream are passed through to Skipper. In addition, the Stream definition is reverse-engineered from the package, and the status of the Stream is also delegated to Skipper.
18.1. Register a Stream Application
You can register a versioned stream application by using the app register command. You must provide a unique name, an application type, and a URI that can be resolved to the application artifact. For the type, specify source, processor, or sink. The version is resolved from the URI. Here are a few examples:
dataflow:>app register --name mysource --type source --uri maven://com.example:mysource:0.0.1
dataflow:>app register --name mysource --type source --uri maven://com.example:mysource:0.0.2
dataflow:>app register --name mysource --type source --uri maven://com.example:mysource:0.0.3
dataflow:>app list --id source:mysource
╔═══╤══════════════════╤═════════╤════╤════╗
║app│ source │processor│sink│task║
╠═══╪══════════════════╪═════════╪════╪════╣
║ │> mysource-0.0.1 <│ │ │ ║
║ │mysource-0.0.2 │ │ │ ║
║ │mysource-0.0.3 │ │ │ ║
╚═══╧══════════════════╧═════════╧════╧════╝
dataflow:>app register --name myprocessor --type processor --uri file:///Users/example/myprocessor-1.2.3.jar
dataflow:>app register --name mysink --type sink --uri https://example.com/mysink-2.0.1.jar
The application URI should conform to one of the following schema formats:
- Maven schema: maven://<groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>
- HTTP schema: http://<web-path>/<artifactName>-<version>.jar
- File schema: file:///<local-path>/<artifactName>-<version>.jar
- Docker schema: docker:<docker-image-path>/<imageName>:<version>
The URI <version> part is compulsory for versioned stream applications. Skipper uses the multi-versioned stream applications to allow upgrading or rolling back those applications at runtime by using the deployment properties.
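As an illustration of how the version can be read from a URI in the Maven schema, the following sketch splits the coordinates on the colon character and treats the last coordinate as the version. The MavenUriSketch class is hypothetical and is not how Data Flow or Skipper resolves URIs; it only mirrors the maven://<groupId>:<artifactId>[:<extension>[:<classifier>]]:<version> format described above.

```java
// Hypothetical illustration only; not the resolution logic used by Data Flow or Skipper.
class MavenUriSketch {

    private static final String PREFIX = "maven://";

    // Splits the URI into three to five coordinates, per the Maven schema format.
    static String[] coordinates(String uri) {
        if (!uri.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Not a maven:// URI: " + uri);
        }
        String[] parts = uri.substring(PREFIX.length()).split(":");
        if (parts.length < 3 || parts.length > 5) {
            throw new IllegalArgumentException("Unexpected number of coordinates: " + uri);
        }
        return parts;
    }

    // The version is always the last coordinate.
    static String version(String uri) {
        String[] parts = coordinates(uri);
        return parts[parts.length - 1];
    }

    // The artifact ID is always the second coordinate.
    static String artifactId(String uri) {
        return coordinates(uri)[1];
    }

    public static void main(String[] args) {
        System.out.println(version("maven://com.example:mysource:0.0.1"));
    }
}
```

A URI without a version part has fewer than three coordinates and is rejected by this sketch, which matches the requirement that the version be present for versioned stream applications.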
If you would like to register the snapshot versions of the http and log applications built with the RabbitMQ binder, you could do the following:
dataflow:>app register --name http --type source --uri maven://org.springframework.cloud.stream.app:http-source-rabbit:1.2.1.BUILD-SNAPSHOT
dataflow:>app register --name log --type sink --uri maven://org.springframework.cloud.stream.app:log-sink-rabbit:1.2.1.BUILD-SNAPSHOT
If you would like to register multiple applications at one time, you can store them in a properties file, where the keys are formatted as <type>.<name> and the values are the URIs.
For example, to register the snapshot versions of the http and log applications built with the RabbitMQ binder, you could have the following in a properties file (for example, stream-apps.properties):
source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:1.2.1.BUILD-SNAPSHOT
sink.log=maven://org.springframework.cloud.stream.app:log-sink-rabbit:1.2.1.BUILD-SNAPSHOT
Then, to import the applications in bulk, use the app import command and provide the location of the properties file with the --uri switch, as follows:
dataflow:>app import --uri file:///<YOUR_FILE_LOCATION>/stream-apps.properties
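To show how the <type>.<name> key format maps to individual registrations, the following sketch loads such a file with java.util.Properties and splits each key at its first dot. The BulkImportSketch class is a hypothetical illustration and does not reproduce what the app import command does internally.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.TreeSet;

// Hypothetical illustration only; not the implementation of "app import".
class BulkImportSketch {

    // Returns one "type=... name=... uri=..." line per entry, sorted by key.
    static List<String> describe(String fileContents) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(fileContents));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> lines = new ArrayList<>();
        for (String key : new TreeSet<>(props.stringPropertyNames())) {
            int dot = key.indexOf('.');
            if (dot < 0) {
                continue; // Skip keys that do not follow the <type>.<name> format.
            }
            String type = key.substring(0, dot);
            String name = key.substring(dot + 1);
            lines.add("type=" + type + " name=" + name + " uri=" + props.getProperty(key));
        }
        return lines;
    }

    public static void main(String[] args) {
        String contents =
            "source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:1.2.1.BUILD-SNAPSHOT\n"
            + "sink.log=maven://org.springframework.cloud.stream.app:log-sink-rabbit:1.2.1.BUILD-SNAPSHOT\n";
        for (String line : describe(contents)) {
            System.out.println(line);
        }
    }
}
```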
Registering an application by using --type app is the same as registering a source, processor, or sink. Applications of the type app can be used only in the Stream Application DSL (which uses a double pipe instead of the pipe symbol in the DSL), which instructs Data Flow not to configure the Spring Cloud Stream binding properties of the application. The application that is registered using --type app does not have to be a Spring Cloud Stream application. It can be any Spring Boot application. See the Stream Application DSL introduction for more about using this application type.
You can register multiple versions of the same application (that is, with the same name and type), but you can set only one as the default. The default version is used for deploying Streams.
The first time an application is registered, it is marked as default. The default application version can be altered with the app default command:
dataflow:>app default --id source:mysource --version 0.0.2
dataflow:>app list --id source:mysource
╔═══╤══════════════════╤═════════╤════╤════╗
║app│ source │processor│sink│task║
╠═══╪══════════════════╪═════════╪════╪════╣
║ │mysource-0.0.1 │ │ │ ║
║ │> mysource-0.0.2 <│ │ │ ║
║ │mysource-0.0.3 │ │ │ ║
╚═══╧══════════════════╧═════════╧════╧════╝
The app list --id <type:name> command lists all versions for a given stream application.
The app unregister command has an optional --version parameter to specify the application version to unregister:
dataflow:>app unregister --name mysource --type source --version 0.0.1
dataflow:>app list --id source:mysource
╔═══╤══════════════════╤═════════╤════╤════╗
║app│ source │processor│sink│task║
╠═══╪══════════════════╪═════════╪════╪════╣
║ │> mysource-0.0.2 <│ │ │ ║
║ │mysource-0.0.3 │ │ │ ║
╚═══╧══════════════════╧═════════╧════╧════╝
If --version is not specified, the default version is unregistered.
All applications in a stream should have a default version set for the stream to be deployed. Otherwise, they are treated as unregistered applications during the deployment. Use the app default command to set the default version:
dataflow:>app default --id source:mysource --version 0.0.3
dataflow:>app list --id source:mysource
╔═══╤══════════════════╤═════════╤════╤════╗
║app│ source │processor│sink│task║
╠═══╪══════════════════╪═════════╪════╪════╣
║ │mysource-0.0.2 │ │ │ ║
║ │> mysource-0.0.3 <│ │ │ ║
╚═══╧══════════════════╧═════════╧════╧════╝
The stream deploy command requires default application versions to be set. The stream update and stream rollback commands, though, can use all (default and non-default) registered application versions.
The following command creates a stream that uses the default mysource version (0.0.3):
dataflow:>stream create foo --definition "mysource | log"
Then we can update the version to 0.0.2:
dataflow:>stream update foo --properties version.mysource=0.0.2
Only pre-registered applications can be used to deploy, update, or roll back a Stream.
An attempt to update mysource to version 0.0.1 (not registered) fails.
18.1.1. Register Supported Applications and Tasks
For convenience, static files with application URIs (for both Maven and Docker) are available for all the out-of-the-box stream and task or batch app starters. You can point to one of these files and import all the application URIs in bulk. Otherwise, as explained previously, you can register applications individually or maintain your own custom properties file that contains only the application URIs you need. We recommend the latter: a “focused” list of desired application URIs in a custom properties file.
Spring Cloud Stream App Starters
The following table includes the dataflow.spring.io links to the available Stream Application Starters based on Spring Cloud Stream 2.1.x and Spring Boot 2.1.x:
Artifact Type | Stable Release | SNAPSHOT Release |
---|---|---|
RabbitMQ + Maven | | dataflow.spring.io/Einstein-BUILD-SNAPSHOT-stream-applications-rabbit-maven |
RabbitMQ + Docker | | dataflow.spring.io/Einstein-BUILD-SNAPSHOT-stream-applications-rabbit-docker |
Apache Kafka + Maven | | dataflow.spring.io/Einstein-BUILD-SNAPSHOT-stream-applications-kafka-maven |
Apache Kafka + Docker | | dataflow.spring.io/Einstein-BUILD-SNAPSHOT-stream-applications-kafka-docker |
By default, App Starter actuator endpoints are secured. You can disable security by deploying streams with the app.*.spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration property. On Kubernetes, see the Liveness and readiness probes section for how to configure security for actuator endpoints.
Starting with the Spring Cloud Stream 2.1 GA release, there is robust interoperability with the Spring Cloud Function programming model. Building on that, with the Einstein release train, it is now possible to pick a few Stream App Starters and compose them into a single application by using the functional-style programming model. See the "Composed Function Support in Spring Cloud Data Flow" blog post for an example that covers the developer and orchestration experience.
Spring Cloud Task App Starters
The following table includes the available Task Application Starters based on Spring Cloud Task 2.1.x and Spring Boot 2.1.x:
Artifact Type | Stable Release | SNAPSHOT Release |
---|---|---|
Maven | | dataflow.spring.io/Elston-BUILD-SNAPSHOT-task-applications-maven |
Docker | | dataflow.spring.io/Elston-BUILD-SNAPSHOT-task-applications-docker |
You can find more information about the available task starters in the Task App Starters Project Page and related reference documentation. For more information about the available stream starters, look at the Stream App Starters Project Page and related reference documentation.
As an example, if you would like to register all out-of-the-box stream applications built with the Kafka binder in bulk, you can use the following command:
dataflow:>app import --uri https://dataflow.spring.io/kafka-maven-latest
Alternatively, you can register all the stream applications with the Rabbit binder, as follows:
dataflow:>app import --uri https://dataflow.spring.io/rabbitmq-maven-latest
You can also pass the --local option (which is true by default) to indicate whether the properties file location should be resolved within the shell process itself. If the location should be resolved from the Data Flow Server process, specify --local false.
Note, however, that, once downloaded, applications may be cached locally on the Data Flow server, based on the resource location. If the resource location does not change (even though the actual resource bytes may be different), the resource is not re-downloaded. Moreover, if a stream is already deployed and uses some version of a registered application, then (forcibly) re-registering a different application has no effect until the stream is deployed again.
In some cases, the resource is resolved on the server side. In others, the URI is passed to a runtime container instance, where it is resolved. See the specific documentation of each Data Flow Server for more detail.
18.1.2. Whitelisting Application Properties
Stream and Task applications are Spring Boot applications that are aware of many Common Application Properties, such as server.port, but also families of properties, such as those with the prefix spring.jmx and logging. When creating your own application, you should create a list of allowed properties so that the shell and the UI can display them first as primary properties when presenting options through tab completion or in drop-down boxes.
To create a list of allowed application properties, create a file named spring-configuration-metadata-whitelist.properties in the META-INF resource directory. There are two property keys that you can use inside this file. The first key is named configuration-properties.classes. The value is a comma-separated list of fully qualified @ConfigurationProperties class names. The second key is configuration-properties.names, whose value is a comma-separated list of property names. This can contain the full name of the property, such as server.port, or a partial name to allow a category of property names, such as spring.jmx.
The Spring Cloud Stream application starters are a good place to look for examples of usage. The following example comes from the file sink’s spring-configuration-metadata-whitelist.properties file:
configuration-properties.classes=org.springframework.cloud.stream.app.file.sink.FileSinkProperties
If we also want to allow server.port, the file would become the following:
configuration-properties.classes=org.springframework.cloud.stream.app.file.sink.FileSinkProperties
configuration-properties.names=server.port
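The following sketch illustrates how a configuration-properties.names value could be matched against a property name. It rests on an assumption that the text above does not spell out: a partial name such as spring.jmx is treated here as allowing every property whose name starts with that name followed by a dot. The WhitelistNamesSketch class is hypothetical and is not the matching logic used by Data Flow.

```java
// Hypothetical illustration only; the exact matching rules used by Data Flow may differ.
class WhitelistNamesSketch {

    // namesValue is the comma-separated value of configuration-properties.names.
    // ASSUMPTION: a partial name allows any property that starts with "<partial>.".
    static boolean isAllowed(String propertyName, String namesValue) {
        for (String entry : namesValue.split(",")) {
            String allowed = entry.trim();
            if (allowed.isEmpty()) {
                continue;
            }
            if (propertyName.equals(allowed) || propertyName.startsWith(allowed + ".")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String names = "server.port, spring.jmx";
        System.out.println(isAllowed("server.port", names));
        System.out.println(isAllowed("spring.jmx.enabled", names));
        System.out.println(isAllowed("logging.level", names));
    }
}
```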
Add spring-boot-configuration-processor as an optional dependency to generate the configuration metadata file for the properties.
Property whitelisting works only for uber-jar application artifacts. At the moment, the metadata properties are not retrievable directly from Dockerized application images; a dedicated companion metadata JAR is required.
18.1.3. Creating and Using a Dedicated Metadata Artifact
You can go a step further in the process of describing the main properties that your stream or task application supports by creating a metadata companion artifact. This jar file contains only the Spring Boot JSON file about configuration properties metadata and the whitelist properties file described in the previous section.
The following example shows the contents of such an artifact, for the canonical log sink:
$ jar tvf log-sink-rabbit-1.2.1.BUILD-SNAPSHOT-metadata.jar
373848 META-INF/spring-configuration-metadata.json
174 META-INF/spring-configuration-metadata-whitelist.properties
Note that the spring-configuration-metadata.json file is quite large. This is because it contains the concatenation of all the properties that are available at runtime to the log sink (some of them come from spring-boot-actuator.jar, some of them come from spring-boot-autoconfigure.jar, some more from spring-cloud-starter-stream-sink-log.jar, and so on). Data Flow always relies on all those properties, even when a companion artifact is not available, but here all have been merged into a single file.
To help with that (you do not want to try to craft this giant JSON file by hand), you can use the following plugin in your build:
<plugin>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-app-starter-metadata-maven-plugin</artifactId>
    <executions>
        <execution>
            <id>aggregate-metadata</id>
            <phase>compile</phase>
            <goals>
                <goal>aggregate-metadata</goal>
            </goals>
        </execution>
    </executions>
</plugin>
This plugin comes in addition to the spring-boot-configuration-processor that creates the individual JSON files. Be sure to configure both.
The benefits of a companion artifact include:
- Being much lighter. (The companion artifact is usually a few kilobytes, as opposed to megabytes for the actual application.) Consequently, it is quicker to download, allowing quicker feedback when using, for example, app info or the Dashboard UI.
- As a consequence of being lighter, it can be used in resource-constrained environments (such as PaaS), where metadata is the only piece of information needed.
- For environments that do not deal with Spring Boot uber jars directly (for example, Docker-based runtimes, such as Kubernetes or Cloud Foundry), this is the only way to provide metadata about the properties supported by the application.
Remember, though, that this is entirely optional when dealing with uber jars. The uber jar itself also includes the metadata.
18.1.4. Using the Companion Artifact
Once you have a companion artifact, you need to make the system aware of it so that it can be used.
When registering a single application with app register, you can use the optional --metadata-uri option in the shell:
dataflow:>app register --name log --type sink
--uri maven://org.springframework.cloud.stream.app:log-sink:2.1.0.RELEASE
--metadata-uri maven://org.springframework.cloud.stream.app:log-sink:jar:metadata:2.1.0.RELEASE
When registering several files by using the app import command, the file should contain a <type>.<name>.metadata line in addition to each <type>.<name> line. Strictly speaking, doing so is optional (if some apps have it but some others do not, it works), but it is best practice.
The following example shows a Dockerized application, where the metadata artifact is being hosted in a Maven repository (retrieving it through http:// or file:// would also work).
...
source.http=docker:springcloudstream/http-source-rabbit:latest
source.http.metadata=maven://org.springframework.cloud.stream.app:http-source-rabbit:jar:metadata:2.1.0.RELEASE
...
18.1.5. Creating Custom Applications
While Data Flow includes source, processor, and sink applications, you can extend these applications or write a custom Spring Cloud Stream application.
The process of creating Spring Cloud Stream applications with Spring Initializr is detailed in the Spring Cloud Stream documentation. You can include multiple binders in an application. If you do so, see the instructions in the Passing Producer and Consumer Properties section for how to configure them.
To support property whitelisting, Spring Cloud Stream applications running in Spring Cloud Data Flow can include the Spring Boot configuration-processor as an optional dependency, as shown in the following example:
<dependencies>
    <!-- other dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>
NOTE: Make sure that the spring-boot-maven-plugin is included in the POM. The plugin is necessary for creating the executable jar that is registered with Spring Cloud Data Flow. Spring Initializr includes the plugin in the generated POM.
Once you have created a custom application, you can register it, as described in Register a Stream Application.
18.2. Creating a Stream
The Spring Cloud Data Flow Server exposes a full RESTful API for managing the lifecycle of stream definitions, but the easiest way to use it is through the Spring Cloud Data Flow shell. The Getting Started section describes how to start the shell.
New streams are created with the help of stream definitions. The definitions are built from a simple DSL. For example, consider what happens if we run the following shell command:
dataflow:> stream create --definition "time | log" --name ticktock
This defines a stream named ticktock that is based on the DSL expression time | log. The DSL uses the “pipe” symbol (|) to connect a source to a sink.
The stream info command shows useful information about the stream, as shown (with its output) in the following example:
dataflow:>stream info ticktock
╔═══════════╤═════════════════╤══════════╗
║Stream Name│Stream Definition│ Status ║
╠═══════════╪═════════════════╪══════════╣
║ticktock │time | log │undeployed║
╚═══════════╧═════════════════╧══════════╝
18.2.1. Application Properties
Application properties are the properties associated with each application in the stream. When the application is deployed, the application properties are applied to the application through command-line arguments or environment variables, depending on the underlying deployment implementation.
The following stream can have application properties defined at the time of stream creation:
dataflow:> stream create --definition "time | log" --name ticktock
The app info --name <appName> --type <appType> shell command displays the allowed application properties for the application. For more about property whitelisting, see Whitelisting Application Properties.
The following listing shows the allowed properties for the time application:
dataflow:> app info --name time --type source
╔══════════════════════════════╤══════════════════════════════╤══════════════════════════════╤══════════════════════════════╗
║ Option Name │ Description │ Default │ Type ║
╠══════════════════════════════╪══════════════════════════════╪══════════════════════════════╪══════════════════════════════╣
║trigger.time-unit │The TimeUnit to apply to delay│<none> │java.util.concurrent.TimeUnit ║
║ │values. │ │ ║
║trigger.fixed-delay │Fixed delay for periodic │1 │java.lang.Integer ║
║ │triggers. │ │ ║
║trigger.cron │Cron expression value for the │<none> │java.lang.String ║
║ │Cron Trigger. │ │ ║
║trigger.initial-delay │Initial delay for periodic │0 │java.lang.Integer ║
║ │triggers. │ │ ║
║trigger.max-messages │Maximum messages per poll, -1 │1 │java.lang.Long ║
║ │means infinity. │ │ ║
║trigger.date-format │Format for the date value. │<none> │java.lang.String ║
╚══════════════════════════════╧══════════════════════════════╧══════════════════════════════╧══════════════════════════════╝
The following listing shows the allowed properties for the log application:
dataflow:> app info --name log --type sink
╔══════════════════════════════╤══════════════════════════════╤══════════════════════════════╤══════════════════════════════╗
║ Option Name │ Description │ Default │ Type ║
╠══════════════════════════════╪══════════════════════════════╪══════════════════════════════╪══════════════════════════════╣
║log.name │The name of the logger to use.│<none> │java.lang.String ║
║log.level │The level at which to log │<none> │org.springframework.integratio║
║ │messages. │ │n.handler.LoggingHandler$Level║
║log.expression │A SpEL expression (against the│payload │java.lang.String ║
║ │incoming message) to evaluate │ │ ║
║ │as the logged message. │ │ ║
╚══════════════════════════════╧══════════════════════════════╧══════════════════════════════╧══════════════════════════════╝
You can specify the application properties for the time and log apps at the time of stream creation, as follows:
dataflow:> stream create --definition "time --fixed-delay=5 | log --level=WARN" --name ticktock
Note that, in the preceding example, the fixed-delay and level properties defined for the time and log applications are the “short-form” property names provided by the shell completion. These “short-form” property names are applicable only for the allowed properties. In all other cases, you should use only fully qualified property names.
18.2.2. Common Application Properties
In addition to configuration through DSL, Spring Cloud Data Flow provides a mechanism for setting common properties to all the streaming applications that are launched by it. This can be done by adding properties prefixed with spring.cloud.dataflow.applicationProperties.stream when starting the server. When doing so, the server passes all the properties, without the prefix, to the instances it launches.
For example, all the launched applications can be configured to use a specific Kafka broker by launching the Data Flow server with the following options:
--spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.brokers=192.168.1.100:9092
--spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.zkNodes=192.168.1.100:2181
Doing so causes the spring.cloud.stream.kafka.binder.brokers and spring.cloud.stream.kafka.binder.zkNodes properties to be passed to all the launched applications.
Properties configured with this mechanism have lower precedence than stream deployment properties. They are overridden if a property with the same key is specified at stream deployment time (for example, app.http.spring.cloud.stream.kafka.binder.brokers overrides the common property).
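The prefix stripping and precedence rule described above can be sketched as follows. The CommonPropertiesSketch class is a hypothetical illustration, not the server's implementation, and it assumes that the deployment properties passed to effectiveProperties belong to a single application and already have their app.<appName>. prefix removed.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; not the Data Flow server's implementation.
class CommonPropertiesSketch {

    static final String PREFIX = "spring.cloud.dataflow.applicationProperties.stream.";

    // Keeps only the server properties that carry the prefix and strips the prefix from their keys.
    static Map<String, String> commonProperties(Map<String, String> serverProperties) {
        Map<String, String> common = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : serverProperties.entrySet()) {
            if (entry.getKey().startsWith(PREFIX)) {
                common.put(entry.getKey().substring(PREFIX.length()), entry.getValue());
            }
        }
        return common;
    }

    // Deployment properties win over common properties that have the same key.
    // ASSUMPTION: deploymentProperties are for one application, without the "app.<appName>." prefix.
    static Map<String, String> effectiveProperties(Map<String, String> common,
                                                   Map<String, String> deploymentProperties) {
        Map<String, String> effective = new LinkedHashMap<>(common);
        effective.putAll(deploymentProperties);
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> server = new LinkedHashMap<>();
        server.put(PREFIX + "spring.cloud.stream.kafka.binder.brokers", "192.168.1.100:9092");
        server.put(PREFIX + "spring.cloud.stream.kafka.binder.zkNodes", "192.168.1.100:2181");
        System.out.println(commonProperties(server));
    }
}
```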
18.3. Deploying a Stream
This section describes how to deploy a Stream when the Spring Cloud Data Flow server is responsible for deploying the stream. It covers the deployment and upgrade of Streams by using the Skipper service. The description of how to set deployment properties applies to both approaches of Stream deployment.
Consider the ticktock stream definition:
dataflow:> stream create --definition "time | log" --name ticktock
To deploy the stream, use the following shell command:
dataflow:> stream deploy --name ticktock
The Data Flow Server delegates to Skipper the resolution and deployment of the time
and log
applications.
The stream info
command shows useful information about the stream, including the deployment properties:
dataflow:>stream info --name ticktock
╔═══════════╤═════════════════╤═════════╗
║Stream Name│Stream Definition│ Status ║
╠═══════════╪═════════════════╪═════════╣
║ticktock │time | log │deploying║
╚═══════════╧═════════════════╧═════════╝
Stream Deployment properties: {
"log" : {
"resource" : "maven://org.springframework.cloud.stream.app:log-sink-rabbit",
"spring.cloud.deployer.group" : "ticktock",
"version" : "2.0.1.RELEASE"
},
"time" : {
"resource" : "maven://org.springframework.cloud.stream.app:time-source-rabbit",
"spring.cloud.deployer.group" : "ticktock",
"version" : "2.0.1.RELEASE"
}
}
There is an important optional command argument (called --platformName
) to the stream deploy
command.
Skipper can be configured to deploy to multiple platforms.
Skipper is pre-configured with a platform named default
, which deploys applications to the local machine where Skipper is running.
The default value of the --platformName
command line argument is default
.
If you commonly deploy to one platform, when installing Skipper, you can override the configuration of the default
platform.
Otherwise, specify the platformName
to be one of the values returned by the stream platform-list
command.
In the preceding example, the time source sends the current time as a message each second, and the log sink outputs it by using the logging framework.
You can tail the stdout
log (which has an <instance>
suffix). The log files are located within the directory displayed in the Data Flow Server’s log output, as shown in the following listing:
$ tail -f /var/folders/wn/8jxm_tbd1vj28c8vj37n900m0000gn/T/spring-cloud-dataflow-912434582726479179/ticktock-1464788481708/ticktock.log/stdout_0.log
2016-06-01 09:45:11.250 INFO 79194 --- [ kafka-binder-] log.sink : 06/01/16 09:45:11
2016-06-01 09:45:12.250 INFO 79194 --- [ kafka-binder-] log.sink : 06/01/16 09:45:12
2016-06-01 09:45:13.251 INFO 79194 --- [ kafka-binder-] log.sink : 06/01/16 09:45:13
You can also create and deploy the stream in one step by passing the --deploy
flag when creating the stream, as follows:
dataflow:> stream create --definition "time | log" --name ticktock --deploy
However, it is not common in real-world use cases to create and deploy the stream in one step.
The reason is that when you use the stream deploy
command, you can pass in properties that define how to map the applications onto the platform (for example, what is the memory size of the container to use, the number of each application to run, and whether to enable data partitioning features).
Properties can also override application properties that were set when creating the stream.
The next sections cover this feature in detail.
18.3.1. Deployment Properties
When deploying a stream, you can specify properties that can control how applications are deployed and configured. See the Deployment Properties section of the microsite for more information.
18.4. Destroying a Stream
You can delete a stream by issuing the stream destroy
command from the shell, as follows:
dataflow:> stream destroy --name ticktock
If the stream was deployed, it is undeployed before the stream definition is deleted.
18.5. Undeploying a Stream
Often, you want to stop a stream but retain the name and definition for future use. In that case, you can undeploy
the stream by name:
dataflow:> stream undeploy --name ticktock
You can issue the deploy
command at a later time to restart it:
dataflow:> stream deploy --name ticktock
18.6. Validating a Stream
Sometimes, an application contained within a stream definition contains an invalid URI in its registration.
This can be caused by an invalid URI being entered at application registration time or by the application being removed from the repository from which it was to be drawn.
To verify that all the applications contained in a stream are resolvable, you can use the validate
command:
dataflow:>stream validate ticktock
╔═══════════╤═════════════════╗
║Stream Name│Stream Definition║
╠═══════════╪═════════════════╣
║ticktock │time | log ║
╚═══════════╧═════════════════╝
ticktock is a valid stream.
╔═══════════╤═════════════════╗
║ App Name │Validation Status║
╠═══════════╪═════════════════╣
║source:time│valid ║
║sink:log │valid ║
╚═══════════╧═════════════════╝
In the preceding example, the user validated their ticktock stream. Both the source:time
and sink:log
are valid.
Now we can see what happens if we have a stream definition with a registered application with an invalid URI:
dataflow:>stream validate bad-ticktock
╔════════════╤═════════════════╗
║Stream Name │Stream Definition║
╠════════════╪═════════════════╣
║bad-ticktock│bad-time | log ║
╚════════════╧═════════════════╝
bad-ticktock is an invalid stream.
╔═══════════════╤═════════════════╗
║ App Name │Validation Status║
╠═══════════════╪═════════════════╣
║source:bad-time│invalid ║
║sink:log │valid ║
╚═══════════════╧═════════════════╝
In this case, Spring Cloud Data Flow states that the stream is invalid because source:bad-time
has an invalid URI.
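The per-application check shown in the two listings can be pictured with the following sketch. It is a simplified model under stated assumptions, not how Data Flow implements validation: the class name, the registry map keyed by "type:name", and the uriResolves predicate (a stand-in for actually resolving the registered resource) are all hypothetical, and the definition is split naively on the pipe symbol.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical model of validating every application in a stream definition.
final class StreamValidationSketch {

    // Returns "type:name" -> "valid" or "invalid" for each app in the definition.
    static Map<String, String> validate(String definition,
                                        Map<String, String> registry,
                                        Predicate<String> uriResolves) {
        // Naive split: does not handle a pipe character inside a quoted option value.
        String[] apps = definition.split("\\|");
        Map<String, String> status = new LinkedHashMap<>();
        for (int i = 0; i < apps.length; i++) {
            // The app name is the first token; later tokens are options such as --server.port.
            String name = apps[i].trim().split("\\s+")[0];
            // First app is the source, last is the sink, anything between is a processor.
            String type = i == 0 ? "source" : (i == apps.length - 1 ? "sink" : "processor");
            String key = type + ":" + name;
            String uri = registry.get(key);
            boolean ok = uri != null && uriResolves.test(uri);
            status.put(key, ok ? "valid" : "invalid");
        }
        return status;
    }

    public static void main(String[] args) {
        Map<String, String> registry = new LinkedHashMap<>();
        registry.put("source:bad-time", "maven://example:does-not-exist:0.0.0"); // made-up URI
        registry.put("sink:log",
                "maven://org.springframework.cloud.stream.app:log-sink-rabbit:1.2.0.RELEASE");

        // Stand-in for a real repository lookup.
        Predicate<String> resolves = uri -> !uri.contains("does-not-exist");
        System.out.println(validate("bad-time | log", registry, resolves));
    }
}
```

A stream is then valid only if every entry in the returned map is "valid".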
18.7. Updating a Stream
To update the stream, use the stream update
command, which takes either --properties
or --propertiesFile
as a command argument.
Skipper has an important new top-level prefix: version
.
The following commands deploy the http | log stream (the version of log registered at the time of deployment was 1.1.0.RELEASE):
dataflow:> stream create --name httptest --definition "http --server.port=9000 | log"
dataflow:> stream deploy --name httptest
dataflow:>stream info httptest
╔══════════════════════════════╤══════════════════════════════╤════════════════════════════╗
║ Name │ DSL │ Status ║
╠══════════════════════════════╪══════════════════════════════╪════════════════════════════╣
║httptest │http --server.port=9000 | log │deploying ║
╚══════════════════════════════╧══════════════════════════════╧════════════════════════════╝
Stream Deployment properties: {
"log" : {
"spring.cloud.deployer.indexed" : "true",
"spring.cloud.deployer.group" : "httptest",
"maven://org.springframework.cloud.stream.app:log-sink-rabbit" : "1.1.0.RELEASE"
},
"http" : {
"spring.cloud.deployer.group" : "httptest",
"maven://org.springframework.cloud.stream.app:http-source-rabbit" : "1.1.0.RELEASE"
}
}
Then the following command updates the stream to use the 1.2.0.RELEASE
version of the log application.
Before updating the stream with the specific version of the application, we need to make sure that the application is registered with that version:
dataflow:>app register --name log --type sink --uri maven://org.springframework.cloud.stream.app:log-sink-rabbit:1.2.0.RELEASE
Successfully registered application 'sink:log'
Then we can update the application:
dataflow:>stream update --name httptest --properties version.log=1.2.0.RELEASE
You can use only pre-registered application versions to deploy, update, or roll back a stream.
To verify the deployment properties and the updated version, we can use stream info
, as shown (with its output) in the following example:
dataflow:>stream info httptest
╔══════════════════════════════╤══════════════════════════════╤════════════════════════════╗
║ Name │ DSL │ Status ║
╠══════════════════════════════╪══════════════════════════════╪════════════════════════════╣
║httptest │http --server.port=9000 | log │deploying ║
╚══════════════════════════════╧══════════════════════════════╧════════════════════════════╝
Stream Deployment properties: {
"log" : {
"spring.cloud.deployer.indexed" : "true",
"spring.cloud.deployer.count" : "1",
"spring.cloud.deployer.group" : "httptest",
"maven://org.springframework.cloud.stream.app:log-sink-rabbit" : "1.2.0.RELEASE"
},
"http" : {
"spring.cloud.deployer.group" : "httptest",
"maven://org.springframework.cloud.stream.app:http-source-rabbit" : "1.1.0.RELEASE"
}
}
18.8. Forcing an Update of a Stream
When upgrading a stream, you can use the --force
option to deploy new instances of currently deployed applications even if no application or deployment properties have changed.
This behavior is needed when the application itself obtains configuration information at startup time (for example, from Spring Cloud Config Server).
You can specify the applications for which to force an upgrade by using the --app-names
option.
If you do not specify any application names, all the applications are forced to upgrade.
You can specify the --force
and --app-names
options together with the --properties
or --propertiesFile
options.
18.9. Stream Versions
Skipper keeps a history of the streams that were deployed.
After updating a Stream, there is a second version of the stream.
You can query for the history of the versions by using the stream history --name <name-of-stream>
command:
dataflow:>stream history --name httptest
╔═══════╤════════════════════════════╤════════╤════════════╤═══════════════╤════════════════╗
║Version│ Last updated │ Status │Package Name│Package Version│ Description ║
╠═══════╪════════════════════════════╪════════╪════════════╪═══════════════╪════════════════╣
║2 │Mon Nov 27 22:41:16 EST 2017│DEPLOYED│httptest │1.0.0 │Upgrade complete║
║1 │Mon Nov 27 22:40:41 EST 2017│DELETED │httptest │1.0.0 │Delete complete ║
╚═══════╧════════════════════════════╧════════╧════════════╧═══════════════╧════════════════╝
18.10. Stream Manifests
Skipper keeps a “manifest” of all of the applications, their application properties, and their deployment properties after all values have been substituted. This represents the final state of what was deployed to the platform. You can view the manifest for any of the versions of a Stream by using the following command:
stream manifest --name <name-of-stream> --releaseVersion <optional-version>
If the --releaseVersion
is not specified, the manifest for the last version is returned.
The following example shows the use of the manifest:
dataflow:>stream manifest --name httptest
Using the command results in the following output:
# Source: log.yml
apiVersion: skipper.spring.io/v1
kind: SpringCloudDeployerApplication
metadata:
name: log
spec:
resource: maven://org.springframework.cloud.stream.app:log-sink-rabbit
version: 1.2.0.RELEASE
applicationProperties:
spring.metrics.export.triggers.application.includes: integration**
spring.cloud.dataflow.stream.app.label: log
spring.cloud.stream.metrics.key: httptest.log.${spring.cloud.application.guid}
spring.cloud.stream.bindings.input.group: httptest
spring.cloud.stream.metrics.properties: spring.application.name,spring.application.index,spring.cloud.application.*,spring.cloud.dataflow.*
spring.cloud.dataflow.stream.name: httptest
spring.cloud.dataflow.stream.app.type: sink
spring.cloud.stream.bindings.input.destination: httptest.http
deploymentProperties:
spring.cloud.deployer.indexed: true
spring.cloud.deployer.group: httptest
spring.cloud.deployer.count: 1
---
# Source: http.yml
apiVersion: skipper.spring.io/v1
kind: SpringCloudDeployerApplication
metadata:
name: http
spec:
resource: maven://org.springframework.cloud.stream.app:http-source-rabbit
version: 1.2.0.RELEASE
applicationProperties:
spring.metrics.export.triggers.application.includes: integration**
spring.cloud.dataflow.stream.app.label: http
spring.cloud.stream.metrics.key: httptest.http.${spring.cloud.application.guid}
spring.cloud.stream.bindings.output.producer.requiredGroups: httptest
spring.cloud.stream.metrics.properties: spring.application.name,spring.application.index,spring.cloud.application.*,spring.cloud.dataflow.*
server.port: 9000
spring.cloud.stream.bindings.output.destination: httptest.http
spring.cloud.dataflow.stream.name: httptest
spring.cloud.dataflow.stream.app.type: source
deploymentProperties:
spring.cloud.deployer.group: httptest
The majority of the deployment and application properties were set by Data Flow to enable the applications to talk to each other and to send application metrics with identifying labels.
18.11. Rollback a Stream
You can roll back to a previous version of the stream by using the stream rollback
command:
dataflow:>stream rollback --name httptest
The optional --releaseVersion
command argument specifies the version of the stream to roll back to.
If not specified, the rollback operation goes to the previous stream version.
18.12. Application Count
The application count is a dynamic property of the system used to specify the number of instances of applications. See the Application Count section of the microsite for more information.
18.13. Skipper’s Upgrade Strategy
Skipper has a simple “red/black” upgrade strategy. It deploys the new version of the applications, using as many instances as the currently running version, and checks the /health
endpoint of the application.
If the health of the new application is good, the previous application is undeployed.
If the health of the new application is bad, all new applications are undeployed, and the upgrade is considered unsuccessful.
The upgrade strategy is not a rolling upgrade, so, if five instances of the application are running, then, in a sunny-day scenario, five of the new applications are also running before the older version is undeployed.
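The sequence described above can be modeled as follows. This is a simplified sketch, not Skipper's code: the class name, the instance naming scheme, and the healthy predicate (standing in for a call to each new instance's /health endpoint) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of the upgrade decision; it deploys nothing for real.
final class RedBlackUpgradeSketch {

    // Returns the instances left running after the upgrade attempt.
    static List<String> upgrade(List<String> running, String newVersion,
                                Predicate<String> healthy) {
        // 1. Start as many new instances as are currently running.
        List<String> candidates = new ArrayList<>();
        for (int i = 0; i < running.size(); i++) {
            candidates.add(newVersion + "-" + i);
        }
        // 2. Check the health of every new instance.
        boolean allHealthy = candidates.stream().allMatch(healthy);
        // 3. Healthy: the old version is undeployed and the new one stays.
        //    Unhealthy: the new instances are undeployed and the old ones stay.
        return allHealthy ? candidates : new ArrayList<>(running);
    }

    public static void main(String[] args) {
        List<String> old = Arrays.asList("log-v1-0", "log-v1-1");
        System.out.println(upgrade(old, "log-v2", id -> true));
        System.out.println(upgrade(old, "log-v2", id -> false));
    }
}
```

Between steps 1 and 3 both versions run side by side, which is why the platform needs capacity for twice the usual number of instances during an upgrade.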
19. Stream DSL
This section covers additional features of the Stream DSL not covered in the Stream DSL introduction.
19.1. Tap a Stream
Taps can be created at various producer endpoints in a stream. See the Tapping a Stream section of the microsite for more information.
19.2. Using Labels in a Stream
When a stream is made up of multiple applications with the same name, they must be qualified with labels. See the Labeling Applications section of the microsite for more information.
19.3. Named Destinations
Instead of referencing a source or sink application, you can use a named destination. See the Named Destinations section of the microsite for more information.
19.4. Fan-in and Fan-out
By using named destinations, you can support fan-in and fan-out use cases. See the Fan-in and Fan-out section of the microsite for more information.
20. Stream Java DSL
Instead of using the shell to create and deploy streams, you can use the Java-based DSL provided by the spring-cloud-dataflow-rest-client
module.
See the Java DSL section of the microsite for more information.
21. Stream Applications with Multiple Binder Configurations
In some cases, a stream can have its applications bound to multiple Spring Cloud Stream binders when they are required to connect to different messaging middleware configurations. In those cases, you should make sure the applications are configured appropriately with their binder configurations. For example, a multi-binder transformer that supports both Kafka and Rabbit binders is the processor in the following stream:
http | multibindertransform --expression=payload.toUpperCase() | log
In the preceding example, you would write your own multibindertransform application.
In this stream, each application connects to messaging middleware in the following way:
-
The HTTP source sends events to RabbitMQ (rabbit1).
-
The Multi-Binder Transform processor receives events from RabbitMQ (rabbit1) and sends the processed events into Kafka (kafka1).
-
The log sink receives events from Kafka (kafka1).
Here, rabbit1
and kafka1
are the binder names given in the Spring Cloud Stream application properties.
Based on this setup, the applications have the following binders in their classpaths with the appropriate configuration:
-
HTTP: Rabbit binder
-
Transform: Both Kafka and Rabbit binders
-
Log: Kafka binder
The spring-cloud-stream
binder
configuration properties can be set within the applications themselves.
If not, they can be passed through deployment
properties when the stream is deployed:
dataflow:>stream create --definition "http | multibindertransform --expression=payload.toUpperCase() | log" --name mystream
dataflow:>stream deploy mystream --properties "app.http.spring.cloud.stream.bindings.output.binder=rabbit1,app.multibindertransform.spring.cloud.stream.bindings.input.binder=rabbit1,app.multibindertransform.spring.cloud.stream.bindings.output.binder=kafka1,app.log.spring.cloud.stream.bindings.input.binder=kafka1"
You can override any of the binder configuration properties by specifying them through deployment properties.
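Because the --properties value is a single comma-separated string, it can be easier to assemble it from a map than to type it by hand. The following is a minimal sketch of that idea in plain Java; the class and method names are hypothetical, and the helper does no escaping, so it assumes that neither keys nor values contain commas.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper that joins key=value pairs into one --properties argument.
final class DeploymentPropertiesSketch {

    static String toPropertiesArgument(Map<String, String> properties) {
        return properties.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the entries in insertion order.
        Map<String, String> props = new LinkedHashMap<>();
        props.put("app.http.spring.cloud.stream.bindings.output.binder", "rabbit1");
        props.put("app.multibindertransform.spring.cloud.stream.bindings.input.binder", "rabbit1");
        props.put("app.multibindertransform.spring.cloud.stream.bindings.output.binder", "kafka1");
        props.put("app.log.spring.cloud.stream.bindings.input.binder", "kafka1");

        System.out.println(toPropertiesArgument(props));
    }
}
```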
22. Function Composition
Function composition lets you attach a functional logic dynamically to an existing event streaming application. See the Function Composition section of the microsite for more details.
23. Functional Applications
With Spring Cloud Stream 3.x adding functional support, you can build Source, Sink, and Processor applications merely by implementing the java.util.function interfaces Supplier, Consumer, and Function, respectively.
See the Functional Application Recipe of the SCDF site for more about this feature.
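To make the mapping concrete, the following sketch shows the three roles as plain java.util.function types, wired together by hand. It deliberately contains no Spring code: in a real application these would typically be exposed as beans and connected by the binder rather than called directly, and the class and method names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Plain-Java illustration of the three functional roles; no Spring wiring.
final class FunctionalAppsSketch {

    // Source-like: produces a payload each time it is polled.
    static Supplier<String> source() {
        return () -> "hello";
    }

    // Processor-like: transforms an incoming payload into an outgoing one.
    static Function<String, String> processor() {
        return payload -> payload.toUpperCase();
    }

    // Sink-like: consumes a payload; here it only records it in a list.
    static Consumer<String> sink(List<String> received) {
        return received::add;
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        // Hand-wired equivalent of "source | processor | sink" for a single message.
        sink(received).accept(processor().apply(source().get()));
        System.out.println(received); // prints [HELLO]
    }
}
```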
24. Examples
This chapter includes the following examples:
-
Simple Stream Processing
-
Stateful Stream Processing
-
Other Source and Sink Application Types
You can find links to more samples in the “[dataflow-samples]” chapter.
24.1. Simple Stream Processing
As an example of a simple processing step, we can transform the payload of the HTTP-posted data to upper case by using the following stream definition:
http | transform --expression=payload.toUpperCase() | log
To create this stream, enter the following command in the shell:
dataflow:> stream create --definition "http --server.port=9000 | transform --expression=payload.toUpperCase() | log" --name mystream --deploy
The following example uses a shell command to post some data:
dataflow:> http post --target http://localhost:9000 --data "hello"
The preceding example results in an upper-case HELLO
in the log, as follows:
2016-06-01 09:54:37.749 INFO 80083 --- [ kafka-binder-] log.sink : HELLO
24.2. Stateful Stream Processing
To demonstrate the data partitioning functionality, the following listing deploys a stream with Kafka as the binder:
dataflow:>stream create --name words --definition "http --server.port=9900 | splitter --expression=payload.split(' ') | log"
Created new stream 'words'
dataflow:>stream deploy words --properties "app.splitter.producer.partitionKeyExpression=payload,deployer.log.count=2"
Deployed stream 'words'
dataflow:>http post --target http://localhost:9900 --data "How much wood would a woodchuck chuck if a woodchuck could chuck wood"
> POST (text/plain;Charset=UTF-8) http://localhost:9900 How much wood would a woodchuck chuck if a woodchuck could chuck wood
> 202 ACCEPTED
dataflow:>runtime apps
╔════════════════════╤═══════════╤═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╗
║App Id / Instance Id│Unit Status│ No. of Instances / Attributes ║
╠════════════════════╪═══════════╪═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╣
║words.log-v1 │ deployed │ 2 ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║ │ │ guid = 24166 ║
║ │ │ pid = 33097 ║
║ │ │ port = 24166 ║
║words.log-v1-0 │ deployed │ stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1/stderr_0.log ║
║ │ │ stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1/stdout_0.log ║
║ │ │ url = https://192.168.0.102:24166 ║
║ │ │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1 ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║ │ │ guid = 41269 ║
║ │ │ pid = 33098 ║
║ │ │ port = 41269 ║
║words.log-v1-1 │ deployed │ stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1/stderr_1.log ║
║ │ │ stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1/stdout_1.log ║
║ │ │ url = https://192.168.0.102:41269 ║
║ │ │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461063/words.log-v1 ║
╟────────────────────┼───────────┼───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║words.http-v1 │ deployed │ 1 ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║ │ │ guid = 9900 ║
║ │ │ pid = 33094 ║
║ │ │ port = 9900 ║
║words.http-v1-0 │ deployed │ stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461054/words.http-v1/stderr_0.log ║
║ │ │ stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461054/words.http-v1/stdout_0.log ║
║ │ │ url = https://192.168.0.102:9900 ║
║ │ │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803461054/words.http-v1 ║
╟────────────────────┼───────────┼───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║words.splitter-v1 │ deployed │ 1 ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║ │ │ guid = 33963 ║
║ │ │ pid = 33093 ║
║ │ │ port = 33963 ║
║words.splitter-v1-0 │ deployed │ stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803437542/words.splitter-v1/stderr_0.log║
║ │ │ stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803437542/words.splitter-v1/stdout_0.log║
║ │ │ url = https://192.168.0.102:33963 ║
║ │ │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/words-1542803437542/words.splitter-v1 ║
╚════════════════════╧═══════════╧═══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝
When you review the words.log-v1-0
logs, you should see the following:
2016-06-05 18:35:47.047 INFO 58638 --- [ kafka-binder-] log.sink : How
2016-06-05 18:35:47.066 INFO 58638 --- [ kafka-binder-] log.sink : chuck
2016-06-05 18:35:47.066 INFO 58638 --- [ kafka-binder-] log.sink : chuck
When you review the words.log-v1-1
logs, you should see the following:
2016-06-05 18:35:47.047 INFO 58639 --- [ kafka-binder-] log.sink : much
2016-06-05 18:35:47.066 INFO 58639 --- [ kafka-binder-] log.sink : wood
2016-06-05 18:35:47.066 INFO 58639 --- [ kafka-binder-] log.sink : would
2016-06-05 18:35:47.066 INFO 58639 --- [ kafka-binder-] log.sink : a
2016-06-05 18:35:47.066 INFO 58639 --- [ kafka-binder-] log.sink : woodchuck
2016-06-05 18:35:47.067 INFO 58639 --- [ kafka-binder-] log.sink : if
2016-06-05 18:35:47.067 INFO 58639 --- [ kafka-binder-] log.sink : a
2016-06-05 18:35:47.067 INFO 58639 --- [ kafka-binder-] log.sink : woodchuck
2016-06-05 18:35:47.067 INFO 58639 --- [ kafka-binder-] log.sink : could
2016-06-05 18:35:47.067 INFO 58639 --- [ kafka-binder-] log.sink : wood
This example has shown that payload splits that contain the same word are routed to the same application instance.
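The routing behavior can be pictured with a hash-modulo selector, sketched below. This is an assumption-laden illustration rather than the binder's actual code: the class and method names are hypothetical, and the real partition assignment depends on the binder and its configuration, so the indices this sketch prints need not match the instances shown in the logs above.

```java
// Hypothetical hash-modulo partition selector: equal keys always map to the same index.
final class PartitionSketch {

    static int partitionFor(String key, int partitionCount) {
        // The remainder lies strictly between -partitionCount and partitionCount,
        // so taking the absolute value yields an index in [0, partitionCount).
        return Math.abs(key.hashCode() % partitionCount);
    }

    public static void main(String[] args) {
        String sentence = "How much wood would a woodchuck chuck if a woodchuck could chuck wood";
        int instances = 2; // corresponds to deployer.log.count=2 in the example
        for (String word : sentence.split(" ")) {
            System.out.println(word + " -> instance " + partitionFor(word, instances));
        }
    }
}
```

Whatever the concrete index turns out to be, every occurrence of a given word yields the same key and therefore the same instance, which is the property that stateful processing relies on.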
24.3. Other Source and Sink Application Types
This example shows something a bit more complicated: swapping out the time
source for something else. Another supported source type is http
, which accepts data for ingestion over HTTP POST requests. Note that the http
source accepts data on a different port from the Data Flow Server (default 8080). By default, the port is randomly assigned.
To create a stream that uses an http
source but still uses the same log
sink, we would change the original command in the Simple Stream Processing example to the following:
dataflow:> stream create --definition "http | log" --name myhttpstream --deploy
Note that, this time, we do not see any other output until we actually post some data (by using a shell command). To see the randomly assigned port on which the http
source is listening, run the following command:
dataflow:>runtime apps
╔══════════════════════╤═══════════╤═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╗
║ App Id / Instance Id │Unit Status│ No. of Instances / Attributes ║
╠══════════════════════╪═══════════╪═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╣
║myhttpstream.log-v1 │ deploying │ 1 ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║ │ │ guid = 39628 ║
║ │ │ pid = 34403 ║
║ │ │ port = 39628 ║
║myhttpstream.log-v1-0 │ deploying │ stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803867070/myhttpstream.log-v1/stderr_0.log ║
║ │ │ stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803867070/myhttpstream.log-v1/stdout_0.log ║
║ │ │ url = https://192.168.0.102:39628 ║
║ │ │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803867070/myhttpstream.log-v1 ║
╟──────────────────────┼───────────┼─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║myhttpstream.http-v1 │ deploying │ 1 ║
╟┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┼┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈┈╢
║ │ │ guid = 52143 ║
║ │ │ pid = 34401 ║
║ │ │ port = 52143 ║
║myhttpstream.http-v1-0│ deploying │ stderr = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803866800/myhttpstream.http-v1/stderr_0.log║
║ │ │ stdout = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803866800/myhttpstream.http-v1/stdout_0.log║
║ │ │ url = https://192.168.0.102:52143 ║
║ │ │working.dir = /var/folders/js/7b_pn0t575l790x7j61slyxc0000gn/T/spring-cloud-deployer-6467595568759190742/myhttpstream-1542803866800/myhttpstream.http-v1 ║
╚══════════════════════╧═══════════╧═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝
You should see that the corresponding http
source has a url
property that contains the host and port information on which it is listening. You are now ready to post to that URL, as shown in the following example (replace port 1234 with the port reported for your http source):
dataflow:> http post --target http://localhost:1234 --data "hello"
dataflow:> http post --target http://localhost:1234 --data "goodbye"
The stream then funnels the data from the http
source to the output log implemented by the log
sink, yielding output similar to the following:
2016-06-01 09:50:22.121 INFO 79654 --- [ kafka-binder-] log.sink : hello
2016-06-01 09:50:26.810 INFO 79654 --- [ kafka-binder-] log.sink : goodbye
We could also change the sink implementation. You could pipe the output to a file (file
), to Hadoop (hdfs
), or to any of the other sink applications that are available. You can also define your own applications.