扩展和并行处理

许多批处理问题可以通过单线程、单进程作业来解决。 因此,在考虑之前正确检查这是否满足您的需求总是一个好主意 关于更复杂的实现。衡量实际作业的性能,并查看 最简单的实现首先满足您的需求。您可以读取和写入 几百兆字节的存储时间远远超过一分钟,即使使用标准硬件也是如此。spring-doc.cadn.net.cn

当您准备好开始通过一些并行处理实现作业时, Spring Batch 提供了一系列选项,本章将介绍这些选项,尽管一些 功能详见其他专题。在高层次上,有两种并行模式 加工:spring-doc.cadn.net.cn

这些也分为以下几类:spring-doc.cadn.net.cn

首先,我们回顾一下单进程选项。然后,我们回顾一下多进程选项。spring-doc.cadn.net.cn

多线程步骤

启动并行处理的最简单方法是添加TaskExecutor到您的步骤 配置。spring-doc.cadn.net.cn

使用 Java 配置时,您可以添加TaskExecutor到步骤中, 如下例所示:spring-doc.cadn.net.cn

Java 配置
@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("spring_batch");
}

@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("sampleStep", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.taskExecutor(taskExecutor)
				.build();
}

例如,您可以将属性 TOtasklet如下:spring-doc.cadn.net.cn

<step id="loading">
    <tasklet task-executor="taskExecutor">...</tasklet>
</step>

在此示例中,taskExecutor是对另一个 bean 定义的引用,该 实现TaskExecutor接口。TaskExecutor是标准的 Spring 接口,因此请参阅 Spring 用户指南以了解可用的详细信息 实现。最简单的多线程TaskExecutor是一个SimpleAsyncTaskExecutor.spring-doc.cadn.net.cn

上述配置的结果是Step通过读取、处理、 并在单独的执行线程中写入每个项目块(每个提交间隔)。 请注意,这意味着要处理的项目没有固定的顺序,并且 chunk 可能包含与单线程大小写相比不连续的项目。在 除了任务执行程序设置的任何限制(例如,它是否由 thread pool),则 tasklet 配置具有限制(默认值:4)。 您可能需要增加此限制以确保线程池得到充分利用。spring-doc.cadn.net.cn

使用 Java 配置时,构建器提供对限制的访问,如 遵循:spring-doc.cadn.net.cn

Java 配置
@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("sampleStep", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.taskExecutor(taskExecutor)
				.throttleLimit(20)
				.build();
}

例如,您可以增加 throttle-limit,如下所示:spring-doc.cadn.net.cn

<step id="loading"> <tasklet
    task-executor="taskExecutor"
    throttle-limit="20">...</tasklet>
</step>

另请注意,在 您的步骤,例如DataSource.请确保至少在这些资源中创建池 与步骤中所需的并发线程数一样大。spring-doc.cadn.net.cn

限制限制弃用

从 v5.0 开始,限制已弃用,没有替代品。如果要将 当前限制机制在默认TaskExecutorRepeatTemplate,您需要提供 自定义RepeatOperations实现(基于TaskExecutor具有有界任务队列) 并将其设置在步骤中StepBuilder#stepOperations:spring-doc.cadn.net.cn

Java 配置
@Bean
public Step sampleStep(RepeatOperations customRepeatOperations, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("sampleStep", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.stepOperations(customRepeatOperations)
				.build();
}

使用多线程有一些实际限制Step的 implementations 一些常见的 Batch 使用案例。a 中的许多参与者Step(例如 reader 和 writers) 是有状态的。如果状态没有按线程隔离,则这些组件不会 可在多线程中使用Step.特别是,大多数读者和 Spring Batch 中的写入器不是为多线程使用而设计的。然而, 可以使用无状态或线程安全的读取器和写入器,并且有一个示例 (称为parallelJob 显示使用进程指示器进行跟踪的 Batch Samples(请参阅防止状态持久性) 数据库输入表中已处理的项目数。spring-doc.cadn.net.cn

Spring Batch 提供了一些ItemWriterItemReader.通常 他们在 Javadoc 中说明了它们是否是线程安全的,或者您必须做些什么来避免 并发环境中的问题。如果 Javadoc 中没有信息,则可以 检查 implementation 以查看是否有任何状态。如果读取器不是线程安全的, 您可以使用提供的SynchronizedItemStreamReader或者在您自己的 同步委托人。您可以将调用同步到read(),并且只要 处理和写入是 chunk 中最昂贵的部分,您的步骤可能仍会 完成速度比在单线程配置中快得多。spring-doc.cadn.net.cn

并行步骤

只要需要并行化的应用程序逻辑可以拆分为不同的 职责并分配给各个步骤,则可以在 单个进程。并行步骤执行易于配置和使用。spring-doc.cadn.net.cn

使用 Java 配置时,执行步骤(step1,step2)step3很简单,如下所示:spring-doc.cadn.net.cn

Java 配置
@Bean
public Job job(JobRepository jobRepository) {
    return new JobBuilder("job", jobRepository)
        .start(splitFlow())
        .next(step4())
        .build()        //builds FlowJobBuilder instance
        .build();       //builds Job instance
}

@Bean
public Flow splitFlow() {
    return new FlowBuilder<SimpleFlow>("splitFlow")
        .split(taskExecutor())
        .add(flow1(), flow2())
        .build();
}

@Bean
public Flow flow1() {
    return new FlowBuilder<SimpleFlow>("flow1")
        .start(step1())
        .next(step2())
        .build();
}

@Bean
public Flow flow2() {
    return new FlowBuilder<SimpleFlow>("flow2")
        .start(step3())
        .build();
}

@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("spring_batch");
}

例如,执行步骤(step1,step2)step3很简单, 如下:spring-doc.cadn.net.cn

<job id="job1">
    <split id="split1" task-executor="taskExecutor" next="step4">
        <flow>
            <step id="step1" parent="s1" next="step2"/>
            <step id="step2" parent="s2"/>
        </flow>
        <flow>
            <step id="step3" parent="s3"/>
        </flow>
    </split>
    <step id="step4" parent="s4"/>
</job>

<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>

可配置的任务执行程序用于指定TaskExecutorimplementation 应执行各个流。默认值为SyncTaskExecutor,但是异步的TaskExecutor运行中的步骤 平行。请注意,该作业可确保拆分中的每个流在 聚合退出状态并进行转换。spring-doc.cadn.net.cn

请参阅 拆分流 以了解更多详细信息。spring-doc.cadn.net.cn

远程分块

在远程分块中,Step处理被拆分到多个进程中, 通过一些中间件相互通信。下图显示了 模式:spring-doc.cadn.net.cn

远程分块
图 1.远程分块

管理器组件是单个进程,而 worker 是多个远程进程。 如果管理器不是瓶颈,则此模式效果最佳,因此处理必须更多 比读取项目更昂贵(在实践中经常出现这种情况)。spring-doc.cadn.net.cn

管理器是 Spring Batch 的实现Step使用ItemWriter取代 通过知道如何将项目块发送到中间件的通用版本作为 消息。worker 是正在使用的任何中间件的标准侦听器(对于 例如,使用 JMS 时,它们将是MesssageListenerimplementations),它们的作用是 使用标准的ItemWriterItemProcessor加上一个ItemWriter,通过ChunkProcessor接口。使用它的好处之一 模式是 reader、processor 和 writer 组件是现成的(相同的 将用于 Step 的本地执行)。项目是动态划分的, 并且工作是通过中间件共享的,因此,如果侦听器都渴望 消费者,负载平衡是自动的。spring-doc.cadn.net.cn

中间件必须是持久的,有保证的交付,并且每个中间件都有一个使用者 消息。JMS 是显而易见的候选者,但其他选项(例如 JavaSpaces)存在于 网格计算和共享内存产品空间。spring-doc.cadn.net.cn

有关更多详细信息,请参见 Spring Batch 集成 - 远程分块 部分。spring-doc.cadn.net.cn

分区

Spring Batch 还提供了一个 SPI 用于对Step执行并执行它 远程。在这种情况下,远程参与者是Step实例,这些实例可以作为 易于配置并用于本地处理。下图显示了 模式:spring-doc.cadn.net.cn

分区概述
图 2.分区

Job作为Step实例,以及Stepinstances 被标记为 Manager。这张图片中的工人都是相同的 的实例Step,它实际上可以代替 manager,从而导致 相同的结果Job.worker 通常是远程服务,但 也可以是本地执行线程。Manager 发送给 worker 的消息 在这种模式中,不需要持久或有保证的交付。Spring Batch 元数据中的JobRepository确保每个 worker 执行一次,并且只执行一次 每Job执行。spring-doc.cadn.net.cn

Spring Batch 中的 SPI 由Step(称为PartitionStep) 和两个策略接口,这些接口需要为特定的 环境。策略界面包括PartitionHandlerStepExecutionSplitter, 下面的序列图显示了它们的作用:spring-doc.cadn.net.cn

对 SPI 进行分区
图 3.对 SPI 进行分区

Step在本例中,右侧是 “remote” worker,因此,可能有 许多对象和 OR 进程扮演这个角色,并且PartitionStep显示驾驶 执行。spring-doc.cadn.net.cn

以下示例显示了PartitionStep使用 Java 时的配置 配置:spring-doc.cadn.net.cn

Java 配置
@Bean
public Step step1Manager(JobRepository jobRepository) {
    return new StepBuilder("step1.manager", jobRepository)
        .<String, String>partitioner("step1", partitioner())
        .step(step1())
        .gridSize(10)
        .taskExecutor(taskExecutor())
        .build();
}

与多线程步骤的throttleLimit方法、gridSize方法防止 Task Executor 被来自单个 步。spring-doc.cadn.net.cn

以下示例显示了PartitionStep使用 XML 时的配置 配置:spring-doc.cadn.net.cn

<step id="step1.manager">
    <partition step="step1" partitioner="partitioner">
        <handler grid-size="10" task-executor="taskExecutor"/>
    </partition>
</step>

与多线程步骤的throttle-limit属性、grid-size属性可以防止 Task Executor 被来自单个 步。spring-doc.cadn.net.cn

Spring 的单元测试套件 Batch Samples (请参阅partition*Job.xmlconfiguration) 有一个简单的示例,您可以复制和扩展。spring-doc.cadn.net.cn

Spring Batch 为名为step1:partition0等等 上。许多人更喜欢将 manager step 称为step1:manager保持一致性。您可以 为步骤使用别名(通过指定name属性而不是id属性)。spring-doc.cadn.net.cn

PartitionHandler (分区处理程序)

PartitionHandler是了解远程处理结构的组件,或者 grid 环境。它能够发送StepExecution请求到远程Step实例,以某种特定于 Fabric 的格式包装,例如 DTO。它不必知道 如何拆分输入数据或如何聚合多个Step执行。 一般来说,它可能也不需要了解弹性或故障转移。 因为在许多情况下,这些是 Fabric 的特征。无论如何,Spring Batch 始终 提供独立于结构的可重启性。A 失败Job总是可以重新启动的, 在这种情况下,只有失败的Steps重新执行。spring-doc.cadn.net.cn

PartitionHandlerinterface 可以具有各种 结构类型,包括简单的 RMI 远程处理、EJB 远程处理、自定义 Web 服务、JMS、Java 空间、共享内存网格(例如 Terracotta 或 Coherence)和网格执行结构 (例如 GridGain)。Spring Batch 不包含任何专有网格的实现 或远程处理结构。spring-doc.cadn.net.cn

但是,Spring Batch 确实提供了PartitionHandler那 执行Step实例,使用TaskExecutor战略从Spring。该实现称为TaskExecutorPartitionHandler.spring-doc.cadn.net.cn

您可以显式配置TaskExecutorPartitionHandler使用 Java 配置, 如下:spring-doc.cadn.net.cn

Java 配置
@Bean
public Step step1Manager(JobRepository jobRepository) {
    return new StepBuilder("step1.manager", jobRepository)
        .partitioner("step1", partitioner())
        .partitionHandler(partitionHandler())
        .build();
}

@Bean
public PartitionHandler partitionHandler() {
    TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
    retVal.setTaskExecutor(taskExecutor());
    retVal.setStep(step1());
    retVal.setGridSize(10);
    return retVal;
}

TaskExecutorPartitionHandler是使用 XML 配置的步骤 命名空间。您还可以显式配置它,如下所示:spring-doc.cadn.net.cn

<step id="step1.manager">
    <partition step="step1" handler="handler"/>
</step>

<bean class="org.spr...TaskExecutorPartitionHandler">
    <property name="taskExecutor" ref="taskExecutor"/>
    <property name="step" ref="step1" />
    <property name="gridSize" value="10" />
</bean>

gridSizeattribute 确定要创建的单独步骤执行的数量,因此 它可以与TaskExecutor.或者,它 可以设置为大于可用线程数,这使得 工作更小。spring-doc.cadn.net.cn

TaskExecutorPartitionHandler适用于 IO 密集型Step实例,例如 将大量文件拷贝或将文件系统复制到 Content Management 中 系统。它还可以通过提供Step实现 这是远程调用的代理(例如使用 Spring Remoting)。spring-doc.cadn.net.cn

分区程序

Partitioner具有更简单的职责:生成执行上下文作为 input 参数仅用于新步骤执行(无需担心重启)。它有一个 single 方法,如下面的接口定义所示:spring-doc.cadn.net.cn

public interface Partitioner {
    Map<String, ExecutionContext> partition(int gridSize);
}

此方法的返回值为每个步骤执行关联一个唯一的名称(String),其中输入参数为ExecutionContext.名称显示 later 作为分区StepExecutions.这ExecutionContext只是一个名称-值对的袋子,因此它可能包含 主键、行号或输入文件的位置。遥控器Step然后 通常使用#{…​}placeholders (步骤中的后期绑定 范围),如下一节所示。spring-doc.cadn.net.cn

步骤执行的名称(Map返回者Partitioner) 需要 在Job但没有任何其他特定的 要求。执行此作(并使名称对用户有意义)的最简单方法是 使用 prefix+suffix 命名约定,其中 prefix 是 正在执行(这本身在Job),后缀只是一个 计数器。有一个SimplePartitioner在使用此约定的框架中。spring-doc.cadn.net.cn

您可以使用名为PartitionNameProvider提供分区 name 与 partition 本身分开。如果Partitioner实现此 接口,则在重新启动时仅查询名称。如果分区成本高昂, 这可能是一个有用的优化。由PartitionNameProvider必须 匹配Partitioner.spring-doc.cadn.net.cn

将输入数据绑定到步骤

它对于PartitionHandler要有 相同的配置,并且它们的输入参数在运行时从ExecutionContext.使用 Spring Batch 的 StepScope 功能很容易做到这一点 (在 Late Binding 一节中有更详细的介绍)。为 example,如果Partitioner创建ExecutionContext具有属性键的实例 叫fileName,为每个步骤调用指向不同的文件(或目录), 这Partitioneroutput 可能类似于下表的内容:spring-doc.cadn.net.cn

表 1.执行上下文的示例步骤执行名称Partitioner目标目录处理

Step Execution Name (key) (步骤执行名称 (key))spring-doc.cadn.net.cn

ExecutionContext (值)spring-doc.cadn.net.cn

文件复制:分区 0spring-doc.cadn.net.cn

文件名=/home/data/onespring-doc.cadn.net.cn

文件复制:partition1spring-doc.cadn.net.cn

文件名=/home/data/twospring-doc.cadn.net.cn

文件复制:partition2spring-doc.cadn.net.cn

文件名=/home/data/threespring-doc.cadn.net.cn

然后,可以通过使用到执行上下文的后期绑定将文件名绑定到步骤。spring-doc.cadn.net.cn

以下示例演示如何在 Java 中定义后期绑定:spring-doc.cadn.net.cn

Java 配置
@Bean
public MultiResourceItemReader itemReader(
	@Value("#{stepExecutionContext['fileName']}/*") Resource [] resources) {
	return new MultiResourceItemReaderBuilder<String>()
			.delegate(fileReader())
			.name("itemReader")
			.resources(resources)
			.build();
}

下面的示例演示如何在 XML 中定义后期绑定:spring-doc.cadn.net.cn

XML 配置
<bean id="itemReader" scope="step"
      class="org.spr...MultiResourceItemReader">
    <property name="resources" value="#{stepExecutionContext[fileName]}/*"/>
</bean>