创建自定义 ItemReader 和 ItemWriters

到目前为止,本章已经讨论了 Spring 中读写的基本契约 Batch 和一些常见的实现来实现。然而,这些都是公平的 generic,并且有许多可能的情况不是开箱即用的 实现。本节通过一个简单的示例来展示如何创建自定义和实现并正确实现它们的 Contract。还实现了 ,以说明如何使 reader 或 writer 可重启。ItemReaderItemWriterItemReaderItemStreamspring-doc.cn

自定义示例ItemReader

对于此示例,我们创建了一个简单的实现,该 从提供的列表中读取。我们首先实现最基本的 Contract of ,即 方法,如下面的代码所示:ItemReaderItemReaderreadspring-doc.cn

public class CustomItemReader<T> implements ItemReader<T> {

    List<T> items;

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
       NonTransientResourceException, ParseException {

        if (!items.isEmpty()) {
            return items.remove(0);
        }
        return null;
    }
}

前面的类采用一个项目列表,并一次返回一个项目,并删除每个项目 从列表中。当列表为空时,它返回 ,从而满足最基本的 的要求,如以下测试代码所示:nullItemReaderspring-doc.cn

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");

ItemReader itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());

使 RestartableItemReader

最后一个挑战是使 可重启。目前,如果处理是 interrupted and begin again,则必须从头开始。这是 实际上在许多情况下都有效,但有时最好使用批处理作业 从上次中断处重新开始。关键的判别因素通常是 Reader 是否是有状态的 或无状态。无状态读取器不需要担心可重启性,但 Stateful 必须尝试在 restart 时重建其最后一个已知状态。因此, 我们建议您尽可能保持自定义读取器无状态,因此您不必担心 关于可重启性。ItemReaderItemReaderspring-doc.cn

如果您确实需要存储 state,则应使用该接口:ItemStreamspring-doc.cn

public class CustomItemReader<T> implements ItemReader<T>, ItemStream {

    List<T> items;
    int currentIndex = 0;
    private static final String CURRENT_INDEX = "current.index";

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
        ParseException, NonTransientResourceException {

        if (currentIndex < items.size()) {
            return items.get(currentIndex++);
        }

        return null;
    }

    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (executionContext.containsKey(CURRENT_INDEX)) {
            currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
        }
        else {
            currentIndex = 0;
        }
    }

    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
    }

    public void close() throws ItemStreamException {}
}

每次调用该方法时,当前索引都存储在提供的 'current.index' 键中。调用该方法时,会检查 the 以查看它是否 包含具有该键的条目。如果找到该键,则当前索引将移动到 那个位置。这是一个相当微不足道的示例,但它仍然符合通用合同:ItemStreamupdateItemReaderExecutionContextItemStreamopenExecutionContextspring-doc.cn

ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<>(items);

((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());

大多数具有更复杂的重启逻辑。例如,将最后处理的行的行 ID 存储在 光标。ItemReadersJdbcCursorItemReaderspring-doc.cn

还值得注意的是,中使用的 key 不应该是 琐碎。那是因为 same 用于 一个。在大多数情况下,只需在键前加上类名就足够了 以保证唯一性。但是,在极少数情况下,在同一步骤中使用两个相同类型的 (如果需要两个文件 output),则需要更独特的名称。出于这个原因,许多 Spring Batch 和实现都有一个属性,该属性允许 this key name 被覆盖。ExecutionContextExecutionContextItemStreamsStepItemStreamItemReaderItemWritersetName()spring-doc.cn

自定义示例ItemWriter

实现自定义在许多方面与示例相似 但差异足以证明它自己的例子。但是,添加 可重启性本质上是相同的,因此本例不涉及。与示例一样,使用 a 是为了使示例尽可能简单,例如 可能:ItemWriterItemReaderItemReaderListspring-doc.cn

public class CustomItemWriter<T> implements ItemWriter<T> {

    List<T> output = TransactionAwareProxyFactory.createTransactionalList();

    public void write(Chunk<? extends T> items) throws Exception {
        output.addAll(items);
    }

    public List<T> getOutput() {
        return output;
    }
}

使 RestartableItemWriter

要使 Restartable,我们将遵循与 相同的过程,添加并实现接口以同步 执行上下文。在此示例中,我们可能必须计算处理的项目数 并将其添加为页脚记录。如果需要这样做,我们可以在我们的 so 以便从执行中重新构造计数器 context 的 URL 中。ItemWriterItemReaderItemStreamItemStreamItemWriterspring-doc.cn

在许多实际情况下,custom 还会委托给另一个编写器,该编写器本身 是可重新启动的(例如,在写入文件时),否则它会写入 事务资源,因此不需要重新启动,因为它是无状态的。 当你有一个有状态的 writer 时,你可能应该确保实现为 以及 。还要记住,作者的客户端需要注意 ,因此您可能需要在配置中将其注册为流。ItemWritersItemStreamItemWriterItemStreamspring-doc.cn