配置步骤
配置Step
如领域一章所述,一个Step
是一个
domain 对象,该对象封装了批处理作业的独立顺序阶段,以及
包含定义和控制实际批次所需的所有信息
加工。这必然是一个模糊的描述,因为任何给定的内容Step
由编写Job
.一个Step
可以像简单一样简单
或开发人员想要的复杂。一个简单的Step
可能会将数据从文件加载到
数据库,几乎不需要代码(取决于所使用的实现)。一个更多
复杂Step
可能具有复杂的业务规则,这些规则作为
processing,如下图所示:

面向块的处理
Spring Batch 在其最常见的
实现。面向块的处理是指一次读取一个数据,并且
创建在事务边界内写出的“块”。一旦
items read 等于提交间隔,则整个块由ItemWriter
,然后提交事务。下图显示了
过程:

以下伪代码以简化的形式显示相同的概念:
List items = new Arraylist();
for(int i = 0; i < commitInterval; i++){
Object item = itemReader.read();
if (item != null) {
items.add(item);
}
}
itemWriter.write(items);
您还可以使用可选的ItemProcessor
在将项目传递给ItemWriter
.下图
显示当ItemProcessor
在步骤中注册:

以下伪代码显示了如何以简化的形式实现这一点:
List items = new Arraylist();
for(int i = 0; i < commitInterval; i++){
Object item = itemReader.read();
if (item != null) {
items.add(item);
}
}
List processedItems = new Arraylist();
for(Object item: items){
Object processedItem = itemProcessor.process(item);
if (processedItem != null) {
processedItems.add(processedItem);
}
}
itemWriter.write(processedItems);
有关商品处理器及其用例的更多详细信息,请参阅商品处理部分。
配置步骤
尽管所需依赖项列表相对较短,但Step
,它是一个
极其复杂的类,可能包含许多协作者。
为了简化配置,您可以使用 Spring Batch XML 命名空间,作为 以下示例显示:
<job id="sampleJob" job-repository="jobRepository">
<step id="step1">
<tasklet transaction-manager="transactionManager">
<chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
</tasklet>
</step>
</job>
使用 Java 配置时,您可以使用 Spring Batch 构建器,作为 以下示例显示:
/**
* Note the JobRepository is typically autowired in and not needed to be explicitly
* configured
*/
@Bean
public Job sampleJob(JobRepository jobRepository, Step sampleStep) {
return new JobBuilder("sampleJob", jobRepository)
.start(sampleStep)
.build();
}
/**
* Note the TransactionManager is typically autowired in and not needed to be explicitly
* configured
*/
@Bean
public Step sampleStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("sampleStep", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.build();
}
前面的配置包括创建面向项的唯一依赖项 步:
-
reader
:这ItemReader
提供待处理的项目。 -
writer
:这ItemWriter
处理由ItemReader
.
-
transaction-manager
: Spring的PlatformTransactionManager
开始并承诺 处理过程中的交易。
-
transactionManager
: Spring的PlatformTransactionManager
开始并承诺 处理过程中的交易。
-
job-repository
:特定于 XML 的名称JobRepository
定期存储 这StepExecution
和ExecutionContext
在处理期间(就在提交之前)。为 一个串联<step/>
(在<job/>
),它是<job/>
元素。对于独立<step/>
,则将其定义为<tasklet/>
.
-
repository
:特定于 Java 的名称JobRepository
定期存储 这StepExecution
和ExecutionContext
在处理期间(就在提交之前)。
-
commit-interval
:要处理的项目数的 XML 专用名称 在提交事务之前。
-
chunk
:依赖项的特定于 Java 的名称,指示这是 基于物料的步骤和交易之前要处理的物料数 承诺。
请注意job-repository
默认为jobRepository
和transaction-manager
默认为transactionManager
.此外,ItemProcessor
是
可选,因为该项可以直接从读取器传递到写入器。
请注意repository
默认为jobRepository
(通过@EnableBatchProcessing
)
和transactionManager
默认为transactionManager
(从应用程序上下文提供)。
此外,ItemProcessor
是可选的,因为该项目可以是
直接从读者传递给写入器。
从父母那里继承Step
如果一组Steps
共享类似的配置,那么定义一个
“父母”Step
混凝土从中Steps
可以继承属性。与class相似
Java 中的继承,“子”Step
将其元素和属性与
父母的。子项还会覆盖父项的任何Steps
.
在以下示例中,Step
,concreteStep1
,继承自parentStep
.是的
实例化为itemReader
,itemProcessor
,itemWriter
,startLimit=5
和allowStartIfComplete=true
.此外,commitInterval
是5
,因为它是
被concreteStep1
Step
,如以下示例所示:
<step id="parentStep">
<tasklet allow-start-if-complete="true">
<chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
</tasklet>
</step>
<step id="concreteStep1" parent="parentStep">
<tasklet start-limit="5">
<chunk processor="itemProcessor" commit-interval="5"/>
</tasklet>
</step>
这id
属性仍然是作业元素内步骤上所必需的。这是两个人
原因:
-
这
id
在持久化StepExecution
.如果相同的 在作业中的多个步骤中引用独立步骤时,会发生错误。
-
创建作业流时,如本章后面所述,
next
属性 应引用流中的步骤,而不是独立步骤。
抽象Step
有时,可能需要定义父级Step
那不是完整的Step
配置。例如,如果reader
,writer
和tasklet
属性是
离开Step
配置,则初始化失败。如果父级必须是
定义时,如果没有其中一个或多个属性,则abstract
属性。一abstract
Step
只是扩展,从不实例化。
在以下示例中,Step
(abstractParentStep
)如果
没有被宣布为抽象的。这Step
, (concreteStep2
) 有itemReader
,itemWriter
和commit-interval=10
.
<step id="abstractParentStep" abstract="true">
<tasklet>
<chunk commit-interval="10"/>
</tasklet>
</step>
<step id="concreteStep2" parent="abstractParentStep">
<tasklet>
<chunk reader="itemReader" writer="itemWriter"/>
</tasklet>
</step>
合并列表
上的一些可配置元素Steps
是列表,例如<listeners/>
元素。
如果父级和子级Steps
声明一个<listeners/>
元素,则
子项的列表将覆盖父级的列表。允许孩子添加其他
监听器,每个列表元素都有一个merge
属性。
如果元素指定merge="true"
,则子项的列表将与
parent's 而不是覆盖它。
在以下示例中,Step
“concreteStep3”,使用两个监听器创建:listenerOne
和listenerTwo
:
<step id="listenersParentStep" abstract="true">
<listeners>
<listener ref="listenerOne"/>
<listeners>
</step>
<step id="concreteStep3" parent="listenersParentStep">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="5"/>
</tasklet>
<listeners merge="true">
<listener ref="listenerTwo"/>
<listeners>
</step>
提交间隔
如前所述,步骤读入和写出项目,定期提交
通过使用提供的PlatformTransactionManager
.使用commit-interval
的 1、它
在写入每个单独的项目后提交。这在许多情况下不太理想,
因为开始和提交事务的成本很高。理想情况下,最好是
在每笔交易中处理尽可能多的项目,这完全取决于
正在处理的数据类型以及与步骤交互的资源。
因此,您可以配置在提交中处理的项数。
以下示例显示了step
谁的tasklet
有一个commit-interval
值为 10,因为它将在 XML 中定义:
<job id="sampleJob">
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
</tasklet>
</step>
</job>
以下示例显示了step
谁的tasklet
有一个commit-interval
值为 10,因为它将在 Java 中定义:
@Bean
public Job sampleJob(JobRepository jobRepository) {
return new JobBuilder("sampleJob", jobRepository)
.start(step1())
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.build();
}
在前面的示例中,每个事务处理 10 个项目。在
开始处理时,事务开始。另外,每次read
在ItemReader
,则计数器递增。当它达到 10 时,聚合项的列表
传递给ItemWriter
,并且事务被提交。
配置Step
用于重新启动
在“配置和运行作业”部分中,重新启动Job
被讨论过。重启对步骤有很多影响,因此可能会
需要一些特定的配置。
设置起始限制
在许多情况下,您可能希望控制Step
能
开始。例如,您可能需要配置特定的Step
可能使它
仅运行一次,因为它会使某些资源失效,必须先手动修复这些资源才能修复
再次运行。这是在步骤级别上配置的,因为不同的步骤可能有
不同的要求。一个Step
只能执行一次的可以作为
相同Job
作为Step
可以无限运行。
以下代码片段显示了 XML 中的启动限制配置示例:
<step id="step1">
<tasklet start-limit="1">
<chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
</tasklet>
</step>
以下代码片段显示了 Java 中的启动限制配置示例:
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.startLimit(1)
.build();
}
前面示例中所示的步骤只能运行一次。尝试再次运行它
导致StartLimitExceededException
被扔掉。请注意,默认值
start-limit 是Integer.MAX_VALUE
.
重新启动已完成的Step
对于可重新启动的作业,可能有一个或多个步骤应始终
运行,无论他们第一次是否成功。一个例子可能
是验证步骤或Step
在处理之前清理资源。在
重新启动的作业的正常处理,状态为COMPLETED
(意思是
已成功完成),被跳过。设置allow-start-if-complete
自true
覆盖此步骤,以便该步骤始终运行。
以下代码片段演示了如何在 XML 中定义可重启的作业:
<step id="step1">
<tasklet allow-start-if-complete="true">
<chunk reader="itemReader" writer="itemWriter" commit-interval="10"/>
</tasklet>
</step>
以下代码片段显示了如何在 Java 中定义可重启的作业:
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.allowStartIfComplete(true)
.build();
}
Step
重启配置示例
以下 XML 示例演示如何将作业配置为具有可以 重新 启动:
<job id="footballJob" restartable="true">
<step id="playerload" next="gameLoad">
<tasklet>
<chunk reader="playerFileItemReader" writer="playerWriter"
commit-interval="10" />
</tasklet>
</step>
<step id="gameLoad" next="playerSummarization">
<tasklet allow-start-if-complete="true">
<chunk reader="gameFileItemReader" writer="gameWriter"
commit-interval="10"/>
</tasklet>
</step>
<step id="playerSummarization">
<tasklet start-limit="2">
<chunk reader="playerSummarizationSource" writer="summaryWriter"
commit-interval="10"/>
</tasklet>
</step>
</job>
以下 Java 示例演示如何配置作业以具有可以 重新 启动:
@Bean
public Job footballJob(JobRepository jobRepository) {
return new JobBuilder("footballJob", jobRepository)
.start(playerLoad())
.next(gameLoad())
.next(playerSummarization())
.build();
}
@Bean
public Step playerLoad(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("playerLoad", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(playerFileItemReader())
.writer(playerWriter())
.build();
}
@Bean
public Step gameLoad(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("gameLoad", jobRepository)
.allowStartIfComplete(true)
.<String, String>chunk(10, transactionManager)
.reader(gameFileItemReader())
.writer(gameWriter())
.build();
}
@Bean
public Step playerSummarization(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("playerSummarization", jobRepository)
.startLimit(2)
.<String, String>chunk(10, transactionManager)
.reader(playerSummarizationSource())
.writer(summaryWriter())
.build();
}
前面的示例配置适用于加载有关足球的信息的作业
游戏并总结它们。它包含三个步骤:playerLoad
,gameLoad
和playerSummarization
.这playerLoad
step 从平面文件加载播放器信息,
而gameLoad
step 对游戏也做同样的事情。最后一步,playerSummarization
,然后根据
提供游戏。假设由playerLoad
必须仅加载
一次,但那个gameLoad
可以加载在特定目录中找到的任何游戏,
在成功加载到数据库中后删除它们。结果,
这playerLoad
步骤不包含其他配置。它可以启动任何数字
如果完成,则跳过次数。这gameLoad
但是,需要运行步骤
每次,以防自上次运行以来添加了额外的文件。它有allow-start-if-complete
设置为true
始终启动。(假设
游戏加载到的数据库表上有一个进程指示器,以确保
新游戏可以通过摘要步骤正确找到)。汇总步骤
这是作业中最重要的,配置为起始限制为 2。这
很有用,因为如果该步骤持续失败,则新的退出代码将返回给
控制作业执行的运算符,并且在手动之前无法重新启动
干预已经发生。
此作业提供了本文档的示例,与footballJob 在示例项目中找到。 |
本节的其余部分描述了footballJob
例。
运行 1:
-
playerLoad
运行并成功完成,将 400 名玩家添加到PLAYERS
桌子。 -
gameLoad
运行并处理相当于 11 个文件的游戏数据,加载其内容 进入GAMES
桌子。 -
playerSummarization
开始处理,5 分钟后失败。
运行 2:
-
playerLoad
不运行,因为它已经成功完成,并且allow-start-if-complete
是false
(默认值)。 -
gameLoad
再次运行并处理另外 2 个文件,将其内容加载到GAMES
表(带有一个过程指示器,表明它们尚未 处理)。 -
playerSummarization
开始处理所有剩余的游戏数据(使用 进程指示器),30 分钟后再次失败。
运行 3:
-
playerLoad
不运行,因为它已经成功完成,并且allow-start-if-complete
是false
(默认值)。 -
gameLoad
再次运行并处理另外 2 个文件,将其内容加载到GAMES
表(带有一个过程指示器,表明它们尚未 处理)。 -
playerSummarization
未启动,作业会立即终止,因为这是 第三次执行playerSummarization
,其限制仅为 2。要么是限制 必须提高或Job
必须作为新的JobInstance
.
配置跳过逻辑
在许多情况下,处理过程中遇到的错误不应导致Step
失败,但应该跳过。这通常是一个必须做出的决定
由了解数据本身及其含义的人制作。财务数据,
例如,可能无法跳过,因为它会导致资金被转账,这
需要完全准确。另一方面,加载提供商列表可能会
允许跳过。如果提供商因格式不正确或
缺少必要的信息,可能没有问题。通常,这些不好的
记录也会被记录下来,稍后在讨论听众时会介绍这一点。
以下 XML 示例显示了使用跳过限制的示例:
<step id="step1">
<tasklet>
<chunk reader="flatFileItemReader" writer="itemWriter"
commit-interval="10" skip-limit="10">
<skippable-exception-classes>
<include class="org.springframework.batch.item.file.FlatFileParseException"/>
</skippable-exception-classes>
</chunk>
</tasklet>
</step>
以下 Java 示例显示了使用跳过限制的示例:
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(flatFileItemReader())
.writer(itemWriter())
.faultTolerant()
.skipLimit(10)
.skip(FlatFileParseException.class)
.build();
}
在前面的示例中,FlatFileItemReader
被使用。如果在任何时候,一个FlatFileParseException
,则跳过该项目并计入总数
跳过限制为 10。声明的异常(及其子类)可能会被抛出
在块处理的任何阶段(读取、处理或写入)。单独计数
由内部读取、处理和写入的跳过组成
步骤执行,但限制适用于所有跳过。一旦跳过限制
达到,则找到的下一个异常会导致该步骤失败。换句话说,第十一个
skip 触发异常,而不是第十个异常。
前面示例的一个问题是,除了FlatFileParseException
导致Job
失败。在某些情况下,这可能是
正确的行为。但是,在其他情况下,可能更容易识别哪些
异常应该会导致失败并跳过其他所有内容。
以下 XML 示例显示了排除特定异常的示例:
<step id="step1">
<tasklet>
<chunk reader="flatFileItemReader" writer="itemWriter"
commit-interval="10" skip-limit="10">
<skippable-exception-classes>
<include class="java.lang.Exception"/>
<exclude class="java.io.FileNotFoundException"/>
</skippable-exception-classes>
</chunk>
</tasklet>
</step>
以下 Java 示例显示了排除特定异常的示例:
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(flatFileItemReader())
.writer(itemWriter())
.faultTolerant()
.skipLimit(10)
.skip(Exception.class)
.noSkip(FileNotFoundException.class)
.build();
}
通过识别java.lang.Exception
作为可跳过的异常类,配置
表示所有Exceptions
是可跳过的。但是,通过“排除”java.io.FileNotFoundException
,则配置细化了可跳过的列表
异常类为 allExceptions
除了 FileNotFoundException
.任何被排除的
如果遇到异常类,则是致命的(即,它们不会被跳过)。
对于遇到的任何异常,可跳过性由最接近的超类确定 在类层次结构中。任何未分类的异常都被视为“致命”。
的顺序<include/>
和<exclude/>
元素无关紧要。
的顺序skip
和noSkip
方法调用无关紧要。
配置重试逻辑
在大多数情况下,您希望异常导致跳过或Step
失败。然而
并非所有例外都是确定性的。如果FlatFileParseException
遇到时
阅读时,它总是为该记录而投掷。重置ItemReader
无济于事。
但是,对于其他例外情况(例如DeadlockLoserDataAccessException
哪
表示当前进程已尝试更新另一个进程的记录
保持锁定),等待并重试可能会导致成功。
在 XML 中,重试应配置如下:
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter"
commit-interval="2" retry-limit="3">
<retryable-exception-classes>
<include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
</retryable-exception-classes>
</chunk>
</tasklet>
</step>
在 Java 中,重试应配置如下:
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(2, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.faultTolerant()
.retryLimit(3)
.retry(DeadlockLoserDataAccessException.class)
.build();
}
这Step
允许限制单个项目可以重试的次数,并且
“可重试”的异常列表。可以在重试中找到有关重试工作原理的更多详细信息。
控制回滚
默认情况下,无论重试还是跳过,从ItemWriter
导致由Step
回滚。如果 skip 配置为
前面所述,从ItemReader
不要导致回滚。
但是,在许多情况下,从ItemWriter
应该
不会导致回滚,因为没有执行任何作使事务无效。
因此,您可以配置Step
以及不应出现的例外情况列表
导致回滚。
在 XML 中,您可以按如下方式控制回滚:
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
<no-rollback-exception-classes>
<include class="org.springframework.batch.item.validator.ValidationException"/>
</no-rollback-exception-classes>
</tasklet>
</step>
在 Java 中,您可以按如下方式控制回滚:
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(2, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.faultTolerant()
.noRollback(ValidationException.class)
.build();
}
事务性读者
基本合同ItemReader
是它仅向前。该步骤缓冲reader 输入,以便在回滚时,不需要重新读取项目来自阅读器。但是,在某些情况下,读取器是构建在事务资源之上,例如 JMS 队列。在这种情况下,由于队列是绑定到回滚的事务,因此已从队列中提取的消息将放回。因此,您可以将步骤配置为不缓冲 项目。
以下示例演示如何创建不缓冲 XML 中项的读取器:
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="2"
is-reader-transactional-queue="true"/>
</tasklet>
</step>
以下示例显示了如何在 Java 中创建不缓冲项的读取器:
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(2, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.readerIsTransactionalQueue()
.build();
}
事务属性
您可以使用事务属性来控制isolation
,propagation
和timeout
设置。 您可以在以下位置找到有关设置事务属性的更多信息Spring核心文档。
以下示例将isolation
,propagation
和timeout
交易 XML 中的属性:
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
<transaction-attributes isolation="DEFAULT"
propagation="REQUIRED"
timeout="30"/>
</tasklet>
</step>
以下示例将isolation
,propagation
和timeout
交易 Java 中的属性:
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
attribute.setPropagationBehavior(Propagation.REQUIRED.value());
attribute.setIsolationLevel(Isolation.DEFAULT.value());
attribute.setTimeout(30);
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(2, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.transactionAttribute(attribute)
.build();
}
注册ItemStream
使用Step
步骤必须注意ItemStream
回调在其
生命周期。(有关ItemStream
接口,请参阅 ItemStream)。如果一个步骤失败,这一点至关重要,并且可能会
需要重新启动,因为ItemStream
接口是步骤获取
它需要的有关执行之间持久状态的信息。
如果ItemReader
,ItemProcessor
或ItemWriter
本身实现了ItemStream
接口,这些是自动注册的。任何其他流都需要
单独注册。这通常是间接依赖关系(例如
委托,被注入到读取器和写入器中。您可以在step
通过stream
元素。
以下示例演示如何注册stream
在step
在 XML 中:
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="compositeWriter" commit-interval="2">
<streams>
<stream ref="fileItemWriter1"/>
<stream ref="fileItemWriter2"/>
</streams>
</chunk>
</tasklet>
</step>
<beans:bean id="compositeWriter"
class="org.springframework.batch.item.support.CompositeItemWriter">
<beans:property name="delegates">
<beans:list>
<beans:ref bean="fileItemWriter1" />
<beans:ref bean="fileItemWriter2" />
</beans:list>
</beans:property>
</beans:bean>
以下示例演示如何注册stream
在step
在 Java 中:
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(2, transactionManager)
.reader(itemReader())
.writer(compositeItemWriter())
.stream(fileItemWriter1())
.stream(fileItemWriter2())
.build();
}
/**
* In Spring Batch 4, the CompositeItemWriter implements ItemStream so this isn't
* necessary, but used for an example.
*/
@Bean
public CompositeItemWriter compositeItemWriter() {
List<ItemWriter> writers = new ArrayList<>(2);
writers.add(fileItemWriter1());
writers.add(fileItemWriter2());
CompositeItemWriter itemWriter = new CompositeItemWriter();
itemWriter.setDelegates(writers);
return itemWriter;
}
在前面的示例中,CompositeItemWriter
不是ItemStream
,但它的两个
代表是。因此,必须将两个委托编写器显式注册为流
以便框架正确处理它们。这ItemReader
不需要
显式注册为流,因为它是Step
.步骤
现在可以重新启动,并且读取器和写入器的状态正确地保存在
故障事件。
拦截Step
执行
就像Job
,在执行Step
其中
用户可能需要执行某些功能。例如,写出到一个公寓
需要页脚的文件,则ItemWriter
当Step
有
已完成,以便可以写入页脚。这可以通过众多Step
作用域侦听器。
您可以应用任何实现StepListener
(但不是那个界面
本身,因为它是空的)到通过listeners
元素。
这listeners
元素在步骤、任务或块声明中有效。我们
建议您在应用其函数的级别声明侦听器
或者,如果它是多功能(例如StepExecutionListener
和ItemReadListener
),
在应用的最精细级别声明它。
以下示例显示了在 XML 中块级别应用的侦听器:
<step id="step1">
<tasklet>
<chunk reader="reader" writer="writer" commit-interval="10"/>
<listeners>
<listener ref="chunkListener"/>
</listeners>
</tasklet>
</step>
以下示例显示了在 Java 中应用在块级别的侦听器:
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(reader())
.writer(writer())
.listener(chunkListener())
.build();
}
一ItemReader
,ItemWriter
或ItemProcessor
它本身实现了StepListener
接口会自动注册到Step
如果使用
Namespace<step>
元素或*StepFactoryBean
工厂。这只是
适用于直接注入Step
.如果监听器嵌套在
另一个组件,您需要显式注册它(如前面在注册ItemStream
使用Step
).
除了StepListener
接口,提供注释来解决
同样的担忧。普通的旧 Java 对象可以具有具有这些注释的方法,这些注释是
然后转换为相应的StepListener
类型。注释也很常见
块组件的自定义实现,例如ItemReader
或ItemWriter
或Tasklet
.XML 解析器分析注释<listener/>
元素
以及在listener
方法,所以你需要做的就是
是使用 XML 命名空间或构建器向步骤注册侦听器。
StepExecutionListener
StepExecutionListener
表示最通用的监听器Step
执行。它
允许在Step
开始了,结束后,是否结束了
normal或failed,如以下示例所示:
public interface StepExecutionListener extends StepListener {
void beforeStep(StepExecution stepExecution);
ExitStatus afterStep(StepExecution stepExecution);
}
ExitStatus
返回类型为afterStep
,让听众有机会
修改完成Step
.
与此接口对应的注释是:
-
@BeforeStep
-
@AfterStep
ChunkListener
“块”定义为在事务范围内处理的项目。提交一个
transaction,在每个提交间隔,提交一个块。您可以使用ChunkListener
自
在块开始处理之前或块完成后执行逻辑
成功,如以下接口定义所示:
public interface ChunkListener extends StepListener {
void beforeChunk(ChunkContext context);
void afterChunk(ChunkContext context);
void afterChunkError(ChunkContext context);
}
beforeChunk 方法在事务启动后但在读取开始之前调用
在ItemReader
.相反afterChunk
在块被调用后
已提交(如果有回滚,则根本不提交)。
与此接口对应的注释是:
-
@BeforeChunk
-
@AfterChunk
-
@AfterChunkError
您可以应用ChunkListener
当没有块声明时。这TaskletStep
是
负责调用ChunkListener
,因此它适用于非面向项的任务
(它在 tasklet 之前和之后调用)。
ItemReadListener
之前在讨论跳过逻辑时,提到记录可能会有所帮助
跳过的记录,以便以后处理。在读取错误的情况下,
这可以通过ItemReaderListener
,作为以下接口
定义显示:
public interface ItemReadListener<T> extends StepListener {
void beforeRead();
void afterRead(T item);
void onReadError(Exception ex);
}
这beforeRead
在每次调用之前调用方法以读取ItemReader
.这afterRead
每次成功调用 read 后调用方法,并传递项目
那是读出来的。如果读取时出现错误,则onReadError
方法被调用。
提供遇到的异常,以便可以记录它。
与此接口对应的注释是:
-
@BeforeRead
-
@AfterRead
-
@OnReadError
ItemProcessListener
与ItemReadListener
,可以“监听”项目的处理,如
以下接口定义显示:
public interface ItemProcessListener<T, S> extends StepListener {
void beforeProcess(T item);
void afterProcess(T item, S result);
void onProcessError(T item, Exception e);
}
这beforeProcess
method 在process
在ItemProcessor
并且是
递上要处理的项目。这afterProcess
方法在
项目已成功处理。如果在处理过程中出现错误,则onProcessError
方法被调用。遇到的异常和
提供了尝试处理的,以便可以记录它们。
与此接口对应的注释是:
-
@BeforeProcess
-
@AfterProcess
-
@OnProcessError
ItemWriteListener
您可以使用ItemWriteListener
,作为
以下接口定义显示:
public interface ItemWriteListener<S> extends StepListener {
void beforeWrite(List<? extends S> items);
void afterWrite(List<? extends S> items);
void onWriteError(Exception exception, List<? extends S> items);
}
这beforeWrite
method 在write
在ItemWriter
并收到所写项目列表。 这afterWrite
方法在项目被成功写入后调用,但在提交与块处理相关的事务之前。如果在写入时出现错误,则onWriteError
方法被调用。遇到的异常和尝试写入的项目是提供的,以便可以记录它们。
与此接口对应的注释是:
-
@BeforeWrite
-
@AfterWrite
-
@OnWriteError
SkipListener
ItemReadListener
,ItemProcessListener
和ItemWriteListener
都提供了机制
收到错误通知,但没有记录实际上已通知您
跳。onWriteError
例如,即使重试了项目,也会调用,并且
成功的。因此,有一个单独的接口用于跟踪跳过的项目,如
以下接口定义显示:
public interface SkipListener<T,S> extends StepListener {
void onSkipInRead(Throwable t);
void onSkipInProcess(T item, Throwable t);
void onSkipInWrite(S item, Throwable t);
}
onSkipInRead
每当读取时跳过项目时都会调用。应该注意的是
回滚可能会导致同一项目多次注册为跳过。onSkipInWrite
在写入时跳过项目时调用。因为该项目具有
被成功读取(并且没有跳过),它也会作为
论点。
与此接口对应的注释是:
-
@OnSkipInRead
-
@OnSkipInWrite
-
@OnSkipInProcess
TaskletStep
面向块的处理并不是在Step
.如果一个Step
必须包含存储过程调用?你可以
将调用实现为ItemReader
并在过程完成后返回 null。
然而,这样做有点不自然,因为需要有一个无作ItemWriter
.
Spring Batch 提供了TaskletStep
对于这种情况。
这Tasklet
interface 有一个方法,execute
,称为
重复由TaskletStep
直到它返回RepeatStatus.FINISHED
或投掷
异常以表示失败。每次调用Tasklet
被包装在事务中。Tasklet
实现者可能会调用存储过程、脚本或 SQL 更新
陈述。
要创建TaskletStep
在 XML 中,ref
属性的<tasklet/>
元素应该
引用定义Tasklet
对象。不<chunk/>
元素应该使用
在<tasklet/>
.以下示例显示了一个简单的任务:
<step id="step1">
<tasklet ref="myTasklet"/>
</step>
要创建TaskletStep
在 Java 中,bean 传递给tasklet
建造者的方法
应实现Tasklet
接口。没有电话chunk
当
构建一个TaskletStep
.以下示例显示了一个简单的任务:
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.tasklet(myTasklet(), transactionManager)
.build();
}
如果它实现了StepListener 接口TaskletStep 自动将 tasklet 注册为StepListener . |
TaskletAdapter
与其他适配器一样,用于ItemReader
和ItemWriter
接口,则Tasklet
接口包含一个允许自行适应任何预先存在的实现
类:TaskletAdapter
.这可能有用的一个例子是现有的 DAO,它是
用于更新一组记录上的标志。您可以使用TaskletAdapter
称呼此
类,而无需为Tasklet
接口。
以下示例显示如何定义TaskletAdapter
在 XML 中:
<bean id="myTasklet" class="o.s.b.core.step.tasklet.MethodInvokingTaskletAdapter">
<property name="targetObject">
<bean class="org.mycompany.FooDao"/>
</property>
<property name="targetMethod" value="updateFoo" />
</bean>
以下示例显示如何定义TaskletAdapter
在 Java 中:
@Bean
public MethodInvokingTaskletAdapter myTasklet() {
MethodInvokingTaskletAdapter adapter = new MethodInvokingTaskletAdapter();
adapter.setTargetObject(fooDao());
adapter.setTargetMethod("updateFoo");
return adapter;
}
示例Tasklet
实现
许多批处理作业包含必须在主处理开始之前完成的步骤,设置各种资源或在处理完成后清理这些资源 资源。 对于大量处理文件的作业,通常需要在成功上传到另一个文件后,在本地删除某些文件 位置。 以下示例(取自 SpringBatch samples 项目)是一个Tasklet
实施时承担这样的责任:
public class FileDeletingTasklet implements Tasklet, InitializingBean {
private Resource directory;
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
File dir = directory.getFile();
Assert.state(dir.isDirectory(), "The resource must be a directory");
File[] files = dir.listFiles();
for (int i = 0; i < files.length; i++) {
boolean deleted = files[i].delete();
if (!deleted) {
throw new UnexpectedJobExecutionException("Could not delete file " +
files[i].getPath());
}
}
return RepeatStatus.FINISHED;
}
public void setDirectoryResource(Resource directory) {
this.directory = directory;
}
public void afterPropertiesSet() throws Exception {
Assert.state(directory != null, "Directory must be set");
}
}
前面的tasklet
实现删除给定目录中的所有文件。 它 应该注意的是,execute
方法只调用一次。剩下的就是引用tasklet
从step
.
以下示例演示如何引用tasklet
从step
在 XML 中:
<job id="taskletJob">
<step id="deleteFilesInDir">
<tasklet ref="fileDeletingTasklet"/>
</step>
</job>
<beans:bean id="fileDeletingTasklet"
class="org.springframework.batch.sample.tasklet.FileDeletingTasklet">
<beans:property name="directoryResource">
<beans:bean id="directory"
class="org.springframework.core.io.FileSystemResource">
<beans:constructor-arg value="target/test-outputs/test-dir" />
</beans:bean>
</beans:property>
</beans:bean>
以下示例演示如何引用tasklet
从step
在 Java 中:
@Bean
public Job taskletJob(JobRepository jobRepository) {
return new JobBuilder("taskletJob", jobRepository)
.start(deleteFilesInDir())
.build();
}
@Bean
public Step deleteFilesInDir(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("deleteFilesInDir", jobRepository)
.tasklet(fileDeletingTasklet(), transactionManager)
.build();
}
@Bean
public FileDeletingTasklet fileDeletingTasklet() {
FileDeletingTasklet tasklet = new FileDeletingTasklet();
tasklet.setDirectoryResource(new FileSystemResource("target/test-outputs/test-dir"));
return tasklet;
}
控制步骤流
由于能够在拥有的作业中将步骤组合在一起,因此需要能够控制作业如何从一个步骤“流向”另一个步骤。一个失败Step
不 必然意味着Job
应该失败。此外,可能有不止一种类型的“成功”决定了哪个Step
接下来应该执行。取决于组Steps
配置后,某些步骤甚至可能根本无法处理。
顺序流
最简单的流方案是所有步骤按顺序执行的作业,如下图显示:

这可以通过使用next
在step
.
以下示例演示如何使用next
XML 中的属性:
<job id="job">
<step id="stepA" parent="s1" next="stepB" />
<step id="stepB" parent="s2" next="stepC"/>
<step id="stepC" parent="s3" />
</job>
以下示例演示如何使用next()
Java 中的方法:
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(stepA())
.next(stepB())
.next(stepC())
.build();
}
在上述场景中,stepA
首先运行,因为它是第一个Step
上市。如果stepA
正常完成,stepB
运行,依此类推。但是,如果step A
失败
整个Job
fails 和stepB
不执行。
使用 Spring Batch XML 命名空间时,配置中列出的第一步始终是由Job .其他步骤元素的顺序不会
重要,但第一步必须始终出现在 XML 中的第一位。 |
条件流
在前面的示例中,只有两种可能性:
-
这
step
成功了,接下来step
应该执行。 -
这
step
失败,因此,job
应该失败。
在许多情况下,这可能就足够了。但是,如果失败step
应该触发不同的step
,而不是造成失败? 这 下图显示了这样的流:

为了处理更复杂的场景,Spring Batch XML 命名空间允许您定义过渡step 元素中的元素。其中一种过渡是next
元素。 像next
属性,则next
元素告诉Job
哪Step
自 execute next。但是,与属性不同的是,任意数量的next
元素允许在给定的Step
,并且在失败的情况下没有默认行为。这意味着,如果使用transition 元素,则Step
过渡必须是显式定义。另请注意,单个步骤不能同时具有next
属性和 一个transition
元素。
这next
元素指定要匹配的模式和接下来要执行的步骤,如以下示例显示:
<job id="job">
<step id="stepA" parent="s1">
<next on="*" to="stepB" />
<next on="FAILED" to="stepC" />
</step>
<step id="stepB" parent="s2" next="stepC" />
<step id="stepC" parent="s3" />
</job>
Java API 提供了一组流畅的方法,可让您指定流和要执行的作当步骤失败时。以下示例显示了如何指定一个步骤 (stepA
),然后继续执行两个不同步骤(stepB
或stepC
),取决于是否stepA
成功:
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(stepA())
.on("*").to(stepB())
.from(stepA()).on("FAILED").to(stepC())
.end()
.build();
}
使用 XML 配置时,on
属性使用简单的
模式匹配方案以匹配ExitStatus
执行Step
.
使用 java 配置时,on()
方法使用简单的模式匹配方案来
匹配ExitStatus
执行Step
.
模式中只允许两个特殊字符:
-
*
匹配零个或多个字符 -
?
恰好匹配一个字符
例如c*t
比赛cat
和count
而c?t
比赛cat
但不是count
.
虽然Step
,如果Step
执行结果为ExitStatus
未被元素覆盖的,则
framework 抛出异常,并且Job
失败。框架自动排序
从最具体到最不具体的过渡。这意味着,即使订购
被换成stepA
在前面的示例中,一个ExitStatus
之FAILED
还是会去
自stepC
.
批处理状态与退出状态
配置Job
对于条件流,了解
区别BatchStatus
和ExitStatus
.BatchStatus
是一个枚举,其中
是两者的属性JobExecution
和StepExecution
并被框架用于
记录Job
或Step
.它可以是以下值之一:COMPLETED
,STARTING
,STARTED
,STOPPING
,STOPPED
,FAILED
,ABANDONED
或UNKNOWN
.其中大多数是不言自明的:COMPLETED
是步骤时设置的状态
或作业已成功完成,FAILED
失败时设置,依此类推。
以下示例包含next
元素:
<next on="FAILED" to="stepB" />
以下示例包含on
元素:
...
.from(stepA()).on("FAILED").to(stepB())
...
乍一看,似乎on
引用BatchStatus
的Step
自
它所属的。但是,它实际上引用了ExitStatus
的Step
.作为
顾名思义ExitStatus
表示Step
在它完成执行后。
更具体地说,当使用 XML 配置时,next
元素显示在
前面的 XML 配置示例引用了ExitStatus
.
使用 Java 配置时,on()
如前述所示的方法
Java 配置示例引用了ExitStatus
.
在英语中,它说:“如果退出代码为 FAILED,则转到步骤 B”。默认情况下,出口
code 始终与BatchStatus
对于Step
,这就是为什么前面的条目
工程。但是,如果退出代码需要不同怎么办?一个很好的例子来自
示例项目中的跳过示例作业:
以下示例演示如何使用 XML 中的不同退出代码:
<step id="step1" parent="s1">
<end on="FAILED" />
<next on="COMPLETED WITH SKIPS" to="errorPrint1" />
<next on="*" to="step2" />
</step>
以下示例显示了如何在 Java 中使用不同的退出代码:
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(step1()).on("FAILED").end()
.from(step1()).on("COMPLETED WITH SKIPS").to(errorPrint1())
.from(step1()).on("*").to(step2())
.end()
.build();
}
step1
有三种可能:
-
这
Step
failed,在这种情况下,作业应该失败。 -
这
Step
成功完成。 -
这
Step
成功完成,但退出代码为COMPLETED WITH SKIPS
.在 在这种情况下,应运行不同的步骤来处理错误。
上述配置有效。但是,需要根据以下内容更改退出代码 跳过记录的执行条件,如以下示例所示:
public class SkipCheckingListener implements StepExecutionListener {
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
String exitCode = stepExecution.getExitStatus().getExitCode();
if (!exitCode.equals(ExitStatus.FAILED.getExitCode()) &&
stepExecution.getSkipCount() > 0) {
return new ExitStatus("COMPLETED WITH SKIPS");
} else {
return null;
}
}
}
前面的代码是StepExecutionListener
首先检查以确保Step
是
成功,然后检查StepExecution
高于
0. 如果同时满足这两个条件,则新的ExitStatus
退出代码为COMPLETED WITH SKIPS
被返回。
配置停止
经过讨论BatchStatus
和ExitStatus
,
人们可能想知道BatchStatus
和ExitStatus
确定为Job
.
虽然这些状态是为Step
通过执行的代码,
状态Job
根据配置确定。
到目前为止,所有讨论的作业配置都至少有一个最终结果Step
跟
没有过渡。
在以下 XML 示例中,在step
执行,则Job
结束:
<step id="stepC" parent="s3"/>
在下面的 Java 示例中,在step
执行,则Job
结束:
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(step1())
.build();
}
如果未为Step
,则Job
定义为
遵循:
-
如果
Step
结尾为ExitStatus
之FAILED
这BatchStatus
和ExitStatus
之 这Job
都是FAILED
. -
否则,
BatchStatus
和ExitStatus
的Job
都是COMPLETED
.
虽然这种终止批处理作业的方法对于某些批处理作业(例如
简单的顺序步骤作业,可能需要自定义定义的作业停止场景。为
为此,Spring Batch 提供了三个过渡元素来阻止Job
(在
除了next
元素我们之前讨论过的)。
这些停止元素中的每一个都会停止一个Job
使用特定的BatchStatus
.是的
重要的是要注意,停止过渡元素对BatchStatus
或ExitStatus
任何Steps
在Job
.这些元素仅影响
的最终状态Job
.例如,作业中的每个步骤都可能具有
状态为FAILED
但要使作业的状态为COMPLETED
.
在步骤结束
配置步骤结束指示Job
以BatchStatus
之COMPLETED
.一个Job
已结束状态为COMPLETED
无法重新启动(框架抛出
一个JobInstanceAlreadyCompleteException
).
使用 XML 配置时,您可以使用end
元素。这end
元素
还允许可选的exit-code
属性,您可以使用该属性来自定义ExitStatus
的Job
.如果没有exit-code
属性,则ExitStatus
是COMPLETED
默认情况下,要匹配BatchStatus
.
使用 Java 配置时,end
方法用于此任务。这end
方法
还允许可选的exitStatus
参数,可用于自定义ExitStatus
的Job
.如果没有exitStatus
value 时,则ExitStatus
是COMPLETED
默认情况下,要匹配BatchStatus
.
考虑以下场景:如果step2
fails,则Job
以BatchStatus
之COMPLETED
和ExitStatus
之COMPLETED
和step3
不运行。
否则,执行将移至step3
.请注意,如果step2
fails,则Job
莫
restartable(因为状态为COMPLETED
).
以下示例显示了 XML 中的方案:
<step id="step1" parent="s1" next="step2">
<step id="step2" parent="s2">
<end on="FAILED"/>
<next on="*" to="step3"/>
</step>
<step id="step3" parent="s3">
以下示例显示了 Java 中的场景:
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(step1())
.next(step2())
.on("FAILED").end()
.from(step2()).on("*").to(step3())
.end()
.build();
}
步骤失败
将步骤配置为在给定点失败会指示Job
以BatchStatus
之FAILED
.与 end 不同,失败Job
不会阻止Job
从重新启动。
使用 XML 配置时,fail
元素还允许可选的exit-code
属性,可用于自定义ExitStatus
的Job
.如果没有exit-code
属性,则ExitStatus
是FAILED
默认情况下,要匹配BatchStatus
.
考虑以下场景:如果step2
fails,则Job
以BatchStatus
之FAILED
和ExitStatus
之EARLY TERMINATION
和step3
不
执行。否则,执行将移至step3
.此外,如果step2
fails 和Job
重新启动时,执行将重新开始step2
.
以下示例显示了 XML 中的方案:
<step id="step1" parent="s1" next="step2">
<step id="step2" parent="s2">
<fail on="FAILED" exit-code="EARLY TERMINATION"/>
<next on="*" to="step3"/>
</step>
<step id="step3" parent="s3">
以下示例显示了 Java 中的场景:
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(step1())
.next(step2()).on("FAILED").fail()
.from(step2()).on("*").to(step3())
.end()
.build();
}
在给定步骤停止作业
将作业配置为在特定步骤停止会指示Job
以BatchStatus
之STOPPED
.停止Job
可以提供加工的暂时中断,
以便操作员可以在重新启动Job
.
使用 XML 配置时,stop
元素需要一个restart
指定
当Job
将重新启动。
使用 Java 配置时,stopAndRestart
方法需要一个restart
属性
指定重新启动作业时应开始执行的步骤。
考虑以下场景:如果step1
饰面COMPLETE
,则作业
停止。重新启动后,执行开始step2
.
以下列表显示了 XML 中的方案:
<step id="step1" parent="s1">
<stop on="COMPLETED" restart="step2"/>
</step>
<step id="step2" parent="s2"/>
以下示例显示了 Java 中的场景:
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(step1()).on("COMPLETED").stopAndRestart(step2())
.end()
.build();
}
程序化流程决策
在某些情况下,信息比ExitStatus
可能需要决定
接下来要执行的步骤。在这种情况下,一个JobExecutionDecider
可以用来协助
在决策中,如以下示例所示:
public class MyDecider implements JobExecutionDecider {
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
String status;
if (someCondition()) {
status = "FAILED";
}
else {
status = "COMPLETED";
}
return new FlowExecutionStatus(status);
}
}
在以下示例作业配置中,decision
指定要用作的决策器
以及所有的过渡:
<job id="job">
<step id="step1" parent="s1" next="decision" />
<decision id="decision" decider="decider">
<next on="FAILED" to="step2" />
<next on="COMPLETED" to="step3" />
</decision>
<step id="step2" parent="s2" next="step3"/>
<step id="step3" parent="s3" />
</job>
<beans:bean id="decider" class="com.MyDecider"/>
在以下示例中,实现JobExecutionDecider
通过
直接到next
使用 Java 配置时调用:
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(step1())
.next(decider()).on("FAILED").to(step2())
.from(decider()).on("COMPLETED").to(step3())
.end()
.build();
}
拆分流
到目前为止,描述的每个场景都涉及Job
以
线性方式的时间。除了这种典型的样式外,Spring Batch 还允许
用于配置具有并行流的作业。
XML 命名空间允许您使用split
元素。如以下示例所示,
这split
元素包含一个或多个flow
元素,其中整个单独的流可以
被定义。一个split
元素还可以包含前面讨论的任何过渡
元素,例如next
属性或next
,end
或fail
元素。
<split id="split1" next="step4">
<flow>
<step id="step1" parent="s1" next="step2"/>
<step id="step2" parent="s2"/>
</flow>
<flow>
<step id="step3" parent="s3"/>
</flow>
</split>
<step id="step4" parent="s4"/>
基于 Java 的配置允许您通过提供的构建器配置拆分。作为
以下示例显示,split
元素包含一个或多个flow
元素,其中
可以定义整个单独的流。一个split
元素还可以包含任何
前面讨论过的过渡元素,例如next
属性或next
,end
或fail
元素。
@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}
@Bean
public Flow flow2() {
return new FlowBuilder<SimpleFlow>("flow2")
.start(step3())
.build();
}
@Bean
public Job job(Flow flow1, Flow flow2) {
return this.jobBuilderFactory.get("job")
.start(flow1)
.split(new SimpleAsyncTaskExecutor())
.add(flow2)
.next(step4())
.end()
.build();
}
外部化流定义和作业之间的依赖关系
作业中部分流可以作为单独的 bean 定义进行外部化,然后 重复使用。有两种方法可以做到这一点。第一种是将流声明为 引用其他地方定义的。
以下 XML 示例演示如何将流声明为对已定义流的引用 别处:
<job id="job">
<flow id="job1.flow1" parent="flow1" next="step3"/>
<step id="step3" parent="s3"/>
</job>
<flow id="flow1">
<step id="step1" parent="s1" next="step2"/>
<step id="step2" parent="s2"/>
</flow>
以下 Java 示例演示如何将流声明为对已定义流的引用 别处:
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(flow1())
.next(step3())
.end()
.build();
}
@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}
如前面的示例所示,定义外部流的效果是插入 从外部流入作业的步骤,就好像它们已内联声明一样。在 这样,许多作业可以引用相同的模板流,并将此类模板组合成 不同的逻辑流。这也是分离集成测试的好方法 个人流动。
外部化流的另一种形式是使用JobStep
.一个JobStep
类似于FlowStep
但实际上为
指定的流量。
以下示例如何以JobStep
在 XML 中:
<job id="jobStepJob" restartable="true">
<step id="jobStepJob.step1">
<job ref="job" job-launcher="jobLauncher"
job-parameters-extractor="jobParametersExtractor"/>
</step>
</job>
<job id="job" restartable="true">...</job>
<bean id="jobParametersExtractor" class="org.spr...DefaultJobParametersExtractor">
<property name="keys" value="input.file"/>
</bean>
以下示例显示了JobStep
在 Java 中:
@Bean
public Job jobStepJob(JobRepository jobRepository) {
return new JobBuilder("jobStepJob", jobRepository)
.start(jobStepJobStep1(null))
.build();
}
@Bean
public Step jobStepJobStep1(JobLauncher jobLauncher, JobRepository jobRepository) {
return new StepBuilder("jobStepJobStep1", jobRepository)
.job(job())
.launcher(jobLauncher)
.parametersExtractor(jobParametersExtractor())
.build();
}
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(step1())
.build();
}
@Bean
public DefaultJobParametersExtractor jobParametersExtractor() {
DefaultJobParametersExtractor extractor = new DefaultJobParametersExtractor();
extractor.setKeys(new String[]{"input.file"});
return extractor;
}
作业参数提取器是一种策略,用于确定ExecutionContext
为
这Step
转换为JobParameters
对于Job
那就是运行。这JobStep
是
当您想要有一些更精细的选项来监控和报告时很有用
作业和步骤。用JobStep
也往往是对这个问题的很好的回答:“我该如何
在工作之间建立依赖关系?这是将大型系统分解为以下内容的好方法
更小的模块并控制作业流程。
后期绑定Job
和Step
属性
前面显示的 XML 和平面文件示例都使用 SpringResource
抽象化
以获取文件。这之所以有效,是因为Resource
有一个getFile
返回java.io.File
.您可以使用标准 Spring 配置 XML 和平面文件资源
构建:
以下示例显示了 XML 中的后期绑定:
<bean id="flatFileItemReader"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource"
value="file://outputs/file.txt" />
</bean>
以下示例显示了 Java 中的后期绑定:
@Bean
public FlatFileItemReader flatFileItemReader() {
FlatFileItemReader<Foo> reader = new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource("file://outputs/file.txt"))
...
}
前面的Resource
从指定的文件系统位置加载文件。注意
绝对位置必须以双斜杠 () 开头。在大多数Spring
应用程序,这个解决方案已经足够好了,因为这些资源的名称是
在编译时已知。但是,在批处理方案中,文件名可能需要
在运行时确定为作业的参数。这可以使用//
-D
参数
读取系统属性。
以下示例演示如何从 XML 中的属性中读取文件名:
<bean id="flatFileItemReader"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="${input.file.name}" />
</bean>
下面显示了如何从 Java 中的属性中读取文件名:
@Bean
public FlatFileItemReader flatFileItemReader(@Value("${input.file.name}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
要使此解决方案正常工作,只需要一个系统参数(例如-Dinput.file.name="file://outputs/file.txt"
).
虽然您可以使用PropertyPlaceholderConfigurer 在这里,它不是
如果始终设置系统属性,则必要,因为ResourceEditor 在Spring
已经过滤并对系统属性进行占位符替换。 |
通常,在批处理设置中,最好在JobParameters
(而不是通过系统属性)并访问它们
道路。为了实现这一点,Spring Batch 允许将各种Job
和Step
属性。
以下示例演示如何在 XML 中参数化文件名:
<bean id="flatFileItemReader" scope="step"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="#{jobParameters['input.file.name']}" />
</bean>
以下示例显示了如何在 Java 中参数化文件名:
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
您可以访问JobExecution
和StepExecution
水平ExecutionContext
在
同样的方式。
以下示例演示如何访问ExecutionContext
在 XML 中:
<bean id="flatFileItemReader" scope="step"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="#{jobExecutionContext['input.file.name']}" />
</bean>
<bean id="flatFileItemReader" scope="step"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="#{stepExecutionContext['input.file.name']}" />
</bean>
以下示例演示如何访问ExecutionContext
在 Java 中:
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{stepExecutionContext['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
任何使用后期绑定的 bean 都必须使用scope="step" .有关详细信息,请参阅步骤范围。
一个Step bean 不应该是步进作用域。如果步骤中需要延迟绑定
定义,该步骤的组件(任务、项读取器或写入器等)
是应该限定范围的。 |
如果您使用 Spring 3.0(或更高版本),则步进作用域 bean 中的表达式位于 Spring 表达式语言,一种功能强大的通用语言,具有许多有趣的 特征。为了提供向后兼容性,如果 Spring Batch 检测到 旧版本的 Spring,它使用一种功能较弱的原生表达式语言,并且 其解析规则略有不同。主要区别在于地图键入 上面的例子不需要在 Spring 2.5 中引用,但引号是强制性的 在 Spring 3.0 中。 |
步骤范围
前面显示的所有后期绑定示例的范围为step
在
bean 定义。
以下示例显示了在 XML 中绑定到步骤作用域的示例:
<bean id="flatFileItemReader" scope="step"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="#{jobParameters[input.file.name]}" />
</bean>
以下示例显示了在 Java 中绑定到步骤作用域的示例:
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input.file.name]}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
使用Step
需要使用后期绑定,因为 bean 不能
实际上被实例化,直到Step
开始,以便找到属性。
因为默认情况下它不是 Spring 容器的一部分,所以必须添加作用域
显式地,通过使用batch
命名空间,通过显式包含 bean 定义
对于StepScope
,或使用@EnableBatchProcessing
注解。仅使用以下一种
那些方法。以下示例使用batch
Namespace:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="...">
<batch:job .../>
...
</beans>
以下示例显式包含 bean 定义:
<bean class="org.springframework.batch.core.scope.StepScope" />
工作范围
Job
scope 中引入的 Spring Batch 3.0 类似于Step
配置中的范围
但是一个作用域Job
上下文,因此只有一个这样的 bean 实例
每个正在运行的作业。此外,还支持引用的延迟绑定
可从JobContext
通过使用#{..}
占位符。使用此功能,您可以拉豆
作业或作业执行上下文和作业参数中的属性。
以下示例显示了在 XML 中绑定到作业范围的示例:
<bean id="..." class="..." scope="job">
<property name="name" value="#{jobParameters[input]}" />
</bean>
<bean id="..." class="..." scope="job">
<property name="name" value="#{jobExecutionContext['input.name']}.txt" />
</bean>
以下示例显示了在 Java 中绑定到作业范围的示例:
@JobScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input]}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
@JobScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.name("flatFileItemReader")
.resource(new FileSystemResource(name))
...
}
因为默认情况下它不是 Spring 容器的一部分,所以必须添加作用域
显式地,通过使用batch
命名空间,通过显式包含 bean 定义
JobScope,或使用@EnableBatchProcessing
注释(仅选择一种方法)。
以下示例使用batch
Namespace:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="...">
<batch:job .../>
...
</beans>
以下示例包括一个 bean,该 bean 显式定义了JobScope
:
<bean class="org.springframework.batch.core.scope.JobScope" />
在多线程中使用作业范围的 Bean 存在一些实际限制 或分区步骤。Spring Batch 不控制在这些 用例,因此无法正确设置它们以使用此类 bean。因此 我们不建议在多线程或分区步骤中使用作业范围的 Bean。 |
范围ItemStream
组件
使用 Java 配置样式定义作业或步骤范围时ItemStream
豆
bean 定义方法的返回类型应至少为ItemStream
.这是必需的
以便 Spring Batch 正确创建实现此接口的代理,因此
通过调用open
,update
和close
方法。
建议让这类 bean 的 bean 定义方法返回最具体的 已知实现,如以下示例所示:
@Bean
@StepScope
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters['input.file.name']}") String name) {
return new FlatFileItemReaderBuilder<Foo>()
.resource(new FileSystemResource(name))
// set other properties of the item reader
.build();
}