扩展和并行处理
许多批处理问题可以通过单线程、单进程作业来解决。 因此,在考虑之前正确检查这是否满足您的需求总是一个好主意 关于更复杂的实现。衡量实际作业的性能,并查看 最简单的实现首先满足您的需求。您可以读取和写入 几百兆字节的存储时间远远超过一分钟,即使使用标准硬件也是如此。
当您准备好开始通过一些并行处理实现作业时, Spring Batch 提供了一系列选项,本章将介绍这些选项,尽管一些 功能详见其他专题。在高层次上,有两种并行模式 加工:
-
单进程、多线程
-
多进程
这些也分为以下几类:
-
多线程步骤 (单进程)
-
并行步骤(单进程)
-
步骤 Remote Chunking of Step (multi-process)
-
对步骤进行分区(单进程或多进程)
首先,我们回顾一下单进程选项。然后,我们回顾一下多进程选项。
多线程步骤
启动并行处理的最简单方法是添加TaskExecutor
到您的步骤
配置。
-
Java
-
XML
使用 Java 配置时,您可以添加TaskExecutor
到台阶上,
如下例所示:
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}
@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("sampleStep", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.build();
}
例如,您可以将属性 TOtasklet
如下:
<step id="loading">
<tasklet task-executor="taskExecutor">...</tasklet>
</step>
在此示例中,taskExecutor
是对另一个 bean 定义的引用,该
实现TaskExecutor
接口。TaskExecutor
是标准的 Spring 接口,因此请参阅 Spring 用户指南以了解可用的详细信息
实现。最简单的多线程TaskExecutor
是一个SimpleAsyncTaskExecutor
.
上述配置的结果是Step
通过读取、处理、
并在单独的执行线程中写入每个项目块(每个提交间隔)。
请注意,这意味着要处理的项目没有固定的顺序,并且 chunk
可能包含与单线程大小写相比不连续的项目。在
除了任务执行程序设置的任何限制(例如,它是否由
thread pool),则 tasklet 配置具有限制(默认值:4)。
您可能需要增加此限制以确保线程池得到充分利用。
-
Java
-
XML
使用 Java 配置时,构建器提供对限制的访问,如 遵循:
@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("sampleStep", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.throttleLimit(20)
.build();
}
例如,您可以增加 throttle-limit,如下所示:
<step id="loading"> <tasklet
task-executor="taskExecutor"
throttle-limit="20">...</tasklet>
</step>
另请注意,在
您的步骤,例如DataSource
.请确保至少在这些资源中创建池
与步骤中所需的并发线程数一样大。
限制限制弃用
从 v5.0 开始,限制已弃用,没有替代品。如果要将
当前限制机制在默认 Java 配置
|
使用多线程有一些实际限制Step
的 implementations
一些常见的 Batch 使用案例。a 中的许多参与者Step
(例如 reader 和 writers)
是有状态的。如果状态没有按线程隔离,则这些组件不会
可在多线程中使用Step
.特别是,大多数读者和
Spring Batch 中的写入器不是为多线程使用而设计的。然而,
可以使用无状态或线程安全的读取器和写入器,并且有一个示例
(称为parallelJob
)
显示使用进程指示器进行跟踪的 Batch Samples(请参阅防止状态持久性)
数据库输入表中已处理的项目数。
Spring Batch 提供了一些ItemWriter
和ItemReader
.通常
他们在 Javadoc 中说明了它们是否是线程安全的,或者您必须做些什么来避免
并发环境中的问题。如果 Javadoc 中没有信息,则可以
检查 implementation 以查看是否有任何状态。如果读取器不是线程安全的,
您可以使用提供的SynchronizedItemStreamReader
或者在您自己的
同步委托人。您可以将调用同步到read()
,并且只要
处理和写入是 chunk 中最昂贵的部分,您的步骤可能仍会
完成速度比在单线程配置中快得多。
并行步骤
只要需要并行化的应用程序逻辑可以拆分为不同的 职责并分配给各个步骤,则可以在 单个进程。并行步骤执行易于配置和使用。
-
Java
-
XML
使用 Java 配置时,执行步骤(step1,step2)
与step3
很简单,如下所示:
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(splitFlow())
.next(step4())
.build() //builds FlowJobBuilder instance
.build(); //builds Job instance
}
@Bean
public Flow splitFlow() {
return new FlowBuilder<SimpleFlow>("splitFlow")
.split(taskExecutor())
.add(flow1(), flow2())
.build();
}
@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}
@Bean
public Flow flow2() {
return new FlowBuilder<SimpleFlow>("flow2")
.start(step3())
.build();
}
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}
例如,执行步骤(step1,step2)
与step3
很简单,
如下:
<job id="job1">
<split id="split1" task-executor="taskExecutor" next="step4">
<flow>
<step id="step1" parent="s1" next="step2"/>
<step id="step2" parent="s2"/>
</flow>
<flow>
<step id="step3" parent="s3"/>
</flow>
</split>
<step id="step4" parent="s4"/>
</job>
<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>
可配置的任务执行程序用于指定TaskExecutor
implementation 应执行各个流。默认值为SyncTaskExecutor
,但是异步的TaskExecutor
运行中的步骤
平行。请注意,该作业可确保拆分中的每个流在
聚合退出状态并进行转换。
请参阅 拆分流 以了解更多详细信息。
远程分块
在远程分块中,Step
处理被拆分到多个进程中,
通过一些中间件相互通信。下图显示了
模式:

管理器组件是单个进程,而 worker 是多个远程进程。 如果管理器不是瓶颈,则此模式效果最佳,因此处理必须更多 比读取项目更昂贵(在实践中经常出现这种情况)。
管理器是 Spring Batch 的实现Step
使用ItemWriter
取代
通过知道如何将项目块发送到中间件的通用版本作为
消息。worker 是正在使用的任何中间件的标准侦听器(对于
例如,使用 JMS 时,它们将是MesssageListener
implementations),它们的作用是
使用标准的ItemWriter
或ItemProcessor
加上一个ItemWriter
,通过ChunkProcessor
接口。使用它的好处之一
模式是 reader、processor 和 writer 组件是现成的(相同的
将用于 Step 的本地执行)。项目是动态划分的,
并且工作是通过中间件共享的,因此,如果侦听器都渴望
消费者,负载平衡是自动的。
中间件必须是持久的,有保证的交付,并且每个中间件都有一个使用者 消息。JMS 是显而易见的候选者,但其他选项(例如 JavaSpaces)存在于 网格计算和共享内存产品空间。
有关更多详细信息,请参见 Spring Batch 集成 - 远程分块 部分。
分区
Spring Batch 还提供了一个 SPI 用于对Step
执行并执行它
远程。在这种情况下,远程参与者是Step
实例,这些实例可以作为
易于配置并用于本地处理。下图显示了
模式:

这Job
作为Step
实例,以及Step
instances 被标记为 Manager。这张图片中的工人都是相同的
的实例Step
,它实际上可以代替 manager,从而导致
相同的结果Job
.worker 通常是远程服务,但
也可以是本地执行线程。Manager 发送给 worker 的消息
在这种模式中,不需要持久或有保证的交付。Spring Batch
元数据中的JobRepository
确保每个 worker 执行一次,并且只执行一次
每Job
执行。
Spring Batch 中的 SPI 由Step
(称为PartitionStep
) 和两个策略接口,这些接口需要为特定的
环境。策略界面包括PartitionHandler
和StepExecutionSplitter
,
下面的序列图显示了它们的作用:

这Step
在本例中,右侧是 “remote” worker,因此,可能有
许多对象和 OR 进程扮演这个角色,并且PartitionStep
显示驾驶
执行。
-
Java
-
XML
以下示例显示了PartitionStep
使用 Java 时的配置
配置:
@Bean
public Step step1Manager(JobRepository jobRepository) {
return new StepBuilder("step1.manager", jobRepository)
.<String, String>partitioner("step1", partitioner())
.step(step1())
.gridSize(10)
.taskExecutor(taskExecutor())
.build();
}
与多线程步骤的throttleLimit
方法、gridSize
方法防止 Task Executor 被来自单个
步。
以下示例显示了PartitionStep
使用 XML 时的配置
配置:
<step id="step1.manager">
<partition step="step1" partitioner="partitioner">
<handler grid-size="10" task-executor="taskExecutor"/>
</partition>
</step>
与多线程步骤的throttle-limit
属性、grid-size
属性可以防止 Task Executor 被来自单个
步。
Spring 的单元测试套件
Batch Samples (请参阅partition*Job.xml
configuration) 有一个简单的示例,您可以复制和扩展。
Spring Batch 为名为step1:partition0
等等
上。许多人更喜欢将 manager step 称为step1:manager
保持一致性。您可以
为步骤使用别名(通过指定name
属性而不是id
属性)。
PartitionHandler (分区处理程序)
PartitionHandler
是了解远程处理结构的组件,或者
grid 环境。它能够发送StepExecution
请求到远程Step
实例,以某种特定于 Fabric 的格式包装,例如 DTO。它不必知道
如何拆分输入数据或如何聚合多个Step
执行。
一般来说,它可能也不需要了解弹性或故障转移。
因为在许多情况下,这些是 Fabric 的特征。无论如何,Spring Batch 始终
提供独立于结构的可重启性。A 失败Job
总是可以重新启动的,
在这种情况下,只有失败的Steps
重新执行。
这PartitionHandler
interface 可以具有各种
结构类型,包括简单的 RMI 远程处理、EJB 远程处理、自定义 Web 服务、JMS、Java
空间、共享内存网格(例如 Terracotta 或 Coherence)和网格执行结构
(例如 GridGain)。Spring Batch 不包含任何专有网格的实现
或远程处理结构。
但是,Spring Batch 确实提供了PartitionHandler
那
执行Step
实例,使用TaskExecutor
战略从Spring。该实现称为TaskExecutorPartitionHandler
.
-
Java
-
XML
您可以显式配置TaskExecutorPartitionHandler
使用 Java 配置,
如下:
@Bean
public Step step1Manager(JobRepository jobRepository) {
return new StepBuilder("step1.manager", jobRepository)
.partitioner("step1", partitioner())
.partitionHandler(partitionHandler())
.build();
}
@Bean
public PartitionHandler partitionHandler() {
TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
retVal.setTaskExecutor(taskExecutor());
retVal.setStep(step1());
retVal.setGridSize(10);
return retVal;
}
这TaskExecutorPartitionHandler
是使用 XML 配置的步骤
命名空间。您还可以显式配置它,如下所示:
<step id="step1.manager">
<partition step="step1" handler="handler"/>
</step>
<bean class="org.spr...TaskExecutorPartitionHandler">
<property name="taskExecutor" ref="taskExecutor"/>
<property name="step" ref="step1" />
<property name="gridSize" value="10" />
</bean>
这gridSize
attribute 确定要创建的单独步骤执行的数量,因此
它可以与TaskExecutor
.或者,它
可以设置为大于可用线程数,这使得
工作更小。
这TaskExecutorPartitionHandler
适用于 IO 密集型Step
实例,例如
将大量文件拷贝或将文件系统复制到 Content Management 中
系统。它还可以通过提供Step
实现
这是远程调用的代理(例如使用 Spring Remoting)。
分区程序
这Partitioner
具有更简单的职责:生成执行上下文作为 input
参数仅用于新步骤执行(无需担心重启)。它有一个
single 方法,如下面的接口定义所示:
public interface Partitioner {
Map<String, ExecutionContext> partition(int gridSize);
}
此方法的返回值为每个步骤执行关联一个唯一的名称(String
),其中输入参数为ExecutionContext
.名称显示
later 作为分区StepExecutions
.这ExecutionContext
只是一个名称-值对的袋子,因此它可能包含
主键、行号或输入文件的位置。遥控器Step
然后
通常使用#{…}
placeholders (步骤中的后期绑定
范围),如下一节所示。
步骤执行的名称(Map
返回者Partitioner
) 需要
在Job
但没有任何其他特定的
要求。执行此作(并使名称对用户有意义)的最简单方法是
使用 prefix+suffix 命名约定,其中 prefix 是
正在执行(这本身在Job
),后缀只是一个
计数器。有一个SimplePartitioner
在使用此约定的框架中。
您可以使用名为PartitionNameProvider
提供分区
name 与 partition 本身分开。如果Partitioner
实现此
接口,则在重新启动时仅查询名称。如果分区成本高昂,
这可能是一个有用的优化。由PartitionNameProvider
必须
匹配Partitioner
.
将输入数据绑定到步骤
它对于PartitionHandler
要有
相同的配置,并且它们的输入参数在运行时从ExecutionContext
.使用 Spring Batch 的 StepScope 功能很容易做到这一点
(在 Late Binding 一节中有更详细的介绍)。为
example,如果Partitioner
创建ExecutionContext
具有属性键的实例
叫fileName
,为每个步骤调用指向不同的文件(或目录),
这Partitioner
output 可能类似于下表的内容:
Step Execution Name (key) (步骤执行名称 (key)) |
ExecutionContext (值) |
文件复制:分区 0 |
文件名=/home/data/one |
文件复制:partition1 |
文件名=/home/data/two |
文件复制:partition2 |
文件名=/home/data/three |
然后,可以通过使用到执行上下文的后期绑定将文件名绑定到步骤。
-
Java
-
XML
以下示例演示如何在 Java 中定义后期绑定:
@Bean
public MultiResourceItemReader itemReader(
@Value("#{stepExecutionContext['fileName']}/*") Resource [] resources) {
return new MultiResourceItemReaderBuilder<String>()
.delegate(fileReader())
.name("itemReader")
.resources(resources)
.build();
}
下面的示例演示如何在 XML 中定义后期绑定:
<bean id="itemReader" scope="step"
class="org.spr...MultiResourceItemReader">
<property name="resources" value="#{stepExecutionContext[fileName]}/*"/>
</bean>