数据库
与大多数企业应用程序样式一样,数据库是 批。但是,由于 系统必须使用的数据集。如果 SQL 语句返回 100 万行,则 结果集可能会将所有返回的结果保存在内存中,直到读取完所有行。 Spring Batch 为此问题提供了两种类型的解决方案:
基于游标ItemReader
实现
使用数据库游标通常是大多数批处理开发人员的默认方法。
因为它是数据库对 “流” 关系数据问题的解决方案。这
JavaResultSet
class 本质上是一种面向对象的机制,用于作
光标。一个ResultSet
将光标保留到当前数据行。叫next
在ResultSet
将此光标移动到下一行。Spring Batch 基于游标ItemReader
实现在初始化时打开一个游标,并将游标向前移动一行
每次调用read
,返回可用于处理的映射对象。这close
method 以确保释放所有资源。Spring 核心JdbcTemplate
通过使用回调模式将
所有行ResultSet
和 close ,然后再将控制权返回给方法调用方。
但是,在 Batch 中,这必须等到步骤完成。下图显示了
基于游标的ItemReader
工程。请注意,虽然示例
使用 SQL(因为 SQL 是如此广为人知),任何技术都可以实现基本的
方法。

此示例说明了基本模式。给定一个 'FOO' 表,它有三列:ID
,NAME
和BAR
,选择 ID 大于 1 但小于 7 的所有行。这
将光标的开头(第 1 行)放在 ID 2 上。此行的结果应为
完全映射Foo
对象。叫read()
再次将光标移动到下一行,
即Foo
ID 为 3。这些读取的结果将在每个read
,允许对对象进行垃圾回收(假设没有实例变量
维护对它们的引用)。
JdbcCursorItemReader
JdbcCursorItemReader
是基于游标的技术的 JDBC 实现。它有效
直接使用ResultSet
并需要针对连接运行 SQL 语句
从DataSource
.以下数据库架构用作示例:
CREATE TABLE CUSTOMER (
ID BIGINT IDENTITY PRIMARY KEY,
NAME VARCHAR(45),
CREDIT FLOAT
);
许多人喜欢对每一行使用域对象,因此以下示例使用
实现RowMapper
接口将CustomerCredit
对象:
public class CustomerCreditRowMapper implements RowMapper<CustomerCredit> {
public static final String ID_COLUMN = "id";
public static final String NAME_COLUMN = "name";
public static final String CREDIT_COLUMN = "credit";
public CustomerCredit mapRow(ResultSet rs, int rowNum) throws SQLException {
CustomerCredit customerCredit = new CustomerCredit();
customerCredit.setId(rs.getInt(ID_COLUMN));
customerCredit.setName(rs.getString(NAME_COLUMN));
customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));
return customerCredit;
}
}
因为JdbcCursorItemReader
共享关键接口JdbcTemplate
,它很有用
请参阅有关如何读取此数据的示例JdbcTemplate
,以便对其进行对比
使用ItemReader
.对于此示例,假设 中有 1,000 行
这CUSTOMER
数据库。第一个示例使用JdbcTemplate
:
//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
new CustomerCreditRowMapper());
运行上述代码片段后,customerCredits
列表包含 1,000 个CustomerCredit
对象。在查询方法中,从DataSource
,则提供的 SQL 将针对它运行,并且mapRow
method 的调用请求
每个ResultSet
.将此与JdbcCursorItemReader
,如以下示例所示:
JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
customerCredit = itemReader.read();
counter++;
}
itemReader.close();
运行上述代码片段后,计数器等于 1,000。如果上面的代码有
将返回的customerCredit
导入到列表中,结果将恰好是
与JdbcTemplate
例。然而,这个ItemReader
是它允许 “streamed” 项目。这read
method 可以调用一次,则 Item
可以通过ItemWriter
,然后可以通过read
.这允许以 'chunks' 的形式完成项目读取和写入并提交
定期,这是高性能批处理的本质。此外,它
易于配置为注入 Spring BatchStep
.
-
Java
-
XML
以下示例演示如何将ItemReader
转换为Step
在 Java 中:
@Bean
public JdbcCursorItemReader<CustomerCredit> itemReader() {
return new JdbcCursorItemReaderBuilder<CustomerCredit>()
.dataSource(this.dataSource)
.name("creditReader")
.sql("select ID, NAME, CREDIT from CUSTOMER")
.rowMapper(new CustomerCreditRowMapper())
.build();
}
以下示例演示如何将ItemReader
转换为Step
在 XML 中:
<bean id="itemReader" class="org.spr...JdbcCursorItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
其他属性
因为在 Java 中打开游标有很多不同的选项,所以有很多
属性JdbcCursorItemReader
,如下所述
桌子:
忽略警告 |
确定是记录 SQLWarnings 还是导致异常。
默认值为 |
fetchSize (获取大小) |
为 JDBC 驱动程序提供有关应获取的行数的提示
当 |
最大行数 |
设置基础的最大行数限制 |
查询超时 |
设置驱动程序等待 |
verifyCursorPosition |
因为 |
saveState |
指示是否应将读取器的状态保存在 |
driverSupportsAbsolute |
指示 JDBC 驱动程序是否支持
在 |
setUseSharedExtendedConnection |
指示连接
used for cursor 应该被所有其他处理使用,从而共享相同的
交易。如果此项设置为 |
StoredProcedureItemReader
有时需要使用存储过程来获取游标数据。这StoredProcedureItemReader
其工作原理类似于JdbcCursorItemReader
,只不过,改为
运行查询以获取游标时,它会运行返回游标的存储过程。
存储过程可以通过三种不同的方式返回游标:
-
作为返回的
ResultSet
(由 SQL Server、Sybase、DB2、Derby 和 MySQL 使用)。 -
作为作为 out 参数返回的 ref 游标(由 Oracle 和 PostgreSQL 使用)。
-
作为存储函数调用的返回值。
-
Java
-
XML
以下 Java 示例配置使用与 前面的例子:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
return reader;
}
以下 XML 示例配置使用与前面相同的 'customer credit' 示例 例子:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
前面的示例依赖于存储过程来提供ResultSet
作为
返回结果(前面的选项 1)。
如果存储过程返回ref-cursor
(选项 2),那么我们需要提供
返回的 out 参数的位置ref-cursor
.
-
Java
-
XML
以下示例显示了如何使用第一个参数作为 Java:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setRefCursorPosition(1);
return reader;
}
以下示例显示了如何使用第一个参数作为 XML:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
如果游标是从存储函数返回的(选项 3),我们需要将
property “function” 设置为true
.它默认为false
.
-
Java
-
XML
以下示例显示了 property totrue
在 Java 中:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setFunction(true);
return reader;
}
以下示例显示了 property totrue
在 XML 中:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="function" value="true"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
在所有这些情况下,我们都需要定义一个RowMapper
以及DataSource
和
实际过程名称。
如果存储过程或函数接受参数,则必须声明它们并
使用parameters
财产。以下示例(对于 Oracle)声明了三个
参数。第一个是out
参数返回 ref-cursor,而
第二个和第三个是 in 参数,其值为INTEGER
.
-
Java
-
XML
以下示例演示如何在 Java 中使用参数:
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
List<SqlParameter> parameters = new ArrayList<>();
parameters.add(new SqlOutParameter("newId", OracleTypes.CURSOR));
parameters.add(new SqlParameter("amount", Types.INTEGER);
parameters.add(new SqlParameter("custId", Types.INTEGER);
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("spring.cursor_func");
reader.setParameters(parameters);
reader.setRefCursorPosition(1);
reader.setRowMapper(rowMapper());
reader.setPreparedStatementSetter(parameterSetter());
return reader;
}
以下示例演示如何在 XML 中使用参数:
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="spring.cursor_func"/>
<property name="parameters">
<list>
<bean class="org.springframework.jdbc.core.SqlOutParameter">
<constructor-arg index="0" value="newid"/>
<constructor-arg index="1">
<util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="amount"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="custid"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
</list>
</property>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper" ref="rowMapper"/>
<property name="preparedStatementSetter" ref="parameterSetter"/>
</bean>
除了参数声明之外,我们还需要指定一个PreparedStatementSetter
设置调用的参数值的实现。这与
这JdbcCursorItemReader
以上。Additional Properties 中列出的所有其他属性都适用于StoredProcedureItemReader
也。
寻呼ItemReader
实现
使用数据库游标的另一种方法是运行多个查询,其中每个查询 获取结果的一部分。我们将此部分称为页面。每个查询必须 指定 起始行号 和我们希望在页面中返回的行数。
JdbcPagingItemReader
分页的一个实现ItemReader
是JdbcPagingItemReader
.这JdbcPagingItemReader
需要一个PagingQueryProvider
负责提供 SQL
用于检索构成页面的行的查询。由于每个数据库都有自己的
策略来提供分页支持,我们需要使用不同的PagingQueryProvider
对于每个支持的数据库类型。还有SqlPagingQueryProviderFactoryBean
,它会自动检测正在使用的数据库并确定适当的PagingQueryProvider
实现。这简化了配置,并且是
推荐的最佳实践。
这SqlPagingQueryProviderFactoryBean
要求您指定select
子句和from
第。您还可以提供可选的where
第。这些条款和
必填sortKey
用于构建 SQL 语句。
在sortKey 以保证
执行之间不会丢失任何数据。 |
打开读取器后,它会将每次调用传回一个项目read
在同一
与其他时尚一样的基本时尚ItemReader
.分页发生在幕后,当
需要额外的行。
-
Java
-
XML
以下 Java 示例配置使用与
基于游标ItemReaders
如前所示:
@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("status", "NEW");
return new JdbcPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.dataSource(dataSource)
.queryProvider(queryProvider)
.parameterValues(parameterValues)
.rowMapper(customerCreditMapper())
.pageSize(1000)
.build();
}
@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
provider.setSelectClause("select id, name, credit");
provider.setFromClause("from customer");
provider.setWhereClause("where status=:status");
provider.setSortKey("id");
return provider;
}
以下 XML 示例配置使用与
基于游标ItemReaders
如前所示:
<bean id="itemReader" class="org.spr...JdbcPagingItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="queryProvider">
<bean class="org.spr...SqlPagingQueryProviderFactoryBean">
<property name="selectClause" value="select id, name, credit"/>
<property name="fromClause" value="from customer"/>
<property name="whereClause" value="where status=:status"/>
<property name="sortKey" value="id"/>
</bean>
</property>
<property name="parameterValues">
<map>
<entry key="status" value="NEW"/>
</map>
</property>
<property name="pageSize" value="1000"/>
<property name="rowMapper" ref="customerMapper"/>
</bean>
此配置ItemReader
返回CustomerCredit
对象使用RowMapper
,
必须指定。'pageSize' 属性确定读取的实体数量
从数据库进行每个查询运行。
'parameterValues' 属性可用于指定Map
of 参数值的
查询。如果您在where
子句,则每个条目的键应
匹配命名参数的名称。如果您使用传统的 '?' 占位符,则
key 应该是占位符的编号,从 1 开始。
JpaPagingItemReader
分页的另一种实现ItemReader
是JpaPagingItemReader
.JPA 执行
没有类似于 Hibernate 的概念StatelessSession
,所以我们不得不使用其他
功能。由于 JPA 支持分页,因此这是很自然的
在使用 JPA 进行批处理时的选择。读取每个页面后,
实体将分离,并且持久化上下文将被清除,以允许实体
在页面处理完毕后进行垃圾回收。
这JpaPagingItemReader
允许您声明一个 JPQL 语句并传入一个EntityManagerFactory
.然后,它每次调用传回一个项目以读取相同的基本
时尚无双ItemReader
.分页发生在幕后,当额外的
entities 是必需的。
-
Java
-
XML
以下 Java 示例配置使用与 前面显示的 JDBC 读取器:
@Bean
public JpaPagingItemReader itemReader() {
return new JpaPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.entityManagerFactory(entityManagerFactory())
.queryString("select c from CustomerCredit c")
.pageSize(1000)
.build();
}
以下 XML 示例配置使用与 前面显示的 JDBC 读取器:
<bean id="itemReader" class="org.spr...JpaPagingItemReader">
<property name="entityManagerFactory" ref="entityManagerFactory"/>
<property name="queryString" value="select c from CustomerCredit c"/>
<property name="pageSize" value="1000"/>
</bean>
此配置ItemReader
返回CustomerCredit
对象,其方式与
描述JdbcPagingItemReader
上,假设CustomerCredit
object 具有
正确的 JPA 注释或 ORM 映射文件。'pageSize' 属性确定
每次查询执行从数据库中读取的实体数。
数据库 ItemWriters
虽然平面文件和 XML 文件都有特定的ItemWriter
实例,则没有确切的等效项
在数据库世界中。这是因为交易提供了所有需要的功能。ItemWriter
实现对于 Files 来说是必需的,因为它们必须像事务一样运行,
跟踪写入的项目并在适当的时候进行冲洗或清理。
数据库不需要此功能,因为写入已经包含在
交易。用户可以创建自己的 DAO 来实现ItemWriter
interface 或
使用自定义中的一个ItemWriter
这是为通用处理问题编写的。也
方式,它们应该可以正常工作。需要注意的一件事是性能
以及通过批处理输出提供的错误处理功能。这是最
常见,当使用 Hibernate 作为ItemWriter
但在使用
JDBC 批处理模式。批处理数据库输出没有任何固有的缺陷,假设我们
请小心刷新,并且数据中没有错误。但是,任何
写入可能会引起混淆,因为无法知道是哪个单独的项目引起的
异常,或者即使任何单个项目负责,如
下图:

如果在写入之前缓冲了项,则在缓冲区
flushed 的 SET 文件。例如,假设每个块写入 20 个项目,
第 15 项会引发DataIntegrityViolationException
.就Step
就此而言,这 20 项都写成功了,因为没有办法知道
错误会发生,直到它们被实际写入为止。一次Session#flush()
称为
buffer 被清空并命中异常。此时,没有任何Step
可以做到。必须回滚事务。通常,此异常可能会导致
要跳过的项目(取决于跳过/重试策略),然后不写入
再。但是,在批处理场景中,无法知道是哪个项目导致了
问题。发生故障时,正在写入整个缓冲区。唯一的方法
解决此问题是在每个项后刷新,如下图所示:

这是一个常见的用例,尤其是在使用 Hibernate 时,以及
的实现ItemWriter
是在每次调用write()
.这样做允许
为了可靠地跳过项目,Spring Batch 在内部负责
对ItemWriter
在错误之后。