通过消息启动 Batch 作业

使用核心 Spring Batch API 启动批处理作业时,您可以 基本上有两个选项:spring-doc.cadn.net.cn

例如,您可能希望使用CommandLineJobRunner当通过 使用 shell 脚本。或者,您可以使用JobOperator直接(例如,使用 Spring Batch 作为 Web 应用程序的一部分)。但是,那又如何呢 更复杂的用例?也许您需要轮询远程 (S)FTP server 检索 Batch Job 或应用程序的数据 必须同时支持多个不同的数据源。为 例如,您不仅可以从 Web 接收数据文件,还可以从 FTP 和其他来源。也许输入文件的其他转换是 在调用 Spring Batch 之前需要。spring-doc.cadn.net.cn

因此,执行批处理作业会更强大 通过使用 Spring Integration 及其众多适配器。例如 您可以使用 File Inbound Channel Adapter 来 监视文件系统中的目录,并以 一旦输入文件到达。此外,您还可以创建 Spring 使用多个不同适配器的集成流 从多个来源摄取批处理作业的数据 同时仅使用配置。实现所有这些 Spring Integration 的场景很容易,因为它允许 解耦的、事件驱动的JobLauncher.spring-doc.cadn.net.cn

Spring Batch 集成提供了JobLaunchingMessageHandler类 用于启动批处理作业。的JobLaunchingMessageHandler由 Spring Integration 消息,其有效负载为JobLaunchRequest.这个类是Job启动并围绕JobParameters他们是 启动 Batch 作业所必需的。spring-doc.cadn.net.cn

下图显示了典型的 Spring 集成 启动 Batch 作业所需的消息流。EIP (Enterprise Integration Patterns) 网站提供了消息传送图标及其描述的完整概述。spring-doc.cadn.net.cn

启动 Batch Job
图 1.启动 Batch Job

将文件转换为 JobLaunchRequest

以下示例将文件转换为JobLaunchRequest:spring-doc.cadn.net.cn

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;

import java.io.File;

public class FileMessageToJobRequest {
    private Job job;
    private String fileParameterName;

    public void setFileParameterName(String fileParameterName) {
        this.fileParameterName = fileParameterName;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder =
            new JobParametersBuilder();

        jobParametersBuilder.addString(fileParameterName,
            message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }
}

JobExecution 响应

执行批处理作业时,JobExecutioninstance 返回。您可以使用这个 instance 来确定执行的状态。如果 一个JobExecution可以创建 successful,则始终返回它,无论 或者实际执行不成功。spring-doc.cadn.net.cn

有关JobExecutioninstance 的TaskExecutor.如果synchronous(单线程)TaskExecutorimplementation 时,JobExecution仅返回 responseafter作业完成。当使用asynchronous TaskExecutorJobExecution实例 马上。然后,您可以获取idJobExecution实例 (使用JobExecution.getJobId()) 并查询JobRepository了解作业的更新状态 使用JobExplorer.了解更多 信息,请参阅 查询存储库spring-doc.cadn.net.cn

Spring Batch 集成配置

考虑某人需要创建文件的情况inbound-channel-adapter收听 对于所提供目录中的 CSV 文件,请将它们交给转换器 (FileMessageToJobRequest),通过任务启动网关启动任务,以及 记录JobExecution使用logging-channel-adapter.spring-doc.cadn.net.cn

以下示例显示了如何在 Java 中配置该常见情况:spring-doc.cadn.net.cn

Java 配置
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
    FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
    fileMessageToJobRequest.setFileParameterName("input.file.name");
    fileMessageToJobRequest.setJob(personJob());
    return fileMessageToJobRequest;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);

    return jobLaunchingGateway;
}

@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
    return IntegrationFlow.from(Files.inboundAdapter(new File("/tmp/myfiles")).
                    filter(new SimplePatternFileListFilter("*.csv")),
            c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
            transform(fileMessageToJobRequest()).
            handle(jobLaunchingGateway).
            log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
            get();
}

以下示例显示了如何在 XML 中配置该常见情况:spring-doc.cadn.net.cn

XML 配置
<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>

<int-file:inbound-channel-adapter id="filePoller"
    channel="inboundFileChannel"
    directory="file:/tmp/myfiles/"
    filename-pattern="*.csv">
  <int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>

<int:transformer input-channel="inboundFileChannel"
    output-channel="outboundJobRequestChannel">
  <bean class="io.spring.sbi.FileMessageToJobRequest">
    <property name="job" ref="personJob"/>
    <property name="fileParameterName" value="input.file.name"/>
  </bean>
</int:transformer>

<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
    reply-channel="jobLaunchReplyChannel"/>

<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>

ItemReader 配置示例

现在我们正在轮询文件并启动作业,我们需要配置我们的 Spring 批ItemReader(例如)使用在作业定义的位置找到的文件 参数命名为 “input.file.name”,如下面的 bean 配置所示:spring-doc.cadn.net.cn

下面的 Java 示例显示了必要的 Bean 配置:spring-doc.cadn.net.cn

Java 配置
@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
    FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
    flatFileItemReader.setResource(new FileSystemResource(resource));
...
    return flatFileItemReader;
}

下面的 XML 示例显示了必要的 Bean 配置:spring-doc.cadn.net.cn

XML 配置
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader"
    scope="step">
  <property name="resource" value="file://#{jobParameters['input.file.name']}"/>
    ...
</bean>

前面示例中的要点是将#{jobParameters['input.file.name']}作为 Resource 属性值,并将ItemReader豆 具有 STEP 范围。将 bean 设置为具有 step scope 可利用 后期绑定支持,它允许访问jobParameters变量。spring-doc.cadn.net.cn