JDBC 支持
JDBC 支持
Spring 集成提供了通道适配器,用于使用数据库查询来接收和发送消息。 通过这些适配器, Spring 集成不仅支持普通的 JDBC SQL 查询,还支持存储过程和存储函数调用。
您需要将此依赖项包含在您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
<version>6.0.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-jdbc:6.0.9"
默认情况下,以下 JDBC 组件可用:
Spring 集成 JDBC 模块还提供了一个 JDBC 消息存储。
入站通道适配器
入站通道适配器的主要功能是执行 SQL 查询并将结果集转换为消息。
消息有效负载是整个结果集(表示为 a ),列表中项目的类型取决于行映射策略。
默认策略是一个通用映射器,它在查询结果中的每一行都返回一个。
或者,你可以通过添加对实例的引用来更改此设置(有关行映射的更多详细信息,请参阅 Spring JDBC 文档)。SELECT
List
Map
RowMapper
如果要将查询结果中的行转换为单个消息,可以使用下游拆分器。SELECT |
入站适配器还需要对 instance 或 .JdbcTemplate
DataSource
除了用于生成消息的语句外,适配器还具有一个语句,该语句将记录标记为已处理,以便它们不会显示在下一次轮询中。
更新可以通过原始 select 中的 ID 列表进行参数化。
默认情况下,这是通过命名约定完成的(输入结果集中的列 called 被转换为名为 的更新的参数映射中的列表)。
以下示例定义具有更新查询和引用的入站通道适配器。SELECT
UPDATE
id
id
DataSource
<int-jdbc:inbound-channel-adapter query="select * from item where status=2"
channel="target" data-source="dataSource"
update="update item set status=10 where id in (:id)" />
更新查询中的参数在参数名称后加上冒号 () 前缀(在前面的示例中,该参数是要应用于轮询结果集中每一行的表达式)。
这是 Spring JDBC 中命名参数 JDBC 支持的标准功能,与 Spring 集成中采用的约定(投影到轮询结果列表)相结合。
Spring 的基础 JDBC 功能限制了可用的表达式(例如,除句点以外的大多数特殊字符都是不允许的),但是由于目标通常是可通过 bean 路径寻址的对象列表(可能是一的列表),因此并没有过度限制。: |
要更改参数生成策略,可以将 a 注入到适配器中以覆盖默认行为(适配器具有属性)。
Spring Integration 提供了 ,它创建了一个基于 SPEL 的参数源,并将查询结果作为对象。
(如果为 true,则根对象为行)。
如果相同的参数名称在 update 查询中多次出现,则仅计算一次,并缓存其结果。SqlParameterSourceFactory
sql-parameter-source-factory
ExpressionEvaluatingSqlParameterSourceFactory
#root
update-per-row
您还可以将参数源用于 select 查询。 在这种情况下,由于没有要评估的 “result” 对象,因此每次都使用单个参数源(而不是使用参数源工厂)。 从版本 4.0 开始,你可以使用 Spring 创建基于 SPEL 的参数源,如下例所示:
<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
channel="target" data-source="dataSource"
select-sql-parameter-source="parameterSource" />
<bean id="parameterSource" factory-bean="parameterSourceFactory"
factory-method="createParameterSourceNoCache">
<constructor-arg value="" />
</bean>
<bean id="parameterSourceFactory"
class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
<property name="parameterExpressions">
<map>
<entry key="status" value="@statusBean.which()" />
</map>
</property>
</bean>
<bean id="statusBean" class="foo.StatusDetermination" />
in each 参数表达式可以是任何有效的 SPEL 表达式。
表达式求值的对象是在 Bean 上定义的 constructor 参数。
它对于所有评估都是静态的(在前面的示例中,为空)。value
#root
parameterSource
String
从版本 5.0 开始,您可以提供 来指定特定参数的目标 SQL 类型。ExpressionEvaluatingSqlParameterSourceFactory
sqlParameterTypes
以下示例为查询中使用的参数提供了 SQL 类型:
<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
channel="target" data-source="dataSource"
select-sql-parameter-source="parameterSource" />
<bean id="parameterSource" factory-bean="parameterSourceFactory"
factory-method="createParameterSourceNoCache">
<constructor-arg value="" />
</bean>
<bean id="parameterSourceFactory"
class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
<property name="sqlParameterTypes">
<map>
<entry key="status" value="#{ T(java.sql.Types).BINARY}" />
</map>
</property>
</bean>
使用工厂方法。
否则,参数源将缓存计算结果。
另请注意,由于禁用了缓存,因此如果相同的参数名称多次出现在 select 查询中,则会针对每次出现的情况重新评估该名称。createParameterSourceNoCache |
轮询和事务
入站适配器接受常规的 Spring 集成 Poller 作为子元素。 因此,可以控制轮询的频率(以及其他用途)。 用于 JDBC 的 Poller 的一个重要功能是可以选择将 poll 操作包装在事务中,如下例所示:
<int-jdbc:inbound-channel-adapter query="..."
channel="target" data-source="dataSource" update="...">
<int:poller fixed-rate="1000">
<int:transactional/>
</int:poller>
</int-jdbc:inbound-channel-adapter>
如果未显式指定 Poller,则使用默认值。 与 Spring Integration 一样,它可以被定义为顶级 bean)。 |
在前面的示例中,数据库每 1000 毫秒轮询一次(或每秒轮询一次),并且 update 和 select 查询都在同一事务中执行。 未显示事务管理器配置。 但是,只要它知道数据源,轮询就是事务性的。 一个常见的用例是将 downstream channels 作为 direct channels(默认),以便在同一个线程中调用端点,从而在同一个 transaction 中调用。 这样,如果其中任何一个失败,事务将回滚,并且 Importing 数据将恢复到其原始状态。
max-rows
对max-messages-per-poll
JDBC 入站通道适配器定义了一个名为 的属性。
指定适配器的 Poller 时,还可以定义一个名为 .
虽然这两个属性看起来相似,但它们的含义却大不相同。max-rows
max-messages-per-poll
max-messages-per-poll
指定每个轮询间隔执行查询的次数,而指定每次执行返回的行数。max-rows
在正常情况下,当你使用 JDBC 入站通道适配器时,你可能不想设置 poller 的属性。
它的默认值是 ,这意味着 JDBC 入站通道适配器的 receive()
方法在每个轮询间隔中只执行一次。max-messages-per-poll
1
将属性设置为更大的值意味着查询将连续执行多次。
有关该属性的更多信息,请参见配置入站通道适配器。max-messages-per-poll
max-messages-per-poll
相反, property 如果大于 ,则指定要从方法创建的查询结果集中使用的最大行数。
如果属性设置为 ,则所有行都包含在结果消息中。
该属性默认为 .max-rows
0
receive()
0
0
建议通过特定于供应商的查询选项(例如 MySQL 或 SQL Server 或 Oracle 的)使用结果集限制。
有关更多信息,请参阅特定供应商文档。LIMIT TOP ROWNUM |
出站通道适配器
出站通道适配器是入站通道适配器的反面:它的作用是处理消息并使用它来执行 SQL 查询。 默认情况下,消息有效负载和标头可用作查询的输入参数,如下例所示:
<int-jdbc:outbound-channel-adapter
query="insert into foos (id, status, name) values (:headers[id], 0, :payload[something])"
data-source="dataSource"
channel="input"/>
在前面的示例中,到达标记为的通道的消息具有 map 的有效负载,其键为 ,因此运算符从 map 中取消引用该值。
标头也作为映射进行访问。input
something
[]
上述查询中的参数是传入消息上的 Bean 属性表达式(不是 SPEL 表达式)。
此行为是 的一部分,它是出站适配器创建的默认源。
你可以注入不同的 API 来获得不同的行为。SqlParameterSource SqlParameterSourceFactory |
出站适配器需要对 a 或 a 的引用。
您还可以注入 a 来控制每个传入消息与查询的绑定。DataSource
JdbcTemplate
SqlParameterSourceFactory
如果 input 通道是直接通道,则出站适配器在与消息发送方相同的线程中运行其查询,因此,与消息的发送者相同的事务(如果有的话)。
使用 SPEL 表达式传递参数
大多数 JDBC 通道适配器的常见要求是将参数作为 SQL 查询或存储过程或函数的一部分进行传递。
如前所述,默认情况下,这些参数是 Bean 属性表达式,而不是 SPEL 表达式。
但是,如果需要将 SpEL 表达式作为参数传递,则必须显式注入 .SqlParameterSourceFactory
下面的示例使用 a 来实现该要求:ExpressionEvaluatingSqlParameterSourceFactory
<jdbc:outbound-channel-adapter data-source="dataSource" channel="input"
query="insert into MESSAGES (MESSAGE_ID,PAYLOAD,CREATED_DATE) values (:id, :payload, :createdDate)"
sql-parameter-source-factory="spelSource"/>
<bean id="spelSource"
class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
<property name="parameterExpressions">
<map>
<entry key="id" value="headers['id'].toString()"/>
<entry key="createdDate" value="new java.util.Date()"/>
<entry key="payload" value="payload"/>
</map>
</property>
</bean>
有关详细信息,请参阅 定义参数源。
使用回调PreparedStatement
有时,的灵活性和松散耦合并不能完成我们对目标的需求,或者我们需要做一些低级的 JDBC 工作。
Spring JDBC 模块提供了 API 来配置执行环境(如 或 )和操作参数值(如 )。
它甚至可以访问用于低级操作的 API,例如 .SqlParameterSourceFactory
PreparedStatement
ConnectionCallback
PreparedStatementCreator
SqlParameterSource
StatementCallback
从 Spring Integration 4.2 开始,允许在上下文中手动指定参数。
这个类的作用与标准 Spring JDBC API 中的角色完全相同。
实际上,当对 .MessagePreparedStatementSetter
PreparedStatement
requestMessage
PreparedStatementSetter
PreparedStatementSetter
JdbcMessageHandler
execute
JdbcTemplate
此功能接口选项与 互斥,可以用作更强大的替代方法,用于填充 的参数。
例如,当我们需要以流式处理方式将数据存储到 DataBase 列时,它非常有用。
以下示例显示了如何执行此操作:sqlParameterSourceFactory
PreparedStatement
requestMessage
File
BLOB
@Bean
@ServiceActivator(inputChannel = "storeFileChannel")
public MessageHandler jdbcMessageHandler(DataSource dataSource) {
JdbcMessageHandler jdbcMessageHandler = new JdbcMessageHandler(dataSource,
"INSERT INTO imagedb (image_name, content, description) VALUES (?, ?, ?)");
jdbcMessageHandler.setPreparedStatementSetter((ps, m) -> {
ps.setString(1, m.getHeaders().get(FileHeaders.FILENAME));
try (FileInputStream inputStream = new FileInputStream((File) m.getPayload()); ) {
ps.setBlob(2, inputStream);
}
catch (Exception e) {
throw new MessageHandlingException(m, e);
}
ps.setClob(3, new StringReader(m.getHeaders().get("description", String.class)));
});
return jdbcMessageHandler;
}
从 XML 配置的角度来看,该属性在组件上可用。
它允许您指定 Bean 引用。prepared-statement-setter
<int-jdbc:outbound-channel-adapter>
MessagePreparedStatementSetter
批量更新
从版本 5.1 开始,如果请求消息的有效负载是实例,则执行 。
如果 this 元素不是 ,则 the 的每个元素都与请求消息中的 Headers 一起包装到 a。
在基于 regular 的配置的情况下,这些消息用于构建上述函数中使用的 for 参数。
应用配置后,将使用 variant 遍历每个项目的这些消息,并针对它们调用 provided 。
选择模式时,不支持批量更新。JdbcMessageHandler
JdbcOperations.batchUpdate()
Iterable
Iterable
Message
Message
SqlParameterSourceFactory
SqlParameterSource[]
JdbcOperations.batchUpdate()
MessagePreparedStatementSetter
BatchPreparedStatementSetter
MessagePreparedStatementSetter
keysGenerated
出站网关
出站网关就像出站和入站适配器的组合:它的作用是处理消息并使用它来执行 SQL 查询,然后通过将其发送到回复通道来响应结果。 默认情况下,消息有效负载和标头可用作查询的输入参数,如下例所示:
<int-jdbc:outbound-gateway
update="insert into mythings (id, status, name) values (:headers[id], 0, :payload[thing])"
request-channel="input" reply-channel="output" data-source="dataSource" />
前面示例的结果是将记录插入表中,并返回一条消息,该消息指示受影响的行数(有效负载是 map: )到输出通道。mythings
{UPDATED=1}
如果更新查询是具有自动生成的键的插入,则可以通过添加到前面的示例(这不是默认值,因为某些数据库平台不支持它),从而使用生成的键填充回复消息。
以下示例显示了更改后的配置:keys-generated="true"
<int-jdbc:outbound-gateway
update="insert into mythings (status, name) values (0, :payload[thing])"
request-channel="input" reply-channel="output" data-source="dataSource"
keys-generated="true"/>
除了更新计数或生成的密钥,您还可以提供 select 查询来执行并从结果(例如入站适配器)生成回复消息,如下例所示:
<int-jdbc:outbound-gateway
update="insert into foos (id, status, name) values (:headers[id], 0, :payload[foo])"
query="select * from foos where id=:headers[$id]"
request-channel="input" reply-channel="output" data-source="dataSource"/>
从 Spring Integration 2.2 开始,更新 SQL 查询不再是必需的。
现在,您可以使用 attribute 或 element 仅提供 select 查询。
如果您需要使用通用网关或有效负载扩充器等方式主动检索数据,这将非常有用。
然后从结果生成回复消息(类似于入站适配器的工作方式)并传递给回复通道。
以下示例显示了如何使用该属性:query
query
query
<int-jdbc:outbound-gateway
query="select * from foos where id=:headers[id]"
request-channel="input"
reply-channel="output"
data-source="dataSource"/>
默认情况下,查询的组件仅从游标返回一行(第一行)。
您可以使用该选项调整此行为。
如果需要返回 SELECT 中的所有行,请考虑指定 . |
与通道适配器一样,您也可以为 request 和 reply 提供实例。
默认值与出站适配器相同,因此请求消息可用作表达式的根。
If ,表达式的根是生成的键(如果只有一个映射,则为映射,如果多值,则为映射列表)。SqlParameterSourceFactory
keys-generated="true"
出站网关需要对 a 或 a 的引用。
它还可以具有 injected 来控制传入消息与查询的绑定。DataSource
JdbcTemplate
SqlParameterSourceFactory
从版本 4.2 开始,该属性在 上作为 的替代项提供。
它允许您指定 bean 引用,该引用在执行之前实现更复杂的准备。request-prepared-statement-setter
<int-jdbc:outbound-gateway>
request-sql-parameter-source-factory
MessagePreparedStatementSetter
PreparedStatement
从版本 6.0 开始,将按原样返回一个空列表结果,而不是将其转换为以前的样子,其含义为 “no reply”。
这会导致在处理空列表是 downstream logic一部分的应用程序中进行额外的配置。
请参阅 Splitter Discard Channel 了解可能的空列表处理选项。JdbcOutboundGateway
null
请参阅 出站通道适配器 以了解有关 的更多信息。MessagePreparedStatementSetter
JDBC 消息存储
Spring 集成提供了两种特定于 JDBC 的消息存储实现。
适合与聚合商和索赔检查模式一起使用。
该实现专门为消息通道提供了更具针对性和可扩展性的实现。JdbcMessageStore
JdbcChannelMessageStore
请注意,您可以使用 a 来支持消息通道,它是为此目的而优化的。JdbcMessageStore
JdbcChannelMessageStore
从版本 5.0.11、5.1.2 开始,这些的索引已得到优化。
如果您在此类存储中有大型消息组,则可能需要更改索引。
此外,索引 for 被注释掉,因为除非您使用 JDBC 支持的此类通道,否则不需要它。JdbcChannelMessageStore PriorityChannel |
使用 时,必须添加优先级通道索引,因为它包含在查询的 hint 中。OracleChannelMessageStoreQueryProvider |
初始化数据库
在开始使用 JDBC 消息存储组件之前,您应该为目标数据库配置适当的对象。
Spring 集成附带了一些可用于初始化数据库的示例脚本。
在 JAR 文件中,您可以在包中找到脚本。
它为一系列常见数据库平台提供了示例 create 和 example drop 脚本。
使用这些脚本的常见方法是在 Spring JDBC 数据源初始化器中引用它们。
请注意,这些脚本作为示例以及所需表和列名称的规范提供。
您可能会发现需要增强它们以供 生产环境 使用(例如,通过添加索引声明)。spring-integration-jdbc
org.springframework.integration.jdbc
通用 JDBC 消息存储
JDBC 模块提供了 Spring 集成的实现(在声明检查模式中很重要)和(在有状态模式(如聚合器)中很重要)由数据库支持。
这两个接口都是由 实现的,并且支持在 XML 中配置存储实例,如下例所示:MessageStore
MessageGroupStore
JdbcMessageStore
<int-jdbc:message-store id="messageStore" data-source="dataSource"/>
您可以指定 a 而不是 .JdbcTemplate
DataSource
以下示例显示了一些其他可选属性:
<int-jdbc:message-store id="messageStore" data-source="dataSource"
lob-handler="lobHandler" table-prefix="MY_INT_"/>
在前面的示例中,我们指定了 a 用于将消息作为大型对象处理(这通常对 Oracle 来说是必需的)和 store 生成的查询中的表名的 prefix。
表名前缀默认为 .LobHandler
INT_
后备消息通道
如果您打算使用 JDBC 备份消息通道,我们建议使用实现。
它只能与 Message Channel 结合使用。JdbcChannelMessageStore
支持的数据库
使用特定于数据库的 SQL 查询从数据库中检索消息。
因此,您必须在 .
这将为您指定的特定数据库提供 SQL 查询。
Spring 集成为以下关系数据库提供支持:JdbcChannelMessageStore
ChannelMessageStoreQueryProvider
JdbcChannelMessageStore
channelMessageStoreQueryProvider
-
PostgreSQL 数据库
-
HSQLDB 数据库
-
MySQL (MySQL的
-
神谕
-
德比
-
H2 系列
-
SqlServer 服务器
-
Sybase
-
DB2
如果您的数据库未列出,您可以扩展该类并提供您自己的自定义查询。AbstractChannelMessageStoreQueryProvider
版本 4.0 在表中添加了该列,以确保先进先出 (FIFO) 排队,即使消息以相同的毫秒数存储也是如此。MESSAGE_SEQUENCE
自定义消息插入
从版本 5.0 开始,通过重载该类,您可以在 .
您可以使用它来设置不同的列或更改表结构或序列化策略。
例如,您可以将其结构存储为 JSON 字符串,而不是默认序列化 。ChannelMessageStorePreparedStatementSetter
JdbcChannelMessageStore
byte[]
以下示例使用默认实现 来存储公共列,并覆盖将消息有效负载存储为 的行为 :setValues
varchar
public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {
@Override
public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
Object groupId, String region, boolean priorityEnabled) throws SQLException {
// Populate common columns
super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
// Store message payload as varchar
preparedStatement.setString(6, requestMessage.getPayload().toString());
}
}
通常,我们不建议使用关系数据库进行排队。 相反,如果可能,请考虑改用 JMS 或 AMQP 支持的通道。 有关进一步参考,请参阅以下资源: 如果您仍计划将数据库用作队列,请考虑使用 PostgreSQL 及其通知机制,这将在后续部分中介绍。 |
并发轮询
轮询消息通道时,您可以选择配置与引用关联的 。Poller
TaskExecutor
但请记住,如果您使用 JDBC 支持的消息通道,并且计划使用多个线程对通道进行事务性轮询,从而轮询消息存储,则应确保使用支持多版本并发控制 (MVCC) 的关系数据库。 否则,锁定可能是一个问题,并且在使用多个线程时,性能可能无法按预期实现。 例如,Apache Derby 在这方面是有问题的。 为了实现更好的 JDBC 队列吞吐量并避免不同线程可能从队列中轮询相同的线程时出现问题,在使用不支持 MVCC 的数据库时,将 的属性设置为 to 非常重要。
以下示例显示了如何执行此操作:
|
优先通道
从版本 4.0 开始,实现并提供选项,使其用作实例的参考。
为此,该表有一个列来存储消息标头的值。
此外,新列让我们实现了强大的先进先出 (FIFO) 轮询机制,即使在同一毫秒内以相同的优先级存储了多条消息。
使用 从数据库中轮询(选择)消息。JdbcChannelMessageStore
PriorityCapableChannelMessageStore
priorityEnabled
message-store
priority-queue
INT_CHANNEL_MESSAGE
MESSAGE_PRIORITY
PRIORITY
MESSAGE_SEQUENCE
order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE
我们不建议对优先级和非优先级队列通道使用相同的 bean,因为该选项适用于整个存储,并且不会为队列通道保留适当的 FIFO 队列语义。
但是,同一表 (甚至 ) 可用于这两种类型。
要配置该场景,可以从一个消息存储 Bean 扩展另一个消息存储 Bean,如下例所示:JdbcChannelMessageStore priorityEnabled INT_CHANNEL_MESSAGE region JdbcChannelMessageStore |
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
<int:channel id="queueChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="priorityStore" parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>
<int:channel id="priorityChannel">
<int:priority-queue message-store="priorityStore"/>
</int:channel>
对消息存储进行分区
通常将 用作同一应用程序中的一组应用程序或节点的全局存储。
为了提供一些防止名称冲突的保护并控制数据库元数据配置,消息存储允许以两种方式对表进行分区。
一种方法是通过更改前缀(如前所述)来使用单独的表名。
另一种方法是指定一个名称,用于在单个表中对数据进行分区。
第二种方法的一个重要用例是当 Management 支持 Spring Integration Message Channel 的持久队列时。
持久通道的消息数据在存储区中的通道名称上键入。
因此,如果通道名称不是全局唯一的,则通道可以选取并非适用于它们的数据。
为避免这种危险,您可以使用消息存储将具有相同逻辑名称的不同物理通道的数据分开。JdbcMessageStore
region
MessageStore
region
PostgreSQL:接收推送通知
PostgreSQL 提供了一个侦听和通知框架,用于在数据库表操作时接收推送通知。
Spring 集成利用这种机制(从版本 6.0 开始)来允许在将新消息添加到 .
使用此功能时,必须定义一个数据库触发器,该触发器可以作为 Spring Integration 的 JDBC 模块中包含的文件的注释的一部分找到。JdbcChannelMessageStore
schema-postgresql.sql
推送通知是通过类接收的,该类允许其订阅者在到达任何给定 和 的新消息时接收回调。
即使消息附加到不同的 JVM 上,但附加到同一个数据库,也会收到这些通知。
该实现使用协定从 store 中提取消息,作为对来自上述通知的通知的反应。PostgresChannelMessageTableSubscriber
region
groupId
PostgresSubscribableChannel
PostgresChannelMessageTableSubscriber.Subscription
PostgresChannelMessageTableSubscriber
例如,可以按如下方式接收 的推送通知:some group
@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
return messageStore;
}
@Bean
public PostgresChannelMessageTableSubscriber subscriber(
@Value("${spring.datasource.url}") String url,
@Value("${spring.datasource.username}") String username,
@Value("${spring.datasource.password}") String password) {
return new PostgresChannelMessageTableSubscriber(() ->
DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}
@Bean
public PostgresSubscribableChannel channel(
PostgresChannelMessageTableSubscriber subscriber,
JdbcChannelMessageStore messageStore) {
return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}
事务支持
从版本 6.0.5 开始,在 a 上指定 a 将通知事务中的订阅者。
订户中的异常将导致事务回滚,并将消息放回消息存储中。
默认情况下,事务支持未激活。PlatformTransactionManager
PostgresSubscribableChannel
重试
从版本 6.0.5 开始,可以通过向 .
默认情况下,不执行重试。RetryTemplate
PostgresSubscribableChannel
任何活动在其活动生命周期内都占用一个独占 JDBC。
因此,此连接不源自 pooling 非常重要。
此类连接池通常希望发出的连接在预定义的超时窗口内关闭。 对于独占连接的这种需求,还建议 JVM 只运行一个可用于注册任意数量的订阅的单个连接。 |
存储过程
在某些情况下,普通的 JDBC 支持是不够的。 也许您处理的是旧式关系数据库架构,或者您有复杂的数据处理需求,但最终您必须使用存储过程或存储函数。 从 Spring Integration 2.1 开始,我们提供了三个组件来执行存储过程或存储函数:
-
存储过程入站通道适配器
-
存储过程出站通道适配器
-
存储过程出站网关
支持的数据库
为了启用对存储过程和存储函数的调用,存储过程组件使用org.springframework.jdbc.core.simple.SimpleJdbcCall
类。
因此,完全支持以下数据库执行存储过程:
-
阿帕奇德比
-
DB2
-
MySQL (MySQL的
-
Microsoft SQL 服务器
-
神谕
-
PostgreSQL 数据库
-
Sybase
如果您想改为执行存储函数,则完全支持以下数据库:
-
MySQL (MySQL的
-
Microsoft SQL 服务器
-
神谕
-
PostgreSQL 数据库
即使您的特定数据库可能没有得到完全支持,只要您的 RDBMS 支持存储过程或存储函数,您很可能可以非常成功地使用存储过程 Spring Integration 组件。 事实上,一些提供的集成测试使用 H2 数据库。 尽管如此,全面测试这些使用场景非常重要。 |
常见配置属性
所有存储过程组件共享某些配置参数:
-
auto-startup
:生命周期属性,指示是否应在应用程序上下文启动期间启动此组件。 它默认为 . 自选。true
-
data-source
:对 的引用,用于访问数据库。 必填。javax.sql.DataSource
-
id
:标识基础 Spring Bean 定义,它是 或 的实例,具体取决于出站通道适配器的属性是引用 a 还是 a 。 自选。EventDrivenConsumer
PollingConsumer
channel
SubscribableChannel
PollableChannel
-
ignore-column-meta-data
:对于完全支持的数据库,底层的SimpleJdbcCall
类可以从 JDBC 元数据中自动检索存储过程或存储函数的参数信息。但是,如果数据库不支持元数据查找,或者您需要提供自定义参数定义,则可以将此标志设置为 。 它默认为 . 自选。
true
false
-
is-function
:如果 ,则调用 SQL 函数。 在这种情况下,or 属性定义被调用函数的名称。 它默认为 . 自选。true
stored-procedure-name
stored-procedure-name-expression
false
-
stored-procedure-name
:此属性指定存储过程的名称。 如果该属性设置为 ,则此属性将指定函数名称。 此属性 或 必须指定。is-function
true
stored-procedure-name-expression
-
stored-procedure-name-expression
:此属性使用 SpEL 表达式指定存储过程的名称。 通过使用 SPEL,您可以访问完整的消息(如果可用),包括其 Headers 和有效负载。 您可以使用此属性在运行时调用不同的存储过程。 例如,您可以提供要作为消息标头执行的存储过程名称。 表达式必须解析为 .String
如果该属性设置为 ,则此属性指定存储函数。 此属性 或 必须指定。
is-function
true
stored-procedure-name
-
jdbc-call-operations-cache-size
:定义缓存实例的最大数量。 基本上,对于每个存储过程名称,都会创建一个新的SimpleJdbcCallOperations
实例,作为回报,该实例将被缓存。SimpleJdbcCallOperations
Spring Integration 2.2 添加了 attribute 和 attribute。 stored-procedure-name-expression
jdbc-call-operations-cache-size
默认缓存大小为 。 值 disable 缓存。 不允许使用负值。
10
0
如果启用 JMX,则有关 的统计信息将作为 MBean 公开。 有关更多信息,请参阅 MBean Exporter 。
jdbc-call-operations-cache
-
sql-parameter-source-factory
:(不适用于存储过程入站通道适配器。 对 . 默认情况下,传入的有效负载的 bean 属性通过使用 .SqlParameterSourceFactory
Message
BeanPropertySqlParameterSourceFactory
对于基本用例,这可能就足够了。 对于更复杂的选项,请考虑传入一个或多个值。 请参阅定义参数源。 自选。
ProcedureParameter
-
use-payload-as-parameter-source
:(不适用于存储过程入站通道适配器。 如果设置为 ,则 的有效负载将用作提供参数的源。 但是,如果设置为 ,则 entire 可用作参数的源。true
Message
false
Message
如果未传入任何过程参数,则此属性默认为 。 这意味着,通过使用 default ,有效负载的 bean 属性将用作存储过程或存储函数的参数值的源。
true
BeanPropertySqlParameterSourceFactory
但是,如果传入了过程参数,则此属性(默认情况下)的计算结果为 . 允许提供 SpEL 表达式。 因此,访问整个 . 该属性在基础 上设置。 自选。
false
ProcedureParameter
Message
StoredProcExecutor
常见配置子元素
存储过程组件共享一组通用的子元素,您可以使用这些子元素来定义参数并将其传递给存储过程或存储函数。 以下元素可用:
-
parameter
-
returning-resultset
-
sql-parameter-definition
-
poller
-
parameter
:提供存储过程参数的机制。 参数可以是静态的,也可以是使用 SPEL 表达式提供的。<int-jdbc:parameter name="" (1) type="" (2) value=""/> (3) <int-jdbc:parameter name="" expression=""/> (4)
+ <1> 要传递到存储过程或存储函数中的参数的名称。 必填。 <2> 此属性指定值的类型。 如果未提供任何内容,则此属性默认为 . 仅当使用该属性时,才使用此属性。 自选。 <3> 参数的值。 您必须提供此属性或属性。 自选。 <4> 您可以指定 SPEL 表达式来传递参数的值,而不是 attribute。 如果指定 ,则不允许使用该属性。 自选。
java.lang.String
value
expression
value
expression
value
自选。
-
returning-resultset
:存储过程可能会返回多个结果集。 通过设置一个或多个元素,您可以指定将每个返回的对象转换为有意义的对象。 自选。returning-resultset
RowMappers
ResultSet
<int-jdbc:returning-resultset name="" row-mapper="" />
-
sql-parameter-definition
:如果使用完全支持的数据库,则通常不必指定存储过程参数定义。 相反,这些参数可以自动从 JDBC 元数据派生。 但是,如果您使用的数据库不完全受支持,则必须使用 element 显式设置这些参数。sql-parameter-definition
您还可以使用该属性选择关闭对通过 JDBC 获取的参数元数据信息的任何处理。
ignore-column-meta-data
<int-jdbc:sql-parameter-definition name="" (1) direction="IN" (2) type="STRING" (3) scale="5" (4) type-name="FOO_STRUCT" (5) return-type="fooSqlReturnType"/> (6)
1 指定 SQL 参数的名称。 必填。 2 指定 SQL 参数定义的方向。 默认为 。 有效值为: 、 和 。 如果过程返回结果集,请使用 element. 自选。 IN
IN
OUT
INOUT
returning-resultset
3 用于此 SQL 参数定义的 SQL 类型。 转换为整数值,如 定义。 或者,您也可以提供整数值。 如果未明确设置此属性,则默认为 'VARCHAR'。 自选。 java.sql.Types
4 SQL 参数的比例。 仅用于数字和十进制参数。 自选。 5 for 用户命名的类型,例如: 、 、 和 named 数组类型。 此属性与该属性互斥。 自选。 typeName
STRUCT
DISTINCT
JAVA_OBJECT
scale
6 对复杂类型的自定义值处理程序的引用。 SqlReturnType
的实现。 此属性与该属性互斥,仅适用于 OUT 和 INOUT 参数。 自选。scale
-
poller
:允许您配置消息轮询器(如果此端点是 . 自选。PollingConsumer
定义参数源
参数源控制检索 Spring 集成消息属性并将其映射到相关存储过程输入参数的技术。
存储过程组件遵循某些规则。
默认情况下,有效负载的 Bean 属性用作存储过程的 Importing 参数的源。
在这种情况下,使用 a。
对于基本用例,这可能就足够了。
下一个示例说明了该默认行为。Message
BeanPropertySqlParameterSourceFactory
要使用 to 工作对 bean 属性进行“自动”查找,必须以小写形式定义 bean 属性。
这是因为 in (Java 方法是 ),检索到的存储过程参数声明将转换为小写。
因此,如果您具有驼峰式大小写的 bean 属性(例如 ),则查找将失败。
在这种情况下,请提供显式 .BeanPropertySqlParameterSourceFactory org.springframework.jdbc.core.metadata.CallMetaDataContext matchInParameterValuesWithCallParameters() lastName ProcedureParameter |
假设我们有一个有效负载,它由一个具有以下三个属性的简单 bean 组成:、 和 。
此外,我们有一个简单的 Stored Procedure,它接受三个输入参数:、 和 。
我们还使用完全支持的数据库。
在这种情况下,存储过程出站适配器的以下配置就足够了:id
name
description
INSERT_COFFEE
id
name
description
<int-jdbc:stored-proc-outbound-channel-adapter data-source="dataSource"
channel="insertCoffeeProcedureRequestChannel"
stored-procedure-name="INSERT_COFFEE"/>
对于更复杂的选项,请考虑传入一个或多个值。ProcedureParameter
如果您确实显式提供了值,则默认情况下,an 用于参数处理,以启用 SPEL 表达式的全部功能。ProcedureParameter
ExpressionEvaluatingSqlParameterSourceFactory
如果需要对参数的检索方式进行更多控制,请考虑使用 attribute 传入 的自定义实现。SqlParameterSourceFactory
sql-parameter-source-factory
存储过程入站通道适配器
下面的清单列出了对存储过程入站通道适配器很重要的属性:
<int-jdbc:stored-proc-inbound-channel-adapter
channel="" (1)
stored-procedure-name=""
data-source=""
auto-startup="true"
id=""
ignore-column-meta-data="false"
is-function="false"
skip-undeclared-results="" (2)
return-value-required="false" (3)
<int:poller/>
<int-jdbc:sql-parameter-definition name="" direction="IN"
type="STRING"
scale=""/>
<int-jdbc:parameter name="" type="" value=""/>
<int-jdbc:parameter name="" expression=""/>
<int-jdbc:returning-resultset name="" row-mapper="" />
</int-jdbc:stored-proc-inbound-channel-adapter>
1 | 轮询消息发送到的通道。
如果存储过程或函数未返回任何数据,则 的有效负载为 null。
必填。Message |
2 | 如果此属性设置为 ,则绕过存储过程调用中没有相应声明的所有结果。
例如,存储过程可以返回更新计数值,即使存储过程仅声明一个 result 参数。
确切的行为取决于数据库实现。
该值在基础 上设置。
该值默认为 。
自选。true SqlOutParameter JdbcTemplate true |
3 | 指示是否应包含此过程的返回值。 从 Spring Integration 3.0 开始。 自选。 |
存储过程出站通道适配器
下面的清单列出了对存储过程出站通道适配器很重要的属性:
<int-jdbc:stored-proc-outbound-channel-adapter channel="" (1)
stored-procedure-name=""
data-source=""
auto-startup="true"
id=""
ignore-column-meta-data="false"
order="" (2)
sql-parameter-source-factory=""
use-payload-as-parameter-source="">
<int:poller fixed-rate=""/>
<int-jdbc:sql-parameter-definition name=""/>
<int-jdbc:parameter name=""/>
</int-jdbc:stored-proc-outbound-channel-adapter>
1 | 此终端节点的接收消息通道。 必填。 |
2 | 指定此终端节点作为订阅者连接到通道时的调用顺序。
当该通道使用 dispatching 策略时,这一点尤其重要。
当此终端节点本身是具有队列的通道的轮询使用者时,它不起作用。
自选。failover |
存储过程出站网关
下面的清单列出了对存储过程出站通道适配器很重要的属性:
<int-jdbc:stored-proc-outbound-gateway request-channel="" (1)
stored-procedure-name=""
data-source=""
auto-startup="true"
id=""
ignore-column-meta-data="false"
is-function="false"
order=""
reply-channel="" (2)
reply-timeout="" (3)
return-value-required="false" (4)
skip-undeclared-results="" (5)
sql-parameter-source-factory=""
use-payload-as-parameter-source="">
<int-jdbc:sql-parameter-definition name="" direction="IN"
type=""
scale="10"/>
<int-jdbc:sql-parameter-definition name=""/>
<int-jdbc:parameter name="" type="" value=""/>
<int-jdbc:parameter name="" expression=""/>
<int-jdbc:returning-resultset name="" row-mapper="" />
1 | 此终端节点的接收消息通道。 必填。 |
2 | 收到数据库响应后应将回复发送到的消息通道。 自选。 |
3 | 允许您指定此网关在引发异常之前等待成功发送回复消息的时间。
请记住,当发送到 时,调用发生在发送方的线程中。
因此,send 操作的失败可能是由下游的其他组件引起的。
默认情况下,网关无限期等待。
该值以毫秒为单位指定。
自选。DirectChannel |
4 | 指示是否应包含此过程的返回值。 自选。 |
5 | 如果该属性设置为 ,则绕过存储过程调用中没有相应声明的所有结果。
例如,存储过程可能会返回更新计数值,即使存储过程只声明了一个 result 参数。
确切的行为取决于数据库。
该值在基础 上设置。
该值默认为 。
自选。skip-undeclared-results true SqlOutParameter JdbcTemplate true |
例子
本节包含两个调用 Apache Derby 存储过程的示例。
第一个过程调用返回 .
通过使用 ,数据被转换为域对象,然后成为 Spring 集成消息有效负载。ResultSet
RowMapper
在第二个示例中,我们调用了一个存储过程,该过程使用输出参数来返回数据。
该项目包含此处引用的 Apache Derby 示例,以及有关如何运行它的说明。 Spring Integration Samples 项目还提供了一个使用 Oracle 存储过程的示例。 |
在第一个示例中,我们调用一个名为 的存储过程,该过程未定义任何输入参数,但返回一个 .FIND_ALL_COFFEE_BEVERAGES
ResultSet
在 Apache Derby 中,存储过程是用 Java 实现的。 下面的清单显示了方法签名:
public static void findAllCoffeeBeverages(ResultSet[] coffeeBeverages)
throws SQLException {
...
}
下面的清单显示了相应的 SQL:
CREATE PROCEDURE FIND_ALL_COFFEE_BEVERAGES() \
PARAMETER STYLE JAVA LANGUAGE JAVA MODIFIES SQL DATA DYNAMIC RESULT SETS 1 \
EXTERNAL NAME 'o.s.i.jdbc.storedproc.derby.DerbyStoredProcedures.findAllCoffeeBeverages';
在 Spring 集成中,您现在可以使用例如a来调用此存储过程,如下例所示:stored-proc-outbound-gateway
<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-all"
data-source="dataSource"
request-channel="findAllProcedureRequestChannel"
expect-single-result="true"
stored-procedure-name="FIND_ALL_COFFEE_BEVERAGES">
<int-jdbc:returning-resultset name="coffeeBeverages"
row-mapper="org.springframework.integration.support.CoffeBeverageMapper"/>
</int-jdbc:stored-proc-outbound-gateway>
在第二个示例中,我们调用了一个名为 的存储过程,该存储过程具有一个输入参数。
它使用 output 参数,而不是返回 。
以下示例显示了方法签名:FIND_COFFEE
ResultSet
public static void findCoffee(int coffeeId, String[] coffeeDescription)
throws SQLException {
...
}
下面的清单显示了相应的 SQL:
CREATE PROCEDURE FIND_COFFEE(IN ID INTEGER, OUT COFFEE_DESCRIPTION VARCHAR(200)) \
PARAMETER STYLE JAVA LANGUAGE JAVA EXTERNAL NAME \
'org.springframework.integration.jdbc.storedproc.derby.DerbyStoredProcedures.findCoffee';
在 Spring 集成中,您现在可以通过使用例如a来调用此存储过程,如下例所示:stored-proc-outbound-gateway
<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-coffee"
data-source="dataSource"
request-channel="findCoffeeProcedureRequestChannel"
skip-undeclared-results="true"
stored-procedure-name="FIND_COFFEE"
expect-single-result="true">
<int-jdbc:parameter name="ID" expression="payload" />
</int-jdbc:stored-proc-outbound-gateway>
JDBC Lock Registry
版本 4.3 引入了 .
某些组件(例如,aggregator 和 resequencer)使用从实例获取的锁来确保一次只有一个线程操作一个组。
在单个组件中执行此功能。
现在,您可以在这些组件上配置外部锁注册表。
与 共享 一起使用时,您可以使用 跨多个应用程序实例提供此功能,以便一次只有一个实例可以操作组。JdbcLockRegistry
LockRegistry
DefaultLockRegistry
MessageGroupStore
JdbcLockRegistry
当本地线程释放锁时,另一个本地线程通常可以立即获取该锁。 如果锁由使用其他注册表实例的线程释放,则获取锁可能需要长达 100 毫秒的时间。
它基于抽象,它有一个实现。
数据库架构脚本位于包中,该包针对特定的 RDBMS 供应商进行了划分。
例如,下面的清单显示了 lock table 的 H2 DDL:JdbcLockRegistry
LockRepository
DefaultLockRepository
org.springframework.integration.jdbc
CREATE TABLE INT_LOCK (
LOCK_KEY CHAR(36),
REGION VARCHAR(100),
CLIENT_ID CHAR(36),
CREATED_DATE TIMESTAMP NOT NULL,
constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
);
可以根据目标数据库设计要求进行更改。
因此,必须在 Bean 定义上使用 property。INT_
prefix
DefaultLockRepository
有时,一个应用程序已移动到无法释放分布式锁并删除数据库中的特定记录的状态。
为此,此类死锁可以在下一次锁定调用时由其他应用程序过期。
上的 (TTL) 选项就是为此目的而提供的。
您可能还希望为为给定实例存储的锁指定。
如果是这样,则可以指定 to 与 as a constructor 参数相关联。timeToLive
DefaultLockRepository
CLIENT_ID
DefaultLockRepository
id
DefaultLockRepository
从版本 5.1.8 开始,可以使用 - a 配置为在锁定记录插入/更新执行之间休眠。
默认情况下,它是毫秒,在某些环境中,非领导者会过于频繁地污染与数据源的连接。JdbcLockRegistry
idleBetweenTries
Duration
100
从版本 5.4 开始,该接口已被引入并添加到 中。
如果锁定的进程比锁的生存时间长,则必须在锁定的进程期间调用该方法。
因此,生存时间可以大大缩短,部署可以快速重新获得丢失的锁。RenewableLockRegistry
JdbcLockRegistry
renewLock()
仅当锁由当前线程持有时,才能进行锁续订。 |
String 的 5.5.6 版本,支持通过 .
有关更多信息,请参阅其 JavaDocs。JdbcLockRegistry
JdbcLockRegistry.locks
JdbcLockRegistry.setCacheCapacity()
String 的版本为 6.0,则可以提供 a 而不是依赖于应用程序上下文中的主 bean。DefaultLockRepository
PlatformTransactionManager
JDBC 元数据存储
版本 5.0 引入了 JDBC(请参阅元数据存储)实现。
您可以使用 在应用程序重启后维护元数据状态。
此实现可与适配器一起使用,例如:MetadataStore
JdbcMetadataStore
MetadataStore
要将这些适配器配置为使用 ,请使用 Bean 名称 声明 Spring Bean。
Feed 入站通道适配器和 Feed 入站通道适配器都会自动选取并使用 declared ,如下例所示:JdbcMetadataStore
metadataStore
JdbcMetadataStore
@Bean
public MetadataStore metadataStore(DataSource dataSource) {
return new JdbcMetadataStore(dataSource);
}
该软件包包含多个 RDMBS 供应商的数据库架构脚本。
例如,下面的清单显示了元数据表的 H2 DDL:org.springframework.integration.jdbc
CREATE TABLE INT_METADATA_STORE (
METADATA_KEY VARCHAR(255) NOT NULL,
METADATA_VALUE VARCHAR(4000),
REGION VARCHAR(100) NOT NULL,
constraint INT_METADATA_STORE_PK primary key (METADATA_KEY, REGION)
);
您可以更改前缀以匹配目标数据库设计要求。
您还可以配置为使用自定义前缀。INT_
JdbcMetadataStore
实现 ,使其在多个应用程序实例之间可靠地共享,其中只有一个实例可以存储或修改键的值。
由于交易保证,所有这些操作都是原子的。JdbcMetadataStore
ConcurrentMetadataStore
事务管理必须使用 .
入站通道适配器可以在 poller 配置中提供对 的引用。
与非事务性实现不同,使用 时,该条目仅在事务提交后才会出现在目标表中。
发生回滚时,不会向表中添加任何条目。JdbcMetadataStore
TransactionManager
MetadataStore
JdbcMetadataStore
INT_METADATA_STORE
从版本 5.0.7 开始,您可以使用 RDBMS 供应商特定的选项配置元数据存储条目的基于锁的查询。
默认情况下,如果目标数据库不支持行锁定功能,则它是并且可以配置空字符串。
请咨询您的供应商,了解表达式中有关在更新之前锁定行的特定和可能的提示。JdbcMetadataStore
lockHint
FOR UPDATE
SELECT