通过消息启动 Batch 作业
使用核心 Spring Batch API 启动批处理作业时,您可以 基本上有两个选项:
-
在命令行中,使用
CommandLineJobRunner
-
以编程方式,使用
JobOperator.start()
或JobLauncher.run()
例如,您可能希望使用CommandLineJobRunner
当通过
使用 shell 脚本。或者,您可以使用JobOperator
直接(例如,使用
Spring Batch 作为 Web 应用程序的一部分)。但是,那又如何呢
更复杂的用例?也许您需要轮询远程 (S)FTP
server 检索 Batch Job 或应用程序的数据
必须同时支持多个不同的数据源。为
例如,您不仅可以从 Web 接收数据文件,还可以从
FTP 和其他来源。也许输入文件的其他转换是
在调用 Spring Batch 之前需要。
因此,执行批处理作业会更强大
通过使用 Spring Integration 及其众多适配器。例如
您可以使用 File Inbound Channel Adapter 来
监视文件系统中的目录,并以
一旦输入文件到达。此外,您还可以创建 Spring
使用多个不同适配器的集成流
从多个来源摄取批处理作业的数据
同时仅使用配置。实现所有这些
Spring Integration 的场景很容易,因为它允许
解耦的、事件驱动的JobLauncher
.
Spring Batch 集成提供了JobLaunchingMessageHandler
类
用于启动批处理作业。的JobLaunchingMessageHandler
由
Spring Integration 消息,其有效负载为JobLaunchRequest
.这个类是Job
启动并围绕JobParameters
他们是
启动 Batch 作业所必需的。
下图显示了典型的 Spring 集成 启动 Batch 作业所需的消息流。EIP (Enterprise Integration Patterns) 网站提供了消息传送图标及其描述的完整概述。

将文件转换为 JobLaunchRequest
以下示例将文件转换为JobLaunchRequest
:
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;
import java.io.File;
public class FileMessageToJobRequest {
private Job job;
private String fileParameterName;
public void setFileParameterName(String fileParameterName) {
this.fileParameterName = fileParameterName;
}
public void setJob(Job job) {
this.job = job;
}
@Transformer
public JobLaunchRequest toRequest(Message<File> message) {
JobParametersBuilder jobParametersBuilder =
new JobParametersBuilder();
jobParametersBuilder.addString(fileParameterName,
message.getPayload().getAbsolutePath());
return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
}
JobExecution 响应
执行批处理作业时,JobExecution
instance 返回。您可以使用这个
instance 来确定执行的状态。如果
一个JobExecution
可以创建
successful,则始终返回它,无论
或者实际执行不成功。
有关JobExecution
instance 的TaskExecutor
.如果synchronous
(单线程)TaskExecutor
implementation 时,JobExecution
仅返回 responseafter
作业完成。当使用asynchronous
TaskExecutor
这JobExecution
实例
马上。然后,您可以获取id
之JobExecution
实例
(使用JobExecution.getJobId()
) 并查询JobRepository
了解作业的更新状态
使用JobExplorer
.了解更多
信息,请参阅 查询存储库。
Spring Batch 集成配置
考虑某人需要创建文件的情况inbound-channel-adapter
收听
对于所提供目录中的 CSV 文件,请将它们交给转换器
(FileMessageToJobRequest
),通过任务启动网关启动任务,以及
记录JobExecution
使用logging-channel-adapter
.
-
Java
-
XML
以下示例显示了如何在 Java 中配置该常见情况:
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setFileParameterName("input.file.name");
fileMessageToJobRequest.setJob(personJob());
return fileMessageToJobRequest;
}
@Bean
public JobLaunchingGateway jobLaunchingGateway() {
TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);
return jobLaunchingGateway;
}
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
return IntegrationFlow.from(Files.inboundAdapter(new File("/tmp/myfiles")).
filter(new SimplePatternFileListFilter("*.csv")),
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
transform(fileMessageToJobRequest()).
handle(jobLaunchingGateway).
log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
get();
}
以下示例显示了如何在 XML 中配置该常见情况:
<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>
<int-file:inbound-channel-adapter id="filePoller"
channel="inboundFileChannel"
directory="file:/tmp/myfiles/"
filename-pattern="*.csv">
<int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>
<int:transformer input-channel="inboundFileChannel"
output-channel="outboundJobRequestChannel">
<bean class="io.spring.sbi.FileMessageToJobRequest">
<property name="job" ref="personJob"/>
<property name="fileParameterName" value="input.file.name"/>
</bean>
</int:transformer>
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel"/>
<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>
ItemReader 配置示例
现在我们正在轮询文件并启动作业,我们需要配置我们的 Spring
批ItemReader
(例如)使用在作业定义的位置找到的文件
参数命名为 “input.file.name”,如下面的 bean 配置所示:
-
Java
-
XML
下面的 Java 示例显示了必要的 Bean 配置:
@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
flatFileItemReader.setResource(new FileSystemResource(resource));
...
return flatFileItemReader;
}
下面的 XML 示例显示了必要的 Bean 配置:
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader"
scope="step">
<property name="resource" value="file://#{jobParameters['input.file.name']}"/>
...
</bean>
前面示例中的要点是将#{jobParameters['input.file.name']}
作为 Resource 属性值,并将ItemReader
豆
具有 STEP 范围。将 bean 设置为具有 step scope 可利用
后期绑定支持,它允许访问jobParameters
变量。