6. Spring Cloud Stream Integration
A task by itself can be useful, but integration of a task into a larger ecosystem lets it be useful for more complex processing and orchestration. This section covers the integration options for Spring Cloud Task with Spring Cloud Stream.
6.1. Launching a Task from a Spring Cloud Stream
You can launch tasks from a stream. To do so, create a sink that listens for a message
that contains a TaskLaunchRequest
as its payload. The TaskLaunchRequest
contains:
-
uri
: To the task artifact that is to be executed. -
applicationName
: The name that is associated with the task. If no applicationName is set, theTaskLaunchRequest
generates a task name comprised of the following:Task-<UUID>
. -
commandLineArguments
: A list containing the command line arguments for the task. -
environmentProperties
: A map containing the environment variables to be used by the task. -
deploymentProperties
: A map containing the properties that are used by the deployer to deploy the task.
If the payload is of a different type, the sink throws an exception. |
For example, a stream can be created that has a processor that takes in data from an
HTTP source and creates a GenericMessage
that contains the TaskLaunchRequest
and sends
the message to its output channel. The task sink would then receive the message from its
input channnel and then launch the task.
To create a taskSink, you need only create a Spring Boot application that includes the
EnableTaskLauncher
annotation, as shown in the following example:
@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
public static void main(String[] args) {
SpringApplication.run(TaskSinkApplication.class, args);
}
}
The samples
module of the Spring Cloud Task project contains a sample Sink and Processor. To install
these samples into your local maven repository, run a maven build from the
spring-cloud-task-samples
directory with the skipInstall
property set to false
, as
shown in the following example:
mvn clean install
The maven.remoteRepositories.springRepo.url property must be set to the location
of the remote repository in which the über-jar is located. If not set, there is no remote
repository, so it relies upon the local repository only.
|
6.1.1. Spring Cloud Data Flow
To create a stream in Spring Cloud Data Flow, you must first register the Task Sink Application we created. In the following example, we are registering the Processor and Sink sample applications by using the Spring Cloud Data Flow shell:
app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>
The following example shows how to create a stream from the Spring Cloud Data Flow shell:
stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy
6.2. Spring Cloud Task Events
Spring Cloud Task provides the ability to emit events through a Spring Cloud Stream
channel when the task is run through a Spring Cloud Stream channel. A task listener is
used to publish the TaskExecution
on a message channel named task-events
. This feature
is autowired into any task that has spring-cloud-stream
, spring-cloud-stream-<binder>
,
and a defined task on its classpath.
To disable the event emitting listener, set the spring.cloud.task.events.enabled
property to false .
|
With the appropriate classpath defined, the following task emits the TaskExecution
as an
event on the task-events
channel (at both the start and the end of the task):
@SpringBootApplication
public class TaskEventsApplication {
public static void main(String[] args) {
SpringApplication.run(TaskEventsApplication.class, args);
}
@Configuration
public static class TaskConfiguration {
@Bean
public CommandLineRunner commandLineRunner() {
return new CommandLineRunner() {
@Override
public void run(String... args) throws Exception {
System.out.println("The CommandLineRunner was executed");
}
};
}
}
}
A binder implementation is also required to be on the classpath. |
A sample task event application can be found in the samples module of the Spring Cloud Task Project, here. |
6.2.1. Disabling Specific Task Events
To disable task events, you can set the spring.cloud.task.events.enabled
property to
false
.
6.3. Spring Batch Events
When executing a Spring Batch job through a task, Spring Cloud Task can be configured to emit informational messages based on the Spring Batch listeners available in Spring Batch. Specifically, the following Spring Batch listeners are autoconfigured into each batch job and emit messages on the associated Spring Cloud Stream channels when run through Spring Cloud Task:
-
JobExecutionListener
listens forjob-execution-events
-
StepExecutionListener
listens forstep-execution-events
-
ChunkListener
listens forchunk-events
-
ItemReadListener
listens foritem-read-events
-
ItemProcessListener
listens foritem-process-events
-
ItemWriteListener
listens foritem-write-events
-
SkipListener
listens forskip-events
These listeners are autoconfigured into any AbstractJob
when the appropriate
beans (a Job
and a TaskLifecycleListener
) exist in the context. Configuration to
listen to these events is handled the same way binding to any other Spring
Cloud Stream channel is done. Our task (the one running the batch job) serves as a
Source
, with the listening applications serving as either a Processor
or a Sink
.
An example could be to have an application listening to the job-execution-events
channel
for the start and stop of a job. To configure the listening application, you would
configure the input to be job-execution-events
as follows:
spring.cloud.stream.bindings.input.destination=job-execution-events
A binder implementation is also required to be on the classpath. |
A sample batch event application can be found in the samples module of the Spring Cloud Task Project, here. |
6.3.1. Sending Batch Events to Different Channels
One of the options that Spring Cloud Task offers for batch events is the ability to alter
the channel to which a specific listener can emit its messages. To do so, use the
following configuration:
spring.cloud.stream.bindings.<the channel>.destination=<new destination>
. For example,
if StepExecutionListener
needs to emit its messages to another channel called
my-step-execution-events
instead of the default step-execution-events
, you can add the
following configuration:
spring.cloud.stream.bindings.step-execution-events.destination=my-step-execution-events
6.3.2. Disabling Batch Events
To disable the listener functionality for all batch events, use the following configuration:
spring.cloud.task.batch.events.enabled=false
To disable a specific batch event, use the following configuration:
spring.cloud.task.batch.events.<batch event listener>.enabled=false
:
The following listing shows individual listeners that you can disable:
spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false
6.3.3. Emit Order for Batch Events
By default, batch events have Ordered.LOWEST_PRECEDENCE
. To change this value (for
example, to 5 ), use the following configuration:
spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5