常见模式
常见批处理模式
一些 Batch Job 可以完全从 Spring Batch 中的现成组件组装。
例如,可以将 and 实现配置为
涵盖广泛的场景。但是,在大多数情况下,自定义代码必须为
写。应用程序开发人员的主要 API 入口点是 、 、 、 和各种侦听器接口。最简单的批处理
jobs 可以使用来自 Spring Batch 的现成 input ,但它通常是
在处理和写入中存在需要开发人员的自定义关注点
实现 OR .ItemReader
ItemWriter
Tasklet
ItemReader
ItemWriter
ItemReader
ItemWriter
ItemProcessor
在本章中,我们提供了自定义业务逻辑中常见模式的一些示例。
这些示例主要包含侦听器接口。应该注意的是,如果合适,or 也可以实现侦听器接口。ItemReader
ItemWriter
记录项处理和失败
一个常见的用例是需要对步骤中的错误进行特殊处理,逐项,
也许是登录到特殊通道或将记录插入数据库。一个
面向块(从 Step Factory Bean 创建)允许用户实现此用途
case 中带有一个简单的 for 错误 on 和一个 for
上的错误。以下代码片段说明了一个侦听器,该侦听器将两个 read
和写入失败:Step
ItemReadListener
read
ItemWriteListener
write
public class ItemFailureLoggerListener extends ItemListenerSupport {
private static Log logger = LogFactory.getLog("item.error");
public void onReadError(Exception ex) {
logger.error("Encountered error on read", e);
}
public void onWriteError(Exception ex, List<? extends Object> items) {
logger.error("Encountered error on write", ex);
}
}
实施此侦听器后,必须将其注册到一个步骤中。
以下示例演示如何在 XML 中注册一个步骤的侦听器:
<step id="simpleStep">
...
<listeners>
<listener>
<bean class="org.example...ItemFailureLoggerListener"/>
</listener>
</listeners>
</step>
以下示例显示了如何向步骤 Java 注册侦听器:
@Bean
public Step simpleStep(JobRepository jobRepository) {
return new StepBuilder("simpleStep", jobRepository)
...
.listener(new ItemFailureLoggerListener())
.build();
}
如果你的侦听器在方法中执行任何操作,它必须位于
将要回滚的事务。如果您需要使用事务性
资源(例如数据库)中,请考虑添加声明性
transaction 传递给该方法(有关详细信息,请参见 Spring Core 参考指南),并为其
propagation 属性的值为 .onError() onError() REQUIRES_NEW |
出于业务原因手动停止作业
Spring Batch 通过接口提供了一种方法,但这是
真正供操作员使用,而不是供应用程序程序员使用。有时,它是
从企业内部停止任务执行更方便或更有意义
逻辑。stop()
JobOperator
最简单的方法是抛出一个 (一个既没有重试的
无限期地跳过)。例如,可以使用自定义异常类型,如下所示
在以下示例中:RuntimeException
public class PoisonPillItemProcessor<T> implements ItemProcessor<T, T> {
@Override
public T process(T item) throws Exception {
if (isPoisonPill(item)) {
throw new PoisonPillException("Poison pill detected: " + item);
}
return item;
}
}
阻止步骤执行的另一种简单方法是从 返回 ,如以下示例所示:null
ItemReader
public class EarlyCompletionItemReader implements ItemReader<T> {
private ItemReader<T> delegate;
public void setDelegate(ItemReader<T> delegate) { ... }
public T read() throws Exception {
T item = delegate.read();
if (isEndItem(item)) {
return null; // end the step here
}
return item;
}
}
前面的示例实际上依赖于存在默认实现的事实
的策略,当项目要
processed 是 。可以实施更复杂的完成策略,并且
注入 通过 .CompletionPolicy
null
Step
SimpleStepFactoryBean
以下示例显示了如何将完成策略注入到 XML 中的步骤中:
<step id="simpleStep">
<tasklet>
<chunk reader="reader" writer="writer" commit-interval="10"
chunk-completion-policy="completionPolicy"/>
</tasklet>
</step>
<bean id="completionPolicy" class="org.example...SpecialCompletionPolicy"/>
以下示例显示了如何将完成策略注入 Java 中的步骤:
@Bean
public Step simpleStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("simpleStep", jobRepository)
.<String, String>chunk(new SpecialCompletionPolicy(), transactionManager)
.reader(reader())
.writer(writer())
.build();
}
另一种方法是在 中设置一个标志,该标志由框架中的实现在项目处理之间进行检查。实现这个
或者,我们需要访问当前的 ,这可以通过
实现 a 并将其注册到 .以下示例
显示设置标志的侦听器:StepExecution
Step
StepExecution
StepListener
Step
public class CustomItemWriter extends ItemListenerSupport implements StepListener {
private StepExecution stepExecution;
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
public void afterRead(Object item) {
if (isPoisonPill(item)) {
stepExecution.setTerminateOnly();
}
}
}
设置标志后,默认行为是步骤引发 .此行为可以通过 .但是,唯一的选择是抛出或不抛出异常
所以这总是一个不正常的工作结局。JobInterruptedException
StepInterruptionPolicy
添加页脚记录
通常,在写入平面文件时,必须在
文件,在所有处理完成后。这可以使用 Spring Batch 提供的接口来实现。(及其对应的 ) 是 的可选属性,可以添加到项编写器中。FlatFileFooterCallback
FlatFileFooterCallback
FlatFileHeaderCallback
FlatFileItemWriter
下面的示例演示如何在 XML 中使用 和 :FlatFileHeaderCallback
FlatFileFooterCallback
<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator" ref="lineAggregator"/>
<property name="headerCallback" ref="headerCallback" />
<property name="footerCallback" ref="footerCallback" />
</bean>
下面的示例展示了如何在 Java 中使用 和 :FlatFileHeaderCallback
FlatFileFooterCallback
@Bean
public FlatFileItemWriter<String> itemWriter(Resource outputResource) {
return new FlatFileItemWriterBuilder<String>()
.name("itemWriter")
.resource(outputResource)
.lineAggregator(lineAggregator())
.headerCallback(headerCallback())
.footerCallback(footerCallback())
.build();
}
页脚回调接口只有一个方法,当页脚必须为 写入,如以下接口定义所示:
public interface FlatFileFooterCallback {
void writeFooter(Writer writer) throws IOException;
}
编写摘要页脚
涉及页脚记录的一个常见要求是在 output 进程,并将此信息附加到文件末尾。此页脚通常 用作文件的摘要或提供校验和。
例如,如果批处理作业正在将记录写入平面文件,并且存在
要求将所有 the 中的总金额放在页脚中,则
可以使用以下实现:Trade
Trades
ItemWriter
public class TradeItemWriter implements ItemWriter<Trade>,
FlatFileFooterCallback {
private ItemWriter<Trade> delegate;
private BigDecimal totalAmount = BigDecimal.ZERO;
public void write(Chunk<? extends Trade> items) throws Exception {
BigDecimal chunkTotal = BigDecimal.ZERO;
for (Trade trade : items) {
chunkTotal = chunkTotal.add(trade.getAmount());
}
delegate.write(items);
// After successfully writing all items
totalAmount = totalAmount.add(chunkTotal);
}
public void writeFooter(Writer writer) throws IOException {
writer.write("Total Amount Processed: " + totalAmount);
}
public void setDelegate(ItemWriter delegate) {...}
}
这将存储一个值,该值会随着写入的每个项目而增加。处理完最后一个后,框架调用 ,这会将 放入文件中。请注意,方法
使用临时变量 ,该变量存储块中 amount 的总数。这样做是为了确保,如果方法中发生 skip,则 保持不变。只有在方法结束时,一旦我们保证不会引发异常,我们才会更新 .TradeItemWriter
totalAmount
amount
Trade
Trade
writeFooter
totalAmount
write
chunkTotal
Trade
write
totalAmount
write
totalAmount
为了调用该方法,将 (which
implements ) 必须作为 .writeFooter
TradeItemWriter
FlatFileFooterCallback
FlatFileItemWriter
footerCallback
下面的示例演示如何在 XML 中连接 :TradeItemWriter
<bean id="tradeItemWriter" class="..TradeItemWriter">
<property name="delegate" ref="flatFileItemWriter" />
</bean>
<bean id="flatFileItemWriter" class="org.spr...FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator" ref="lineAggregator"/>
<property name="footerCallback" ref="tradeItemWriter" />
</bean>
下面的示例展示了如何在 Java 中连接 :TradeItemWriter
@Bean
public TradeItemWriter tradeItemWriter() {
TradeItemWriter itemWriter = new TradeItemWriter();
itemWriter.setDelegate(flatFileItemWriter(null));
return itemWriter;
}
@Bean
public FlatFileItemWriter<String> flatFileItemWriter(Resource outputResource) {
return new FlatFileItemWriterBuilder<String>()
.name("itemWriter")
.resource(outputResource)
.lineAggregator(lineAggregator())
.footerCallback(tradeItemWriter())
.build();
}
到目前为止,编写 the 的方式只有在
是不可重新启动的。这是因为该类是有状态的(因为它存储了 ),但 没有持久化到数据库中。因此,它
在重新启动时无法检索。为了使此类可重启,
该接口应与方法 和 一起实现,如以下示例所示:TradeItemWriter
Step
totalAmount
totalAmount
ItemStream
open
update
public void open(ExecutionContext executionContext) {
if (executionContext.containsKey("total.amount") {
totalAmount = (BigDecimal) executionContext.get("total.amount");
}
}
public void update(ExecutionContext executionContext) {
executionContext.put("total.amount", totalAmount);
}
update 方法将 的最新版本存储到该对象保存到数据库之前。open 方法
从 中检索任何现有的 ,并将其用作
处理的起点,允许在重新启动时拾取
它在上次运行 时停止。totalAmount
ExecutionContext
totalAmount
ExecutionContext
TradeItemWriter
Step
驱动基于查询的 ItemReader
在关于 reader 和 writer 的章节中,使用 讨论了分页。许多数据库供应商(比如 DB2)都非常悲观 如果正在读取的表也需要由 在线应用程序的其他部分。此外,打开光标 大型数据集可能会导致某些供应商的数据库出现问题。因此,许多 项目更喜欢使用 'Driving Query' 方法来读取数据。这种方法有效 通过迭代 key,而不是需要返回的整个对象,作为 下图说明:
如您所见,上图中显示的示例使用与原来的 'FOO' 表相同的
在基于游标的示例中使用。但是,不是选择整行,而是只选择
在 SQL 语句中选择了 ID。因此,而不是返回对象
from ,则返回 an。然后,此数字可用于查询
'details' ,这是一个完整的对象,如下图所示:FOO
read
Integer
Foo
应使用 An 来转换从 driving 查询中获取的 key
转换为一个完整的对象。现有的 DAO 可用于查询基于完整对象的
在键上。ItemProcessor
Foo
多行记录
虽然平面文件通常每个记录都局限于单个 行,则文件可能具有跨多行且多个 格式。以下文件摘录显示了这种安排的示例:
HEA;0013100345;2007-02-15 NCU;Smith;Peter;;T;20014539;F BAD;;Oak Street 31/A;;Small Town;00235;IL;US FOT;2;2;267.34
以 'HEA' 开头的行和以 'FOT' 开头的行之间的所有内容都是 被视为 1 条记录。为了 正确处理这种情况:
-
必须读取 multi line record 作为一个组,以便可以将其传递给 intact。
ItemReader
ItemWriter
-
每种行类型可能需要以不同的方式进行标记。
因为一条记录跨越多行,而且我们可能不知道有多少行
有,必须小心始终阅读整个记录。为了
执行此操作时,应将自定义实现为 .ItemReader
ItemReader
FlatFileItemReader
以下示例演示如何在 XML 中实现自定义:ItemReader
<bean id="itemReader" class="org.spr...MultiLineTradeItemReader">
<property name="delegate">
<bean class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="data/iosample/input/multiLine.txt" />
<property name="lineMapper">
<bean class="org.spr...DefaultLineMapper">
<property name="lineTokenizer" ref="orderFileTokenizer"/>
<property name="fieldSetMapper" ref="orderFieldSetMapper"/>
</bean>
</property>
</bean>
</property>
</bean>
以下示例演示如何在 Java 中实现自定义:ItemReader
@Bean
public MultiLineTradeItemReader itemReader() {
MultiLineTradeItemReader itemReader = new MultiLineTradeItemReader();
itemReader.setDelegate(flatFileItemReader());
return itemReader;
}
@Bean
public FlatFileItemReader flatFileItemReader() {
FlatFileItemReader<Trade> reader = new FlatFileItemReaderBuilder<>()
.name("flatFileItemReader")
.resource(new ClassPathResource("data/iosample/input/multiLine.txt"))
.lineTokenizer(orderFileTokenizer())
.fieldSetMapper(orderFieldSetMapper())
.build();
return reader;
}
确保每一行都正确地标记化,这对于
fixed-length input,则可以在
委托。请参阅 Reader 中的 FlatFileItemReader
和
作家章节了解更多详情。然后,委托读者使用 a 将 for each line 传递回换行。PatternMatchingCompositeLineTokenizer
FlatFileItemReader
PassThroughFieldSetMapper
FieldSet
ItemReader
以下示例说明如何确保在 XML 中正确标记每一行:
<bean id="orderFileTokenizer" class="org.spr...PatternMatchingCompositeLineTokenizer">
<property name="tokenizers">
<map>
<entry key="HEA*" value-ref="headerRecordTokenizer" />
<entry key="FOT*" value-ref="footerRecordTokenizer" />
<entry key="NCU*" value-ref="customerLineTokenizer" />
<entry key="BAD*" value-ref="billingAddressLineTokenizer" />
</map>
</property>
</bean>
以下示例说明如何确保每 X 行在 Java 中正确标记化:
@Bean
public PatternMatchingCompositeLineTokenizer orderFileTokenizer() {
PatternMatchingCompositeLineTokenizer tokenizer =
new PatternMatchingCompositeLineTokenizer();
Map<String, LineTokenizer> tokenizers = new HashMap<>(4);
tokenizers.put("HEA*", headerRecordTokenizer());
tokenizers.put("FOT*", footerRecordTokenizer());
tokenizers.put("NCU*", customerLineTokenizer());
tokenizers.put("BAD*", billingAddressLineTokenizer());
tokenizer.setTokenizers(tokenizers);
return tokenizer;
}
这个包装器必须能够识别记录的结尾,这样它才能持续地
调用,直到到达末尾。对于读取的每一行,
wrapper 构建要返回的项目。到达页脚后,项目可以
返回以传送到 和 ,如
以下示例:read()
ItemProcessor
ItemWriter
private FlatFileItemReader<FieldSet> delegate;
public Trade read() throws Exception {
Trade t = null;
for (FieldSet line = null; (line = this.delegate.read()) != null;) {
String prefix = line.readString(0);
if (prefix.equals("HEA")) {
t = new Trade(); // Record must start with header
}
else if (prefix.equals("NCU")) {
Assert.notNull(t, "No header was found.");
t.setLast(line.readString(1));
t.setFirst(line.readString(2));
...
}
else if (prefix.equals("BAD")) {
Assert.notNull(t, "No header was found.");
t.setCity(line.readString(4));
t.setState(line.readString(6));
...
}
else if (prefix.equals("FOT")) {
return t; // Record must end with footer
}
}
Assert.isNull(t, "No 'END' was found.");
return null;
}
执行系统命令
许多批处理作业要求从批处理作业中调用外部命令。 这样的过程可以由调度程序单独启动,但 有关运行的常见元数据将丢失。此外,多步骤作业也将 还需要拆分为多个 Job。
由于这种需求非常普遍,因此 Spring Batch 提供了
调用 system 命令。Tasklet
以下示例演示如何在 XML 中调用外部命令:
<bean class="org.springframework.batch.core.step.tasklet.SystemCommandTasklet">
<property name="command" value="echo hello" />
<!-- 5 second timeout for the command to complete -->
<property name="timeout" value="5000" />
</bean>
以下示例演示如何在 Java 中调用外部命令:
@Bean
public SystemCommandTasklet tasklet() {
SystemCommandTasklet tasklet = new SystemCommandTasklet();
tasklet.setCommand("echo hello");
tasklet.setTimeout(5000);
return tasklet;
}
在未找到输入时处理步骤完成
在许多批处理方案中,在数据库或文件中找不到要处理的行不是
特殊。简单地认为 没有找到任何工作,并以 0 完成
items 已读取。Spring 中提供的所有实现都是开箱即用的
Batch 默认使用此方法。如果未写出任何内容,这可能会导致一些混淆
即使存在输入(如果文件命名错误或类似名称,通常会发生这种情况
问题出现)。因此,应检查元数据本身以确定如何
框架发现很多工作要处理。但是,如果未找到输入
被认为是例外的?在这种情况下,以编程方式检查元数据中是否有项目
处理并导致失败是最好的解决方案。因为这是一个常见的用例,
Spring Batch 提供了具有此功能的侦听器,如
的类定义 :Step
ItemReader
NoWorkFoundStepExecutionListener
public class NoWorkFoundStepExecutionListener extends StepExecutionListenerSupport {
public ExitStatus afterStep(StepExecution stepExecution) {
if (stepExecution.getReadCount() == 0) {
return ExitStatus.FAILED;
}
return null;
}
}
前面的代码在 'afterStep' 阶段检查 the 的属性,以确定是否未读取任何项目。如果那
,则返回退出代码,指示 应该失败。
否则,将返回 .StepExecutionListener
readCount
StepExecution
FAILED
Step
null
Step
将数据传递给 Future Steps
将信息从一个步骤传递到另一个步骤通常很有用。这可以通过以下方式完成
这。问题是有两个:一个在级别,一个在级别。仅保留为
长为步长,而 仍贯穿整个 .上
另一方面,每次提交
块,而 则仅在每个 .ExecutionContext
ExecutionContexts
Step
Job
Step
ExecutionContext
Job
ExecutionContext
Job
Step
ExecutionContext
Step
Job
ExecutionContext
Step
这种分离的结果是,所有数据都必须放在 执行 时。这样做可确保数据
在运行时妥善存放。如果数据存储到 ,
,则在执行过程中不会保留该 ID。如果失败,则该数据将丢失。Step
ExecutionContext
Step
Step
Job
ExecutionContext
Step
Step
public class SavingItemWriter implements ItemWriter<Object> {
private StepExecution stepExecution;
public void write(Chunk<? extends Object> items) throws Exception {
// ...
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put("someKey", someObject);
}
@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
}
要使数据可用于 future ,必须在步骤完成后将其“提升”到 。Spring Batch 为此提供了。必须配置 Listener
与必须提升的数据相关的键。它可以
此外,还可以选择配置一个退出代码模式列表,该列表的升级
应该发生( 是默认值)。与所有侦听器一样,它必须注册
在 .Steps
Job
ExecutionContext
ExecutionContextPromotionListener
ExecutionContext
COMPLETED
Step
以下示例演示如何将步骤提升为 in XML:Job
ExecutionContext
<job id="job1">
<step id="step1">
<tasklet>
<chunk reader="reader" writer="savingWriter" commit-interval="10"/>
</tasklet>
<listeners>
<listener ref="promotionListener"/>
</listeners>
</step>
<step id="step2">
...
</step>
</job>
<beans:bean id="promotionListener" class="org.spr....ExecutionContextPromotionListener">
<beans:property name="keys">
<list>
<value>someKey</value>
</list>
</beans:property>
</beans:bean>
以下示例演示如何将步骤提升为 in Java:Job
ExecutionContext
@Bean
public Job job1(JobRepository jobRepository) {
return new JobBuilder("job1", jobRepository)
.start(step1())
.next(step1())
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(reader())
.writer(savingWriter())
.listener(promotionListener())
.build();
}
@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[] {"someKey"});
return listener;
}
最后,必须从 中检索保存的值,如下所示
在以下示例中:Job
ExecutionContext
public class RetrievingItemWriter implements ItemWriter<Object> {
private Object someObject;
public void write(Chunk<? extends Object> items) throws Exception {
// ...
}
@BeforeStep
public void retrieveInterstepData(StepExecution stepExecution) {
JobExecution jobExecution = stepExecution.getJobExecution();
ExecutionContext jobContext = jobExecution.getExecutionContext();
this.someObject = jobContext.get("someKey");
}
}