Batch 的域语言
对于任何有经验的批处理架构师来说,批处理的总体概念
Spring Batch 应该是熟悉和舒适的。有 “Jobs” 和 “Steps” 以及
开发人员提供的名为 和 的处理单元。然而
由于 Spring 模式、操作、模板、回调和惯用语,因此有
以下机会:ItemReader
ItemWriter
-
对明确关注点分离的依从性显著提高。
-
清晰描述的架构层和作为接口提供的服务。
-
简单的默认实现,允许快速采用和易于使用 开箱即用。
-
显著增强了可扩展性。
下图是批处理参考体系结构的简化版本,该体系结构 已经使用了几十年。它概述了构成 batch processing 的 domain language 进行批处理。此体系结构框架是一个蓝图,它具有以下 已通过最近几代的数十年实施得到验证 平台(大型机上的 COBOL、Unix 上的 C,现在的 Java 任何地方)。JCL 和 COBOL 开发人员 可能和 C、C# 和 Java 开发人员一样熟悉这些概念。Spring Batch 提供了 layers、components 和 technical 的物理实现 服务常见于健壮、可维护的系统,用于解决 使用基础设施和扩展创建从简单到复杂的批处理应用程序 以满足非常复杂的加工需求。
上图突出显示了构成
Spring Batch 中。A 有一对多步骤,每个步骤都恰好有一个 ,
一个 ,和一个 。需要启动作业 (使用 ),并且需要存储有关当前正在运行的进程的元数据 (在 中)。Job
ItemReader
ItemProcessor
ItemWriter
JobLauncher
JobRepository
工作
本节介绍与批处理作业概念相关的构造型。A 是一个
封装整个批处理过程的实体。与其他 Spring 一样
projects 中,与 XML 配置文件或基于 Java 的
配置。此配置可称为 “job configuration”。但是,只是整个层次结构的顶部,如下图所示:Job
Job
Job
在 Spring Batch 中,a 只是实例的容器。它结合了多个
在逻辑上属于流中并允许配置属性的步骤
global 到所有步骤,例如可重启性。作业配置包含:Job
Step
-
作业的名称。
-
实例的定义和排序。
Step
-
作业是否可重新启动。
-
Java
-
XML
对于使用 Java 配置的用户,Spring Batch 提供了
类形式的接口,它创建了一些标准
功能。当使用基于 Java 的配置时,一组
builders 可用于实例化 ,如下所示
示例显示:Job
SimpleJob
Job
Job
@Bean
public Job footballJob(JobRepository jobRepository) {
return new JobBuilder("footballJob", jobRepository)
.start(playerLoad())
.next(gameLoad())
.next(playerSummarization())
.build();
}
对于那些使用 XML 配置的用户, Spring Batch 以类的形式提供了接口的默认实现,它创建了一些标准
功能。但是,批处理命名空间抽象化了
直接实例化它。相反,您可以使用元素作为
以下示例显示:Job
SimpleJob
Job
<job>
<job id="footballJob">
<step id="playerload" next="gameLoad"/>
<step id="gameLoad" next="playerSummarization"/>
<step id="playerSummarization"/>
</job>
JobInstance 实例
A 是指逻辑作业运行的概念。考虑一个批处理作业,该
应该在一天结束时运行一次,例如前面的
图。有一个作业,但 的每个单独运行都必须是
单独跟踪。对于此作业,每天有一个 logical。
例如,有 1 月 1 日运行、1 月 2 日运行,依此类推。如果 1 月 1 日
run 第一次失败,第二天再次运行,它仍然是 1 月 1 日的运行。
(通常,这也与它正在处理的数据相对应,即 1 月
第 1 次运行处理 1 月 1 日的数据)。因此,每个 API 都可以有多个
执行(本章稍后将更详细地讨论),并且仅
一个(对应于特定和标识)可以
在给定时间运行。JobInstance
EndOfDay
Job
EndOfDay
Job
JobInstance
JobInstance
JobExecution
JobInstance
Job
JobParameters
a 的定义与要加载的数据完全无关。
如何加载数据完全取决于实现。为
例如,在该方案中,数据上可能有一列指示数据所属的 OR。所以,1 月 1 日运行
将仅加载第 1 次运行中的数据,而 1 月 2 日运行将仅使用
第二。因为这个决定可能是一个商业决策,所以它留给 自己决定。但是,使用相同的 API 可以确定
“状态”(即 ,本章稍后将讨论)
使用以前的执行。使用 new 意味着“从
beginning“,而使用现有实例通常意味着”从您离开的地方开始”
关闭”。JobInstance
ItemReader
EndOfDay
effective date
schedule date
ItemReader
JobInstance
ExecutionContext
JobInstance
JobParameters (作业参数)
在讨论了它的不同之处之后,自然而然地要问的问题
是:“一个人如何与另一个人区分开来?答案是: .对象包含一组用于启动批处理的参数
工作。它们可用于识别,甚至在运行期间用作参考数据,因为
下图显示:JobInstance
Job
JobInstance
JobParameters
JobParameters
在前面的示例中,有两个实例,一个用于 1 月 1 日,另一个用于
对于 January 2nd,实际上只有一个 ,但它有两个对象:
一个以 Job 参数 01-01-2017 启动,另一个以
参数为 01-02-2017。因此,合约可以定义为: = + 标识 .这允许开发人员有效地控制 a 的定义方式,因为他们控制传入的参数。Job
JobParameter
JobInstance
Job
JobParameters
JobInstance
并非所有作业参数都需要用于识别 .默认情况下,它们会这样做。但是,框架也允许提交
的 a 的参数对 .JobInstance Job JobInstance |
JobExecution (任务执行)
A 是指单次尝试运行 Job 的技术概念。一
执行可能以 failure 或 success 结束,但对应于给定的
除非执行成功完成,否则不会认为执行已完成。
以前面描述的示例为例,考虑一个 for
2017 年 1 月 1 日,第一次运行时失败。如果使用相同的
将作业参数标识为首次运行 (01-01-2017),则新的 IS
创建。但是,仍然只有一个 .JobExecution
JobInstance
EndOfDay
Job
JobInstance
JobExecution
JobInstance
a 定义什么是作业以及如何执行作业,a 是
Purely Organizational 对象将执行分组在一起,主要是为了启用正确的
restart semantics 的 Semantics 中。但是,A 是
实际发生在运行期间,并且包含更多必须控制的属性
和 persisted,如下表所示:Job
JobInstance
JobExecution
财产 |
定义 |
|
指示执行状态的对象。运行时,它是 。如果失败,则为 。如果完成
成功了,它是 |
|
A 表示开始执行时的当前系统时间。
如果作业尚未启动,则此字段为空。 |
|
A 表示执行完成时的当前系统时间,
无论它是否成功。如果作业尚未
完成。 |
|
的 ,指示运行的结果。这是最重要的,因为它
包含返回给调用方的退出代码。有关更多详细信息,请参阅第 5 章。这
如果作业尚未完成,则字段为空。 |
|
A 表示当前系统时间
首先持续存在。作业可能尚未启动(因此没有开始时间),但
它始终具有 ,这是用于管理作业级 的框架所必需的。 |
|
A 表示上次保留 a 的时间。此字段
如果作业尚未启动,则为空。 |
|
包含需要在 执行。 |
|
在执行 .这些可能很有用
如果在 失败期间遇到多个异常。 |
这些属性很重要,因为它们是持久的,可以完全用于
确定执行的状态。例如,如果 01-01 的作业为
在晚上 9:00 执行,在 9:30 失败,则在批处理中创建以下条目
元数据表:EndOfDay
JOB_INST_ID |
JOB_NAME |
1 |
结束工作 |
JOB_EXECUTION_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
识别 |
1 |
日期 |
附表。日期 |
2017-01-01 |
真 |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
地位 |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
失败 |
为了清楚起见,可能已缩写或删除列名,并且 格式。 |
现在作业已失败,假设问题花了一整夜才出现
确定,因此 “Batch 窗口” 现在已关闭。进一步假设窗口
从晚上 9:00 开始,作业在 01-01 中再次启动,从中断处开始,然后
在 9:30 成功完成。因为现在是第二天,所以 01-02 作业必须为
运行,之后在 9:31 开始,并在正常时间完成
时间 10:30。没有要求在之后踢出一个
另一个,除非两个 Job 有可能尝试访问相同的数据,
导致数据库级别的锁定问题。这完全取决于调度程序
确定何时应运行 a。由于它们是独立的,因此 Spring
Batch 不会尝试阻止它们并发运行。(尝试运行
same while another already is running 会导致 a 被抛出)。现在应该有一个额外的条目
在 AND 表中以及表中的两个额外条目中,如下表所示:JobInstance
Job
JobInstances
JobInstance
JobExecutionAlreadyRunningException
JobInstance
JobParameters
JobExecution
JOB_INST_ID |
JOB_NAME |
1 |
结束工作 |
2 |
结束工作 |
JOB_EXECUTION_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
识别 |
1 |
日期 |
附表。日期 |
2017-01-01 00:00:00 |
真 |
2 |
日期 |
附表。日期 |
2017-01-01 00:00:00 |
真 |
3 |
日期 |
附表。日期 |
2017-01-02 00:00:00 |
真 |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
地位 |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
失败 |
2 |
1 |
2017-01-02 21:00 |
2017-01-02 21:30 |
完成 |
3 |
2 |
2017-01-02 21:31 |
2017-01-02 22:29 |
完成 |
为了清楚起见,可能已缩写或删除列名,并且 格式。 |
步
A 是一个域对象,它封装了批处理的独立顺序阶段
工作。因此,每个步骤都完全由一个或多个步骤组成。A 包含
定义和控制实际批处理所需的所有信息。这
必然是一个模糊的描述,因为任何给定的内容都位于
编写 .A 可以像
开发者的愿望。一个简单的可能会将数据从文件加载到数据库中,
需要很少或不需要代码(取决于使用的实现)。更复杂的应用程序可能具有复杂的业务规则,这些规则作为处理的一部分应用。如
其中 a ,a 具有与唯一 关联的 individual ,如下图所示:Step
Job
Step
Step
Job
Step
Step
Step
Job
Step
StepExecution
JobExecution
StepExecution 执行
A 表示执行 .每次运行 时都会创建一个新的 a,类似于 。但是,如果步骤失败
执行,因为该步骤在失败之前,则不会保留任何执行。只有在实际启动时才会创建 A。StepExecution
Step
StepExecution
Step
JobExecution
StepExecution
Step
Step
执行由 class 的对象表示。每次执行
包含对其相应步骤的引用,并且与 Transaction 相关
数据,例如提交和回滚计数以及开始和结束时间。此外,每个步骤
execution 包含一个 ,其中包含开发人员需要
在批处理运行中保留,例如所需的统计信息或状态信息
重新启动。下表列出了 的属性 :StepExecution
JobExecution
ExecutionContext
StepExecution
财产 |
定义 |
|
指示执行状态的对象。运行时,
status 为 。如果失败,则状态为 。如果它
成功完成,则状态为 。 |
|
A 表示开始执行时的当前系统时间。
如果步骤尚未开始,则此字段为空。 |
|
A 表示执行完成时的当前系统时间,
无论它是否成功。如果步骤尚未
退出。 |
|
指示执行结果的 。这是最重要的,因为
它包含返回给调用方的退出代码。有关更多详细信息,请参阅第 5 章。
如果作业尚未退出,则此字段为空。 |
|
包含需要在 执行。 |
|
已成功读取的项目数。 |
|
已成功写入的项目数。 |
|
已为此执行提交的事务数。 |
|
由 控制的业务事务已滚动的次数
返回。 |
|
失败的次数,导致跳过项目。 |
|
失败的次数,导致跳过项目。 |
|
已被 “筛选”的项目数。 |
|
失败的次数,导致跳过项目。 |
ExecutionContext
An 表示持久化的键/值对的集合,并且
由框架控制,为开发人员提供存储持久化
state 的 state 的 API 值。(对于那些
熟悉的 Quartz,它与 非常相似。最好的使用示例是
便于重启。以平面文件输入为例,同时处理单个
行,框架会定期在 Commit Points.行为
so 允许存储其状态,以防在运行期间发生致命错误
或者即使停电。所需要做的就是输入当前的行数
读入上下文,如下例所示,框架会执行
休息:ExecutionContext
StepExecution
JobExecution
JobDataMap
ExecutionContext
ItemReader
executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());
以 stereotypes 部分中的示例为例,假设有
是一个步骤,用于将文件加载到数据库中。在第一次运行失败后,
元数据表将类似于以下示例:EndOfDay
Job
loadData
JOB_INST_ID |
JOB_NAME |
1 |
结束工作 |
JOB_INST_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
1 |
日期 |
附表。日期 |
2017-01-01 |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
地位 |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
失败 |
STEP_EXEC_ID |
JOB_EXEC_ID |
STEP_NAME |
START_TIME |
END_TIME |
地位 |
1 |
1 |
loadData (加载数据) |
2017-01-01 21:00 |
2017-01-01 21:30 |
失败 |
STEP_EXEC_ID |
SHORT_CONTEXT |
1 |
{件数=40321} |
在前面的例子中,运行了 30 分钟并处理了 40,321 个“片段”,其中
将表示文件中的行。此值在每次
commit 的 .在提交之前收到通知需要各种 implementations (或 an )之一,这些 implementations (或 an )将更详细地讨论
在本指南的后面部分。与前面的示例一样,假定 is
第二天重新启动。重新启动时,来自 的
最后一次运行将从数据库中重构。当 被打开时,它可以
检查它在上下文中是否有任何 stored 状态,并从那里初始化自身,
如下例所示:Step
ExecutionContext
StepListener
ItemStream
Job
ExecutionContext
ItemReader
if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
log.debug("Initializing for restart. Restart data is: " + executionContext);
long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));
LineReader reader = getReader();
Object record = "";
while (reader.getPosition() < lineCount && record != null) {
record = readLine();
}
}
在这种情况下,在前面的代码运行后,当前行是 40,322,让 Row 从 50022 停止的位置重新开始。您还可以使用 for
需要保留的有关运行本身的统计信息。例如,如果平面文件
包含跨多行存在的处理订单,则可能需要
store 已处理的订单数量(这与
行读取),以便可以在末尾发送一封包含总数
正文中处理的订单数。框架为开发人员处理存储 this 操作,
以使用 individual .这可能非常困难
知道是否应该使用 existing例如,使用上面的示例,当 01-01 运行第二次再次启动时,
框架认识到它是相同的,并且基于个体,
将 the 从数据库中拉出,并将其 (作为 的一部分 ) 交给 itself.相反,对于 01-02 运行,框架
识别出它是一个不同的实例,因此必须将空上下文交给 .框架为
developer,以确保在正确的时间向他们提供状态。这也很重要
以注意在任何给定时间都只存在一个 per。
的客户端应该小心,因为这会创建一个共享的
键空间。因此,在输入值时应小心,以确保没有数据
覆盖。但是,在上下文中绝对不存储任何数据,因此没有
对框架产生不利影响的方式。Step
ExecutionContext
Step
JobInstance
ExecutionContext
EndOfDay
JobInstance
Step
ExecutionContext
StepExecution
Step
Step
ExecutionContext
StepExecution
ExecutionContext
Step
请注意,每个 中至少有一个 1 个,每个 中至少有一个 。例如,请考虑以下
代码片段:ExecutionContext
JobExecution
StepExecution
ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob
如注释中所述,不等于 。他们是两个不同的。范围限定为 的 URL 将保存在 中的每个提交点,而范围限定为 Job 的 URL 将在每次执行之间保存。ecStep
ecJob
ExecutionContexts
Step
Step
Step
在 中,所有非瞬态条目都必须为 。
执行上下文的正确序列化为步骤和作业的重启功能奠定了基础。
如果您使用的键或值本身不可序列化,则需要
采用定制的序列化方法。无法序列化执行上下文
可能会危及状态持久化过程,使失败的作业无法正确恢复。ExecutionContext Serializable |
JobRepository 仓库
JobRepository
是前面提到的所有 stereotype 的持久化机制。
它为 、 和 implementations提供 CRUD 操作。首次启动时,将从存储库获取 a。此外,在
执行过程和实现是持久的
通过将它们传递到存储库。JobLauncher
Job
Step
Job
JobExecution
StepExecution
JobExecution
-
Java
-
XML
使用 Java 配置时,注释提供 as 自动配置的组件之一。@EnableBatchProcessing
JobRepository
Spring Batch XML 名称空间为配置实例提供支持
替换为标签,如下例所示:JobRepository
<job-repository>
<job-repository id="jobRepository"/>
作业Starters
JobLauncher
表示用于启动具有给定集的 的简单接口,如下例所示:Job
JobParameters
public interface JobLauncher {
public JobExecution run(Job job, JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
预计实现会从 获取 valid 并执行 .JobExecution
JobRepository
Job
ItemReader
ItemReader
是一个抽象,表示 , 1 的输入检索
item 的 intent 值。当 用尽了它所能提供的物品时,它
通过返回 .您可以找到有关该界面及其
Readers And Writers 中的各种实现。Step
ItemReader
null
ItemReader
ItemWriter
ItemWriter
是一个抽象,表示 、 一个 batch 或 chunk 的输出
的项目数。通常,an 不知道它应该输入的内容
receive next,并且只知道在其当前调用中传递的 Item。您可以找到更多
有关 Readers And Writers 中接口及其各种实现的详细信息。Step
ItemWriter
ItemWriter
ItemProcessor
ItemProcessor
是表示项的业务处理的抽象。
读取 1 项,写入 1 项,而 则提供转换或应用其他业务处理的访问点。
如果在处理项时确定该项无效,则返回表示不应写出该项。您可以在 Readers And Writers 中找到有关该界面的更多详细信息。ItemReader
ItemWriter
ItemProcessor
null
ItemProcessor
Batch 命名空间
前面列出的许多域概念都需要在 Spring 中进行配置。虽然上面有一些接口的实现,但您可以
在标准 bean 定义中使用,为了便于
配置,如下例所示:ApplicationContext
<beans:beans xmlns="http://www.springframework.org/schema/batch"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/batch
https://www.springframework.org/schema/batch/spring-batch.xsd">
<job id="ioSampleJob">
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
</tasklet>
</step>
</job>
</beans:beans>