6. Spring Cloud Stream Integration

A task by itself can be useful, but integrating a task into a larger ecosystem lets it support more complex processing and orchestration. This section covers the integration options for Spring Cloud Task with Spring Cloud Stream.

6.1. Launching a Task from a Spring Cloud Stream

You can launch tasks from a stream. To do so, create a sink that listens for a message that contains a TaskLaunchRequest as its payload. The TaskLaunchRequest contains:

  • uri: The URI of the task artifact that is to be executed.

  • applicationName: The name that is associated with the task. If no applicationName is set, the TaskLaunchRequest generates a task name of the form Task-<UUID>.

  • commandLineArguments: A list containing the command line arguments for the task.

  • environmentProperties: A map containing the environment variables to be used by the task.

  • deploymentProperties: A map containing the properties that are used by the deployer to deploy the task.

If the payload is of a different type, the sink throws an exception.
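
The fields listed above can be pictured with a small stand-in type. The record below is a hypothetical model written for illustration only; it is not the TaskLaunchRequest class that ships with Spring Cloud Task, and the name LaunchRequestSketch, the validation of uri, and the defensive copies are assumptions. It shows the documented defaulting rule: when no applicationName is supplied, a name of the form Task-<UUID> is generated.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in that mirrors the fields of a task launch request.
// It is NOT the library's TaskLaunchRequest class.
record LaunchRequestSketch(
        String uri,
        String applicationName,
        List<String> commandLineArguments,
        Map<String, String> environmentProperties,
        Map<String, String> deploymentProperties) {

    // Compact constructor: validate the URI, apply defaults, copy collections.
    LaunchRequestSketch {
        if (uri == null || uri.isBlank()) {
            throw new IllegalArgumentException("uri must not be empty");
        }
        if (applicationName == null || applicationName.isBlank()) {
            // Documented default: Task-<UUID>
            applicationName = "Task-" + UUID.randomUUID();
        }
        commandLineArguments =
                commandLineArguments == null ? List.of() : List.copyOf(commandLineArguments);
        environmentProperties =
                environmentProperties == null ? Map.of() : Map.copyOf(environmentProperties);
        deploymentProperties =
                deploymentProperties == null ? Map.of() : Map.copyOf(deploymentProperties);
    }
}
```

For example, new LaunchRequestSketch("maven://com.example:my-task:1.0.0", null, null, null, null) (the artifact coordinates are made up) yields a request whose applicationName starts with Task-.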

For example, you can create a stream with a processor that takes in data from an HTTP source, creates a GenericMessage that contains the TaskLaunchRequest, and sends the message to its output channel. The task sink then receives the message from its input channel and launches the task.
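
The flow described above can be sketched in plain Java without any messaging infrastructure. Everything in this block is an assumption made for illustration: the LaunchRequest record, the PROCESSOR function, and the sink method are hypothetical names, and the real processor and sink exchange messages over Spring Cloud Stream channels rather than direct method calls. The sketch shows only the two behaviors the text describes: the processor turns incoming data into a launch request, and the sink rejects any payload of a different type.

```java
import java.util.function.Function;

public class TaskStreamSketch {

    // Hypothetical, simplified payload type standing in for a launch request.
    record LaunchRequest(String uri) {}

    // Processor step: turns the body received from the HTTP source into a request.
    static final Function<String, Object> PROCESSOR =
            body -> new LaunchRequest(body.trim());

    // Sink step: accepts only launch requests and rejects any other payload type.
    static String sink(Object payload) {
        if (!(payload instanceof LaunchRequest request)) {
            String type = payload == null ? "null" : payload.getClass().getName();
            throw new IllegalArgumentException("Expected a launch request but got " + type);
        }
        // A real sink would hand the request to a task launcher here.
        return "launching " + request.uri();
    }

    public static void main(String[] args) {
        // The artifact coordinates below are made up for the example.
        Object message = PROCESSOR.apply(" maven://com.example:my-task:1.0.0 ");
        System.out.println(sink(message));
    }
}
```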

To create a task sink, you need only create a Spring Boot application that includes the @EnableTaskLauncher annotation, as shown in the following example:

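import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.task.launcher.annotation.EnableTaskLauncher;

@SpringBootApplication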
@EnableTaskLauncher
public class TaskSinkApplication {
    public static void main(String[] args) {
        SpringApplication.run(TaskSinkApplication.class, args);
    }
}

The samples module of the Spring Cloud Task project contains a sample Sink and Processor. To install these samples into your local Maven repository, run a Maven build from the spring-cloud-task-samples directory with the skipInstall property set to false, as shown in the following example:

mvn clean install -DskipInstall=false

The maven.remoteRepositories.springRepo.url property must be set to the location of the remote repository in which the über-jar is located. If it is not set, no remote repository is used, and artifact resolution relies on the local repository only.

6.1.1. Spring Cloud Data Flow

To create a stream in Spring Cloud Data Flow, you must first register the Task Sink Application we created. In the following example, we are registering the Processor and Sink sample applications by using the Spring Cloud Data Flow shell:

app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven://io.spring.cloud:taskprocessor:<version>

The following example shows how to create a stream from the Spring Cloud Data Flow shell:

stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy
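
Once the stream is deployed, data posted to the http source on port 9000 flows through taskProcessor to taskSink. The following sketch builds such a POST request with the JDK's java.net.http API. It is a minimal illustration under stated assumptions: the host name, the text/plain content type, and the request body are made up, and the body format that the sample processor actually expects is not specified in this section.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class HttpSourceRequestSketch {

    // Builds (but does not send) a POST request aimed at the http source,
    // which the stream definition above binds to port 9000.
    static HttpRequest buildRequest(String host, String body) {
        return HttpRequest.newBuilder(URI.create("http://" + host + ":9000"))
                .header("Content-Type", "text/plain")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        // Host and payload are placeholders for this example.
        HttpRequest request = buildRequest("localhost", "example payload");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

To actually send the request, pass it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.discarding()) while the stream is running.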

6.2. Spring Cloud Task Events

Spring Cloud Task can emit events through a Spring Cloud Stream channel when a task runs. A task listener publishes the TaskExecution on a message channel named task-events. This feature is autowired into any task that has spring-cloud-stream, spring-cloud-stream-<binder>, and a defined task on its classpath.

To disable the event emitting listener, set the spring.cloud.task.events.enabled property to false.

With the appropriate classpath defined, the following task emits the TaskExecution as an event on the task-events channel (at both the start and the end of the task):

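import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@SpringBootApplication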
public class TaskEventsApplication {

    public static void main(String[] args) {
        SpringApplication.run(TaskEventsApplication.class, args);
    }

    @Configuration
    public static class TaskConfiguration {

        @Bean
        public CommandLineRunner commandLineRunner() {
            return new CommandLineRunner() {
                @Override
                public void run(String... args) throws Exception {
                    System.out.println("The CommandLineRunner was executed");
                }
            };
        }
    }
}

A binder implementation must also be on the classpath.

A sample task event application can be found in the samples module of the Spring Cloud Task project.
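
To make the "start and end" behavior concrete, the following plain-Java sketch models a task run that records one event before the task body executes and one after it. This is a simplified model under stated assumptions, not the framework's listener: the TaskEvent record, its fields, and the in-memory list standing in for the task-events channel are all hypothetical, and the real TaskExecution carries more information.

```java
import java.util.ArrayList;
import java.util.List;

public class TaskEventSketch {

    // Hypothetical, trimmed-down event; not the library's TaskExecution.
    record TaskEvent(String taskName, boolean finished, Integer exitCode) {}

    // Runs the task body and records one event before it and one after it.
    static List<TaskEvent> runWithEvents(String taskName, Runnable taskBody) {
        List<TaskEvent> channel = new ArrayList<>(); // stands in for task-events
        channel.add(new TaskEvent(taskName, false, null)); // start event
        int exitCode = 0;
        try {
            taskBody.run();
        } catch (RuntimeException e) {
            exitCode = 1; // a failed body is reported in the end event
        }
        channel.add(new TaskEvent(taskName, true, exitCode)); // end event
        return channel;
    }

    public static void main(String[] args) {
        runWithEvents("demo", () -> System.out.println("The CommandLineRunner was executed"))
                .forEach(System.out::println);
    }
}
```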

6.2.1. Disabling Specific Task Events

To disable task events, you can set the spring.cloud.task.events.enabled property to false.

6.3. Spring Batch Events

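When a Spring Batch job runs through a task, Spring Cloud Task can be configured to emit informational messages based on the listeners available in Spring Batch. Specifically, the following Spring Batch listeners are autoconfigured into each batch job and emit messages on the associated Spring Cloud Stream channels when run through Spring Cloud Task:

  • JobExecutionListener: Emits on job-execution-events.

  • StepExecutionListener: Emits on step-execution-events.

  • ChunkListener: Emits on chunk-events.

  • ItemReadListener: Emits on item-read-events.

  • ItemProcessListener: Emits on item-process-events.

  • ItemWriteListener: Emits on item-write-events.

  • SkipListener: Emits on skip-events.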

These listeners are autoconfigured into any AbstractJob when the appropriate beans (a Job and a TaskLifecycleListener) exist in the context. Listening to these events is configured the same way as binding to any other Spring Cloud Stream channel. The task (the one running the batch job) serves as a Source, with the listening applications serving as either a Processor or a Sink.

For example, an application could listen to the job-execution-events channel for the start and stop of a job. To configure the listening application, set its input destination to job-execution-events, as follows:

spring.cloud.stream.bindings.input.destination=job-execution-events

A binder implementation must also be on the classpath.

A sample batch event application can be found in the samples module of the Spring Cloud Task project.

6.3.1. Sending Batch Events to Different Channels

One of the options that Spring Cloud Task offers for batch events is the ability to alter the channel to which a specific listener can emit its messages. To do so, use the following configuration: spring.cloud.stream.bindings.<the channel>.destination=<new destination>. For example, if StepExecutionListener needs to emit its messages to another channel called my-step-execution-events instead of the default step-execution-events, you can add the following configuration:

spring.cloud.stream.bindings.step-execution-events.destination=my-step-execution-events

6.3.2. Disabling Batch Events

To disable the listener functionality for all batch events, use the following configuration:

spring.cloud.task.batch.events.enabled=false

To disable a specific batch event, use the following configuration:

spring.cloud.task.batch.events.<batch event listener>.enabled=false

The following listing shows individual listeners that you can disable:

spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false

6.3.3. Emit Order for Batch Events

By default, batch events have Ordered.LOWEST_PRECEDENCE. To change this value (for example, to 5), use the following configuration:

spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5
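
The property names used for disabling and ordering batch events follow a regular pattern, so they can be generated rather than typed by hand. The helper below is a hypothetical convenience written for this section, not part of Spring Cloud Task; the class and method names are assumptions. It builds the same keys that appear in the listings of sections 6.3.2 and 6.3.3, which an application could then supply through any of the usual property sources.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BatchEventProperties {

    static final String PREFIX = "spring.cloud.task.batch.events.";

    // Listener keys as they appear in the property listings of this section.
    static final List<String> LISTENERS = List.of(
            "job-execution", "step-execution", "chunk",
            "item-read", "item-process", "item-write", "skip");

    // Produces "<prefix><listener>.enabled=false" for each named listener.
    static Map<String, String> disable(String... listeners) {
        Map<String, String> properties = new LinkedHashMap<>();
        for (String listener : listeners) {
            if (!LISTENERS.contains(listener)) {
                throw new IllegalArgumentException("Unknown batch event listener: " + listener);
            }
            properties.put(PREFIX + listener + ".enabled", "false");
        }
        return properties;
    }

    // Produces "<prefix><listener>-order=<order>" for every listener.
    static Map<String, String> orderAll(int order) {
        Map<String, String> properties = new LinkedHashMap<>();
        for (String listener : LISTENERS) {
            properties.put(PREFIX + listener + "-order", Integer.toString(order));
        }
        return properties;
    }

    public static void main(String[] args) {
        disable("chunk", "skip").forEach((k, v) -> System.out.println(k + "=" + v));
        orderAll(5).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```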