创建自定义 ItemReader 和 ItemWriters
到目前为止,本章已经讨论了 Spring 中读写的基本契约
Batch 和一些常见的实现来实现。然而,这些都是公平的
generic,并且有许多可能的情况不是开箱即用的
实现。本节使用一个简单的示例来说明如何创建自定义ItemReader
和ItemWriter
implementation 并正确实现他们的 Contract。这ItemReader
还实现了ItemStream
,为了说明如何使 Reader 或
writer 可重启。
习惯ItemReader
例
在此示例中,我们创建了一个简单的ItemReader
实现
从提供的列表中读取。我们从实现最基本的 Contract 开始ItemReader
这read
方法,如以下代码所示:
public class CustomItemReader<T> implements ItemReader<T> {
List<T> items;
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
NonTransientResourceException, ParseException {
if (!items.isEmpty()) {
return items.remove(0);
}
return null;
}
}
前面的类采用一个项目列表,并一次返回一个项目,并删除每个项目
从列表中。当列表为空时,它会返回null
,从而满足最基本的
的要求ItemReader
,如以下测试代码所示:
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
ItemReader itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());
使ItemReader
可重启
最后的挑战是使ItemReader
可重启。目前,如果处理是
interrupted 并重新开始,则ItemReader
必须从头开始。这是
实际上在许多情况下都有效,但有时最好使用批处理作业
从上次中断处重新开始。关键的判别因素通常是 Reader 是否是有状态的
或无状态。无状态读取器不需要担心可重启性,但
Stateful 必须尝试在 restart 时重建其最后一个已知状态。因此,
我们建议您尽可能保持自定义读取器无状态,因此您不必担心
关于可重启性。
如果您确实需要存储 state,则ItemStream
interface 应该使用:
public class CustomItemReader<T> implements ItemReader<T>, ItemStream {
List<T> items;
int currentIndex = 0;
private static final String CURRENT_INDEX = "current.index";
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
ParseException, NonTransientResourceException {
if (currentIndex < items.size()) {
return items.get(currentIndex++);
}
return null;
}
public void open(ExecutionContext executionContext) throws ItemStreamException {
if (executionContext.containsKey(CURRENT_INDEX)) {
currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
}
else {
currentIndex = 0;
}
}
public void update(ExecutionContext executionContext) throws ItemStreamException {
executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
}
public void close() throws ItemStreamException {}
}
每次调用ItemStream
update
method,则ItemReader
存储在提供的ExecutionContext
键为 'current.index'。当ItemStream
open
方法时,调用ExecutionContext
检查以查看它是否
包含具有该键的条目。如果找到该键,则当前索引将移动到
那个位置。这是一个相当微不足道的示例,但它仍然符合通用合同:
ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<>(items);
((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());
最ItemReaders
具有更复杂的重启逻辑。这JdbcCursorItemReader
,例如,将最后处理的行的行 ID 存储在
光标。
还值得注意的是,在ExecutionContext
不应为
琐碎。那是因为相同的ExecutionContext
用于所有ItemStreams
在
一个Step
.在大多数情况下,只需在键前加上类名就足够了
以保证唯一性。但是,在极少数情况下,两个相同类型的ItemStream
用于同一步骤(如果需要两个文件
output),则需要更独特的名称。出于这个原因,许多 Spring BatchItemReader
和ItemWriter
实现具有setName()
属性,允许此
key name 被覆盖。
习惯ItemWriter
例
实现自定义ItemWriter
在许多方面与ItemReader
例
但差异足以证明它自己的例子。但是,添加
可重启性本质上是相同的,因此本例不涉及。与ItemReader
example,一个List
是为了使示例尽可能简单
可能:
public class CustomItemWriter<T> implements ItemWriter<T> {
List<T> output = TransactionAwareProxyFactory.createTransactionalList();
public void write(Chunk<? extends T> items) throws Exception {
output.addAll(items);
}
public List<T> getOutput() {
return output;
}
}
使ItemWriter
可重启
要使ItemWriter
重新启动的,我们将遵循与ItemReader
,添加并实现ItemStream
接口同步
执行上下文。在此示例中,我们可能必须计算处理的项目数
并将其添加为页脚记录。如果需要这样做,我们可以实施ItemStream
在我们的ItemWriter
以便 counter 从执行中重建
context 的 URL 中。
在许多实际情况下,自定义ItemWriters
也委托给另一个编写器,该编写器本身
是可重新启动的(例如,在写入文件时),否则它会写入
事务资源,因此不需要重新启动,因为它是无状态的。
当你有一个有状态的 writer 时,你可能应该确保实现ItemStream
如
以及ItemWriter
.还要记住,作者的客户端需要注意
这ItemStream
,因此您可能需要在配置中将其注册为流。