通过消息启动 Batch 作业

使用核心 Spring Batch API 启动批处理作业时,您可以 基本上有两个选项:spring-doc.cn

  • 在命令行中,使用CommandLineJobRunnerspring-doc.cn

  • 以编程方式,使用 或JobOperator.start()JobLauncher.run()spring-doc.cn

例如,您可能希望在 when incalling batch jobs by 使用 shell 脚本。或者,您可以直接使用 (例如,使用 Spring Batch 作为 Web 应用程序的一部分)。但是,那又如何呢 更复杂的用例?也许您需要轮询远程 (S)FTP server 检索 Batch Job 或应用程序的数据 必须同时支持多个不同的数据源。为 例如,您不仅可以从 Web 接收数据文件,还可以从 FTP 和其他来源。也许输入文件的其他转换是 在调用 Spring Batch 之前需要。CommandLineJobRunnerJobOperatorspring-doc.cn

因此,执行批处理作业会更强大 通过使用 Spring Integration 及其众多适配器。例如 您可以使用 File Inbound Channel Adapter 来 监视文件系统中的目录,并以 一旦输入文件到达。此外,您还可以创建 Spring 使用多个不同适配器的集成流 从多个来源摄取批处理作业的数据 同时仅使用配置。实现所有这些 Spring Integration 的场景很容易,因为它允许 解耦、事件驱动的 .JobLauncherspring-doc.cn

Spring Batch 集成提供了您可以 用于启动批处理作业。的输入由 Spring Integration 消息,该消息具有 类型为 的有效负载。这个类是 to be launch 和 that are 的包装器 启动 Batch 作业所必需的。JobLaunchingMessageHandlerJobLaunchingMessageHandlerJobLaunchRequestJobJobParametersspring-doc.cn

下图显示了典型的 Spring 集成 启动 Batch 作业所需的消息流。EIP (Enterprise Integration Patterns) 网站提供了消息传送图标及其描述的完整概述。spring-doc.cn

启动 Batch Job
图 1.启动 Batch Job

将文件转换为 JobLaunchRequest

以下示例将文件转换为 :JobLaunchRequestspring-doc.cn

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;

import java.io.File;

public class FileMessageToJobRequest {
    private Job job;
    private String fileParameterName;

    public void setFileParameterName(String fileParameterName) {
        this.fileParameterName = fileParameterName;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder =
            new JobParametersBuilder();

        jobParametersBuilder.addString(fileParameterName,
            message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }
}

JobExecution 响应

执行批处理作业时,将返回一个实例。您可以使用这个 instance 来确定执行的状态。如果 a 可以创建 successful,则始终返回它,无论 或者实际执行不成功。JobExecutionJobExecutionspring-doc.cn

如何返回实例的确切行为取决于提供的 .如果使用(单线程)实现,则仅在作业完成时返回响应。使用 时,将返回实例 马上。然后,您可以获取 of 实例 (使用 ) 并查询作业的更新状态 使用 .了解更多 信息,请参阅 查询存储库JobExecutionTaskExecutorsynchronousTaskExecutorJobExecutionafterasynchronousTaskExecutorJobExecutionidJobExecutionJobExecution.getJobId()JobRepositoryJobExplorerspring-doc.cn

Spring Batch 集成配置

考虑某人需要创建一个文件来侦听的情况 对于所提供目录中的 CSV 文件,请将它们交给转换器 (),通过任务启动网关启动任务,以及 使用 记录 的输出。inbound-channel-adapterFileMessageToJobRequestJobExecutionlogging-channel-adapterspring-doc.cn

以下示例显示了如何在 Java 中配置该常见情况:spring-doc.cn

Java 配置
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
    FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
    fileMessageToJobRequest.setFileParameterName("input.file.name");
    fileMessageToJobRequest.setJob(personJob());
    return fileMessageToJobRequest;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);

    return jobLaunchingGateway;
}

@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
    return IntegrationFlow.from(Files.inboundAdapter(new File("/tmp/myfiles")).
                    filter(new SimplePatternFileListFilter("*.csv")),
            c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
            transform(fileMessageToJobRequest()).
            handle(jobLaunchingGateway).
            log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
            get();
}

以下示例显示了如何在 XML 中配置该常见情况:spring-doc.cn

XML 配置
<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>

<int-file:inbound-channel-adapter id="filePoller"
    channel="inboundFileChannel"
    directory="file:/tmp/myfiles/"
    filename-pattern="*.csv">
  <int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>

<int:transformer input-channel="inboundFileChannel"
    output-channel="outboundJobRequestChannel">
  <bean class="io.spring.sbi.FileMessageToJobRequest">
    <property name="job" ref="personJob"/>
    <property name="fileParameterName" value="input.file.name"/>
  </bean>
</int:transformer>

<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
    reply-channel="jobLaunchReplyChannel"/>

<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>

ItemReader 配置示例

现在我们正在轮询文件并启动作业,我们需要配置我们的 Spring 批处理(例如),以使用在作业定义的位置找到的文件 参数命名为 “input.file.name”,如下面的 bean 配置所示:ItemReaderspring-doc.cn

下面的 Java 示例显示了必要的 Bean 配置:spring-doc.cn

Java 配置
@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
    FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
    flatFileItemReader.setResource(new FileSystemResource(resource));
...
    return flatFileItemReader;
}

下面的 XML 示例显示了必要的 Bean 配置:spring-doc.cn

XML 配置
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader"
    scope="step">
  <property name="resource" value="file://#{jobParameters['input.file.name']}"/>
    ...
</bean>

前面示例中的要点是将 的值注入 作为 Resource 属性值,并设置 bean 具有 STEP 范围。将 bean 设置为具有 step scope 可利用 后期绑定支持,允许访问变量。#{jobParameters['input.file.name']}ItemReaderjobParametersspring-doc.cn