扩展和并行处理
许多批处理问题可以通过单线程、单进程作业来解决。 因此,在考虑之前正确检查这是否满足您的需求总是一个好主意 关于更复杂的实现。衡量实际作业的性能,并查看 最简单的实现首先满足您的需求。您可以读取和写入 几百兆字节的存储时间远远超过一分钟,即使使用标准硬件也是如此。
当您准备好开始通过一些并行处理实现作业时, Spring Batch 提供了一系列选项,本章将介绍这些选项,尽管一些 功能详见其他专题。在高层次上,有两种并行模式 加工:
-
单进程、多线程
-
多进程
这些也分为以下几类:
-
多线程步骤 (单进程)
-
并行步骤(单进程)
-
步骤 Remote Chunking of Step (multi-process)
-
对步骤进行分区(单进程或多进程)
首先,我们回顾一下单进程选项。然后,我们回顾一下多进程选项。
多线程步骤
开始并行处理的最简单方法是将 a 添加到您的 Step
配置。TaskExecutor
-
Java
-
XML
使用 Java 配置时,您可以在步骤中添加 a
如下例所示:TaskExecutor
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}
@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("sampleStep", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.build();
}
例如,您可以添加属性 TO ,如下所示:tasklet
<step id="loading">
<tasklet task-executor="taskExecutor">...</tasklet>
</step>
在此示例中,this 是对另一个 bean 定义的引用,该
实现接口。TaskExecutor
是一个标准的 Spring 接口,因此请查阅 Spring 用户指南以了解可用的详细信息
实现。最简单的多线程是 .taskExecutor
TaskExecutor
TaskExecutor
SimpleAsyncTaskExecutor
上述配置的结果是,通过读取、处理、
并在单独的执行线程中写入每个项目块(每个提交间隔)。
请注意,这意味着要处理的项目没有固定的顺序,并且 chunk
可能包含与单线程大小写相比不连续的项目。在
除了任务执行程序设置的任何限制(例如,它是否由
thread pool),则 tasklet 配置具有限制(默认值:4)。
您可能需要增加此限制以确保线程池得到充分利用。Step
-
Java
-
XML
使用 Java 配置时,构建器提供对限制的访问,如 遵循:
@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("sampleStep", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.throttleLimit(20)
.build();
}
例如,您可以增加 throttle-limit,如下所示:
<step id="loading"> <tasklet
task-executor="taskExecutor"
throttle-limit="20">...</tasklet>
</step>
另请注意,在
您的步骤,例如 .请确保至少在这些资源中创建池
与步骤中所需的并发线程数一样大。DataSource
限制限制弃用
从 v5.0 开始,限制已弃用,没有替代品。如果要将
当前限流机制中,需要提供
自定义实现(基于具有 Bounded Task 队列的 A)
并使用 : Java 配置
|
使用多线程实现存在一些实际限制
一些常见的 Batch 使用案例。a 中的许多参与者(如读取器和写入者)
是有状态的。如果状态没有按线程隔离,则这些组件不会
在多线程 .特别是,大多数读者和
Spring Batch 中的写入器不是为多线程使用而设计的。然而,
可以使用无状态或线程安全的读取器和写入器,并且有一个示例
(称为 ) 在春季
显示使用进程指示器进行跟踪的 Batch Samples(请参阅防止状态持久性)
数据库输入表中已处理的项目数。Step
Step
Step
parallelJob
Spring Batch 提供了 和 的一些实现。通常
他们在 Javadoc 中说明了它们是否是线程安全的,或者您必须做些什么来避免
并发环境中的问题。如果 Javadoc 中没有信息,则可以
检查 implementation 以查看是否有任何状态。如果读取器不是线程安全的,
您可以使用提供的装饰它或自行使用
同步委托人。您可以将调用同步到 ,并且只要
处理和写入是 chunk 中最昂贵的部分,您的步骤可能仍会
完成速度比在单线程配置中快得多。ItemWriter
ItemReader
SynchronizedItemStreamReader
read()
并行步骤
只要需要并行化的应用程序逻辑可以拆分为不同的 职责并分配给各个步骤,则可以在 单个进程。并行步骤执行易于配置和使用。
-
Java
-
XML
使用 Java 配置时,并行执行步骤非常简单,如下所示:(step1,step2)
step3
@Bean
public Job job(JobRepository jobRepository) {
return new JobBuilder("job", jobRepository)
.start(splitFlow())
.next(step4())
.build() //builds FlowJobBuilder instance
.build(); //builds Job instance
}
@Bean
public Flow splitFlow() {
return new FlowBuilder<SimpleFlow>("splitFlow")
.split(taskExecutor())
.add(flow1(), flow2())
.build();
}
@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}
@Bean
public Flow flow2() {
return new FlowBuilder<SimpleFlow>("flow2")
.start(step3())
.build();
}
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}
例如,并行执行步骤非常简单。
如下:(step1,step2)
step3
<job id="job1">
<split id="split1" task-executor="taskExecutor" next="step4">
<flow>
<step id="step1" parent="s1" next="step2"/>
<step id="step2" parent="s2"/>
</flow>
<flow>
<step id="step3" parent="s3"/>
</flow>
</split>
<step id="step4" parent="s4"/>
</job>
<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>
可配置的任务执行程序用于指定哪个实施应执行各个流。默认值为 ,但需要异步才能运行
平行。请注意,该作业可确保拆分中的每个流在
聚合退出状态并进行转换。TaskExecutor
SyncTaskExecutor
TaskExecutor
请参阅 拆分流 以了解更多详细信息。
远程分块
在远程分块中,处理被拆分到多个进程中。
通过一些中间件相互通信。下图显示了
模式:Step
管理器组件是单个进程,而 worker 是多个远程进程。 如果管理器不是瓶颈,则此模式效果最佳,因此处理必须更多 比读取项目更昂贵(在实践中经常出现这种情况)。
管理器是 Spring Batch 的实现,其中替换了
通过知道如何将项目块发送到中间件的通用版本作为
消息。worker 是正在使用的任何中间件的标准侦听器(对于
例如,对于 JMS,它们将是实现),它们的角色是
通过界面使用 standard 或 plus an 来处理项目块。使用它的好处之一
模式是 reader、processor 和 writer 组件是现成的(相同的
将用于 Step 的本地执行)。项目是动态划分的,
并且工作是通过中间件共享的,因此,如果侦听器都渴望
消费者,负载平衡是自动的。Step
ItemWriter
MesssageListener
ItemWriter
ItemProcessor
ItemWriter
ChunkProcessor
中间件必须是持久的,有保证的交付,并且每个中间件都有一个使用者 消息。JMS 是显而易见的候选者,但其他选项(例如 JavaSpaces)存在于 网格计算和共享内存产品空间。
有关更多详细信息,请参见 Spring Batch 集成 - 远程分块 部分。
分区
Spring Batch 还提供了一个 SPI 用于对执行进行分区并执行它
远程。在这种情况下,远程参与者是可以
易于配置并用于本地处理。下图显示了
模式:Step
Step
在左侧作为实例序列运行,其中一个实例被标记为 manager。这张图片中的工人都是相同的
实例,它实际上可以代替管理器,从而导致
的结果相同。worker 通常是远程服务,但
也可以是本地执行线程。Manager 发送给 worker 的消息
在这种模式中,不需要持久或有保证的交付。Spring Batch
metadata 确保每个 worker 执行一次,并且只执行一次
每次执行。Job
Step
Step
Step
Job
JobRepository
Job
Spring Batch 中的 SPI 由一个特殊的实现(称为)和两个策略接口组成,这些接口需要为特定的
环境。策略接口是 和 ,
下面的序列图显示了它们的作用:Step
PartitionStep
PartitionHandler
StepExecutionSplitter
在这种情况下,右侧是 “remote” worker,因此,可能有
许多对象和/或进程扮演这个角色,并且 显示为驱动
执行。Step
PartitionStep
-
Java
-
XML
以下示例显示了使用 Java 时的配置
配置:PartitionStep
@Bean
public Step step1Manager(JobRepository jobRepository) {
return new StepBuilder("step1.manager", jobRepository)
.<String, String>partitioner("step1", partitioner())
.step(step1())
.gridSize(10)
.taskExecutor(taskExecutor())
.build();
}
与多线程步骤的方法类似,该方法可以防止任务执行程序被来自单个
步。throttleLimit
gridSize
以下示例显示了使用 XML 时的配置
配置:PartitionStep
<step id="step1.manager">
<partition step="step1" partitioner="partitioner">
<handler grid-size="10" task-executor="taskExecutor"/>
</partition>
</step>
与多线程步骤的属性类似,该属性可以防止任务执行程序被来自单个
步。throttle-limit
grid-size
Spring 的单元测试套件
Batch Samples (请参阅配置) 有一个简单的示例,您可以复制和扩展该示例。partition*Job.xml
Spring Batch 为名为 等的分区创建步骤执行
上。许多人更喜欢将 Manager Step 称为一致性。您可以
为步骤使用别名(通过指定 attribute 而不是 attribute)。step1:partition0
step1:manager
name
id
PartitionHandler (分区处理程序)
PartitionHandler
是了解远程处理结构的组件,或者
grid 环境。它能够向远程实例发送请求,并以某种特定于结构的格式(如 DTO)包装。它不必知道
如何拆分输入数据或如何聚合多次执行的结果。
一般来说,它可能也不需要了解弹性或故障转移。
因为在许多情况下,这些是 Fabric 的特征。无论如何,Spring Batch 始终
提供独立于结构的可重启性。失败的总是可以重新启动的,
在这种情况下,只有失败的 Cookie 才会重新执行。StepExecution
Step
Step
Job
Steps
该接口可以具有各种
结构类型,包括简单的 RMI 远程处理、EJB 远程处理、自定义 Web 服务、JMS、Java
空间、共享内存网格(例如 Terracotta 或 Coherence)和网格执行结构
(例如 GridGain)。Spring Batch 不包含任何专有网格的实现
或远程处理结构。PartitionHandler
但是,Spring Batch 确实提供了该
使用 Spring 中的策略在单独的执行线程中本地执行实例。该实现称为 。PartitionHandler
Step
TaskExecutor
TaskExecutorPartitionHandler
-
Java
-
XML
您可以使用 Java 配置显式配置
如下:TaskExecutorPartitionHandler
@Bean
public Step step1Manager(JobRepository jobRepository) {
return new StepBuilder("step1.manager", jobRepository)
.partitioner("step1", partitioner())
.partitionHandler(partitionHandler())
.build();
}
@Bean
public PartitionHandler partitionHandler() {
TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
retVal.setTaskExecutor(taskExecutor());
retVal.setStep(step1());
retVal.setGridSize(10);
return retVal;
}
这是使用 XML 配置的步骤
命名空间。您还可以显式配置它,如下所示:TaskExecutorPartitionHandler
<step id="step1.manager">
<partition step="step1" handler="handler"/>
</step>
<bean class="org.spr...TaskExecutorPartitionHandler">
<property name="taskExecutor" ref="taskExecutor"/>
<property name="step" ref="step1" />
<property name="gridSize" value="10" />
</bean>
该属性确定要创建的单独步骤执行的数量,因此
它可以与 .或者,它
可以设置为大于可用线程数,这使得
工作更小。gridSize
TaskExecutor
这对于 IO 密集型实例非常有用,例如
将大量文件拷贝或将文件系统复制到 Content Management 中
系统。它还可以通过提供 implementation 来用于远程执行
这是远程调用的代理(例如使用 Spring Remoting)。TaskExecutorPartitionHandler
Step
Step
分区程序
它有一个更简单的职责:生成执行上下文作为 input
参数仅用于新步骤执行(无需担心重启)。它有一个
single 方法,如下面的接口定义所示:Partitioner
public interface Partitioner {
Map<String, ExecutionContext> partition(int gridSize);
}
此方法的返回值将每个步骤执行 (the ) 的唯一名称与 .名称显示
稍后在 Batch 元数据中作为分区 .这只是一个名称-值对的袋子,因此它可能包含一系列
主键、行号或输入文件的位置。远程 然后
通常使用占位符绑定到上下文输入(步骤 中的后期绑定
范围),如下一节所示。String
ExecutionContext
StepExecutions
ExecutionContext
Step
#{…}
步骤执行的名称(返回者 中的键 )需要
在 a 的步骤执行中是唯一的,但没有任何其他特定的
要求。执行此操作(并使名称对用户有意义)的最简单方法是
使用 prefix+suffix 命名约定,其中 prefix 是
正在执行(它本身在 中是唯一的),并且后缀只是一个
计数器。框架中有一个使用此约定。Map
Partitioner
Job
Job
SimplePartitioner
您可以使用一个可选的接口来提供分区
name 与 partition 本身分开。如果 a 实现了这个
接口,则在重新启动时仅查询名称。如果分区成本高昂,
这可能是一个有用的优化。必须提供的名称
匹配 .PartitionNameProvider
Partitioner
PartitionNameProvider
Partitioner
将输入数据绑定到步骤
执行的步骤具有
相同的配置,并且它们的输入参数在运行时从 .使用 Spring Batch 的 StepScope 功能很容易做到这一点
(在 Late Binding 一节中有更详细的介绍)。为
例如,如果 create 实例具有属性键
called ,为每个步骤调用指向不同的文件(或目录),
输出可能类似于下表的内容:PartitionHandler
ExecutionContext
Partitioner
ExecutionContext
fileName
Partitioner
Step Execution Name (key) (步骤执行名称 (key)) |
ExecutionContext (值) |
文件复制:分区 0 |
文件名=/home/data/one |
文件复制:partition1 |
文件名=/home/data/two |
文件复制:partition2 |
文件名=/home/data/three |
然后,可以通过使用到执行上下文的后期绑定将文件名绑定到步骤。
-
Java
-
XML
以下示例演示如何在 Java 中定义后期绑定:
@Bean
public MultiResourceItemReader itemReader(
@Value("#{stepExecutionContext['fileName']}/*") Resource [] resources) {
return new MultiResourceItemReaderBuilder<String>()
.delegate(fileReader())
.name("itemReader")
.resources(resources)
.build();
}
下面的示例演示如何在 XML 中定义后期绑定:
<bean id="itemReader" scope="step"
class="org.spr...MultiResourceItemReader">
<property name="resources" value="#{stepExecutionContext[fileName]}/*"/>
</bean>