Launching Batch Jobs through Messages

When starting batch jobs by using the core Spring Batch API, you basically have two options:spring-doc.cn

  • From the command line, with the CommandLineJobRunnerspring-doc.cn

  • Programmatically, with either JobOperator.start() or JobLauncher.run()spring-doc.cn

For example, you may want to use the CommandLineJobRunner when invoking batch jobs by using a shell script. Alternatively, you can use the JobOperator directly (for example, when using Spring Batch as part of a web application). However, what about more complex use cases? Maybe you need to poll a remote (S)FTP server to retrieve the data for the Batch Job or your application has to support multiple different data sources simultaneously. For example, you may receive data files not only from the web but also from FTP and other sources. Maybe additional transformation of the input files is needed before invoking Spring Batch.spring-doc.cn

Therefore, it would be much more powerful to execute the batch job by using Spring Integration and its numerous adapters. For example, you can use a File Inbound Channel Adapter to monitor a directory in the file-system and start the batch job as soon as the input file arrives. Additionally, you can create Spring Integration flows that use multiple different adapters to easily ingest data for your batch jobs from multiple sources simultaneously by using only configuration. Implementing all these scenarios with Spring Integration is easy, as it allows for decoupled, event-driven execution of the JobLauncher.spring-doc.cn

Spring Batch Integration provides the JobLaunchingMessageHandler class that you can use to launch batch jobs. The input for the JobLaunchingMessageHandler is provided by a Spring Integration message, which has a payload of type JobLaunchRequest. This class is a wrapper around the Job to be launched and around the JobParameters that are necessary to launch the Batch job.spring-doc.cn

The following image shows the typical Spring Integration message flow that is needed to start a Batch job. The EIP (Enterprise Integration Patterns) website provides a full overview of messaging icons and their descriptions.spring-doc.cn

Launch Batch Job
Figure 1. Launch Batch Job

Transforming a File into a JobLaunchRequest

The following example transforms a file into a JobLaunchRequest:spring-doc.cn

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;

import java.io.File;

public class FileMessageToJobRequest {
    private Job job;
    private String fileParameterName;

    public void setFileParameterName(String fileParameterName) {
        this.fileParameterName = fileParameterName;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder =
            new JobParametersBuilder();

        jobParametersBuilder.addString(fileParameterName,
            message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }
}

The JobExecution Response

When a batch job is being executed, a JobExecution instance is returned. You can use this instance to determine the status of an execution. If a JobExecution is able to be created successfully, it is always returned, regardless of whether or not the actual execution is successful.spring-doc.cn

The exact behavior on how the JobExecution instance is returned depends on the provided TaskExecutor. If a synchronous (single-threaded) TaskExecutor implementation is used, the JobExecution response is returned only after the job completes. When using an asynchronous TaskExecutor, the JobExecution instance is returned immediately. You can then take the id of JobExecution instance (with JobExecution.getJobId()) and query the JobRepository for the job’s updated status using the JobExplorer. For more information, see Querying the Repository.spring-doc.cn

Spring Batch Integration Configuration

Consider a case where someone needs to create a file inbound-channel-adapter to listen for CSV files in the provided directory, hand them off to a transformer (FileMessageToJobRequest), launch the job through the job launching gateway, and log the output of the JobExecution with the logging-channel-adapter.spring-doc.cn

The following example shows how that common case can be configured in Java:spring-doc.cn

Java Configuration
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
    FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
    fileMessageToJobRequest.setFileParameterName("input.file.name");
    fileMessageToJobRequest.setJob(personJob());
    return fileMessageToJobRequest;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);

    return jobLaunchingGateway;
}

@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
    return IntegrationFlow.from(Files.inboundAdapter(new File("/tmp/myfiles")).
                    filter(new SimplePatternFileListFilter("*.csv")),
            c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
            transform(fileMessageToJobRequest()).
            handle(jobLaunchingGateway).
            log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
            get();
}

The following example shows how that common case can be configured in XML:spring-doc.cn

XML Configuration
<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>

<int-file:inbound-channel-adapter id="filePoller"
    channel="inboundFileChannel"
    directory="file:/tmp/myfiles/"
    filename-pattern="*.csv">
  <int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>

<int:transformer input-channel="inboundFileChannel"
    output-channel="outboundJobRequestChannel">
  <bean class="io.spring.sbi.FileMessageToJobRequest">
    <property name="job" ref="personJob"/>
    <property name="fileParameterName" value="input.file.name"/>
  </bean>
</int:transformer>

<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
    reply-channel="jobLaunchReplyChannel"/>

<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>

Example ItemReader Configuration

Now that we are polling for files and launching jobs, we need to configure our Spring Batch ItemReader (for example) to use the files found at the location defined by the job parameter called "input.file.name", as the following bean configuration shows:spring-doc.cn

The following Java example shows the necessary bean configuration:spring-doc.cn

Java Configuration
@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
    FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
    flatFileItemReader.setResource(new FileSystemResource(resource));
...
    return flatFileItemReader;
}

The following XML example shows the necessary bean configuration:spring-doc.cn

XML Configuration
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader"
    scope="step">
  <property name="resource" value="file://#{jobParameters['input.file.name']}"/>
    ...
</bean>

The main points of interest in the preceding example are injecting the value of #{jobParameters['input.file.name']} as the Resource property value and setting the ItemReader bean to have step scope. Setting the bean to have step scope takes advantage of the late binding support, which allows access to the jobParameters variable.spring-doc.cn