6. Spring Cloud Stream 集成
任务本身可能很有用,但将任务集成到更大的生态系统中可以让它 对于更复杂的处理和编排很有用。本节 介绍了 Spring Cloud Task 与 Spring Cloud Stream 的集成选项。
6.1. 从 Spring Cloud Stream 启动 Task
您可以从流启动任务。为此,请创建一个侦听消息的 sink
,其中包含 a 作为其有效负载。包含:TaskLaunchRequest
TaskLaunchRequest
-
uri
:到要执行的任务工件。 -
applicationName
:与任务关联的名称。如果没有 applicationName 时,会生成一个任务名称 由以下内容组成:.TaskLaunchRequest
Task-<UUID>
-
commandLineArguments
:包含任务的命令行参数的列表。 -
environmentProperties
:包含要使用的环境变量的映射 任务。 -
deploymentProperties
:包含部署程序用于 部署任务。
如果有效负载是不同的类型,则 sink 会引发异常。 |
例如,可以创建一个流,该流具有一个处理器,该处理器从
HTTP 源并创建一个包含 和 发送
将消息发送到其 output 通道。然后,任务接收器将从其
输入 channnel,然后启动任务。GenericMessage
TaskLaunchRequest
要创建taskSink,你只需要创建一个包含注释的 Spring Boot 应用程序,如以下示例所示:EnableTaskLauncher
@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
public static void main(String[] args) {
SpringApplication.run(TaskSinkApplication.class, args);
}
}
样本
模块中包含一个示例 Sink 和 Processor。安装
这些示例复制到本地 Maven 存储库中,从属性设置为 的目录运行 Maven 构建,作为
如以下示例所示:spring-cloud-task-samples
skipInstall
false
mvn clean install
该属性必须设置为位置
的 Über-jar 所在的远程仓库。如果未设置,则没有远程
存储库,因此它仅依赖于本地存储库。maven.remoteRepositories.springRepo.url |
6.1.1. Spring Cloud 数据流
要在 Spring Cloud Data Flow 中创建流,必须先注册 Task Sink 我们创建的应用程序。在以下示例中,我们将注册 Processor 和 使用 Spring Cloud Data Flow shell 接收示例应用程序:
app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>
以下示例展示了如何从 Spring Cloud Data Flow shell 创建流:
stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy
6.2. Spring Cloud 任务事件
Spring Cloud Task 提供了通过 Spring Cloud Stream 发出事件的能力
channel。任务侦听器是
用于在名为 的消息渠道上发布 。此功能
自动连接到具有 、 、
以及其 Classpath 上的已定义任务。TaskExecution
task-events
spring-cloud-stream
spring-cloud-stream-<binder>
要禁用事件发出侦听器,请将该属性设置为 。spring.cloud.task.events.enabled false |
定义适当的类路径后,以下任务将
事件(在任务的开始和结束时):TaskExecution
task-events
@SpringBootApplication
public class TaskEventsApplication {
public static void main(String[] args) {
SpringApplication.run(TaskEventsApplication.class, args);
}
@Configuration
public static class TaskConfiguration {
@Bean
public CommandLineRunner commandLineRunner() {
return new CommandLineRunner() {
@Override
public void run(String... args) throws Exception {
System.out.println("The CommandLineRunner was executed");
}
};
}
}
}
还需要 Binder 实现位于 Classpath 上。 |
可以在 samples 模块中找到一个示例任务事件应用程序 Spring Cloud Task Project 的 Spring Cloud 任务项目,这里。 |
6.2.1. 禁用特定任务事件
要禁用任务事件,可以将该属性设置为 。spring.cloud.task.events.enabled
false
6.3. Spring Batch 事件
通过任务执行 Spring Batch 作业时,可以将 Spring Cloud 任务配置为 根据 Spring Batch 中提供的 Spring Batch 侦听器发出信息性消息。 具体来说,以下 Spring Batch 侦听器会自动配置到每个批处理作业中 并在通过 Spring 运行时在关联的 Spring Cloud Stream 通道上发出消息 云任务:
-
JobExecutionListener
监听job-execution-events
-
StepExecutionListener
监听step-execution-events
-
ChunkListener
监听chunk-events
-
ItemReadListener
监听item-read-events
-
ItemProcessListener
监听item-process-events
-
ItemWriteListener
监听item-write-events
-
SkipListener
监听skip-events
这些侦听器会在适当的时候自动配置为 any
bean (a 和 a ) 存在于上下文中。configuration 设置为
listen to these events 的处理方式与绑定到任何其他 Spring 的方式相同
Cloud Stream 频道已完成。我们的任务(运行批处理作业的任务)充当 ,侦听应用程序充当 a 或 .AbstractJob
Job
TaskLifecycleListener
Source
Processor
Sink
例如,让应用程序侦听通道
用于作业的启动和停止。要配置侦听应用程序,您可以
将 Input (输入) 配置为如下:job-execution-events
job-execution-events
spring.cloud.stream.bindings.input.destination=job-execution-events
还需要 Binder 实现位于 Classpath 上。 |
可以在 samples 模块中找到一个示例批处理事件应用程序 Spring Cloud Task Project 的 Spring Cloud 任务项目,这里。 |
6.3.1. 将 Batch 事件发送到不同的频道
Spring Cloud Task 为批处理事件提供的选项之一是能够更改
特定侦听器可以向其发送消息的通道。为此,请使用
以下配置: 。例如
如果需要将其消息发送到另一个名为 而不是 default 的通道,则可以添加
以下配置:spring.cloud.stream.bindings.<the channel>.destination=<new destination>
StepExecutionListener
my-step-execution-events
step-execution-events
spring.cloud.stream.bindings.step-execution-events.destination=my-step-execution-events
6.3.2. 禁用批处理事件
要禁用所有批处理事件的侦听器功能,请使用以下命令 配置:
spring.cloud.task.batch.events.enabled=false
要禁用特定批处理事件,请使用以下配置:
spring.cloud.task.batch.events.<batch event listener>.enabled=false
:
以下清单显示了您可以禁用的各个侦听器:
spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false
6.3.3. 批量事件的发出顺序
默认情况下,批处理事件具有 .要更改此值(对于
example,设置为 5 ),使用以下配置:Ordered.LOWEST_PRECEDENCE
spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5