创建自定义 ItemReader 和 ItemWriters

到目前为止,本章已经讨论了 Spring 中读写的基本契约 Batch 和一些常见的实现来实现。然而,这些都是公平的 generic,并且有许多可能的情况不是开箱即用的 实现。本节使用一个简单的示例来说明如何创建自定义ItemReaderItemWriterimplementation 并正确实现他们的 Contract。这ItemReader还实现了ItemStream,为了说明如何使 Reader 或 writer 可重启。spring-doc.cadn.net.cn

习惯ItemReader

在此示例中,我们创建了一个简单的ItemReader实现 从提供的列表中读取。我们从实现最基本的 Contract 开始ItemReaderread方法,如以下代码所示:spring-doc.cadn.net.cn

public class CustomItemReader<T> implements ItemReader<T> {

    List<T> items;

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
       NonTransientResourceException, ParseException {

        if (!items.isEmpty()) {
            return items.remove(0);
        }
        return null;
    }
}

前面的类采用一个项目列表,并一次返回一个项目,并删除每个项目 从列表中。当列表为空时,它会返回null,从而满足最基本的 的要求ItemReader,如以下测试代码所示:spring-doc.cadn.net.cn

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");

ItemReader itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());

使ItemReader可重启

最后的挑战是使ItemReader可重启。目前,如果处理是 interrupted 并重新开始,则ItemReader必须从头开始。这是 实际上在许多情况下都有效,但有时最好使用批处理作业 从上次中断处重新开始。关键的判别因素通常是 Reader 是否是有状态的 或无状态。无状态读取器不需要担心可重启性,但 Stateful 必须尝试在 restart 时重建其最后一个已知状态。因此, 我们建议您尽可能保持自定义读取器无状态,因此您不必担心 关于可重启性。spring-doc.cadn.net.cn

如果您确实需要存储 state,则ItemStreaminterface 应该使用:spring-doc.cadn.net.cn

public class CustomItemReader<T> implements ItemReader<T>, ItemStream {

    List<T> items;
    int currentIndex = 0;
    private static final String CURRENT_INDEX = "current.index";

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
        ParseException, NonTransientResourceException {

        if (currentIndex < items.size()) {
            return items.get(currentIndex++);
        }

        return null;
    }

    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (executionContext.containsKey(CURRENT_INDEX)) {
            currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
        }
        else {
            currentIndex = 0;
        }
    }

    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
    }

    public void close() throws ItemStreamException {}
}

每次调用ItemStream updatemethod,则ItemReader存储在提供的ExecutionContext键为 'current.index'。当ItemStream open方法时,调用ExecutionContext检查以查看它是否 包含具有该键的条目。如果找到该键,则当前索引将移动到 那个位置。这是一个相当微不足道的示例,但它仍然符合通用合同:spring-doc.cadn.net.cn

ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<>(items);

((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());

ItemReaders具有更复杂的重启逻辑。这JdbcCursorItemReader,例如,将最后处理的行的行 ID 存储在 光标。spring-doc.cadn.net.cn

还值得注意的是,在ExecutionContext不应为 琐碎。那是因为相同的ExecutionContext用于所有ItemStreams在 一个Step.在大多数情况下,只需在键前加上类名就足够了 以保证唯一性。但是,在极少数情况下,两个相同类型的ItemStream用于同一步骤(如果需要两个文件 output),则需要更独特的名称。出于这个原因,许多 Spring BatchItemReaderItemWriter实现具有setName()属性,允许此 key name 被覆盖。spring-doc.cadn.net.cn

习惯ItemWriter

实现自定义ItemWriter在许多方面与ItemReader例 但差异足以证明它自己的例子。但是,添加 可重启性本质上是相同的,因此本例不涉及。与ItemReaderexample,一个List是为了使示例尽可能简单 可能:spring-doc.cadn.net.cn

public class CustomItemWriter<T> implements ItemWriter<T> {

    List<T> output = TransactionAwareProxyFactory.createTransactionalList();

    public void write(Chunk<? extends T> items) throws Exception {
        output.addAll(items);
    }

    public List<T> getOutput() {
        return output;
    }
}

使ItemWriter可重启

要使ItemWriter重新启动的,我们将遵循与ItemReader,添加并实现ItemStream接口同步 执行上下文。在此示例中,我们可能必须计算处理的项目数 并将其添加为页脚记录。如果需要这样做,我们可以实施ItemStream在我们的ItemWriter以便 counter 从执行中重建 context 的 URL 中。spring-doc.cadn.net.cn

在许多实际情况下,自定义ItemWriters也委托给另一个编写器,该编写器本身 是可重新启动的(例如,在写入文件时),否则它会写入 事务资源,因此不需要重新启动,因为它是无状态的。 当你有一个有状态的 writer 时,你可能应该确保实现ItemStream如 以及ItemWriter.还要记住,作者的客户端需要注意 这ItemStream,因此您可能需要在配置中将其注册为流。spring-doc.cadn.net.cn