R2DBC 支持
R2DBC 支持
Spring 集成通过 R2DBC 驱动程序使用对数据库的反应式访问来提供用于接收和发送消息的通道适配器。
您需要将此依赖项包含在您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-r2dbc</artifactId>
<version>6.1.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-r2dbc:6.1.9"
R2DBC 入站通道适配器
这是一个基于 的可轮询实现,并根据选项为从数据库获取的数据生成带有 或 作为有效负载的消息。
查询可以是静态提供的,也可以基于在每次调用时评估的 SPEL 表达式。
它作为评估上下文的根对象存在,以允许使用 Fluent API。
默认情况下,此通道适配器将 select 中的记录映射到实例中。
它可以进行自定义,提供一个选项,该选项由 在 下使用 。
这是可选的,用于标记数据库中的已读记录,以便从后续轮询中跳过。
该操作可以附带一个 to bind values into an based on the records in the result.R2dbcMessageSource
MessageSource
R2dbcEntityOperations
Flux
Mono
expectSingleResult
SELECT
receive()
R2dbcMessageSource.SelectCreator
StatementMapper.SelectSpec
LinkedCaseInsensitiveMap
payloadType
EntityRowMapper
this.r2dbcEntityOperations.getConverter()
updateSql
UPDATE
BiFunction<DatabaseClient.GenericExecuteSpec, ?, DatabaseClient.GenericExecuteSpec>
UPDATE
SELECT
此 channel adapter 的典型配置可能如下所示:
@Bean
@InboundChannelAdapter("fromR2dbcChannel")
public R2dbcMessageSource r2dbcMessageSourceSelectMany() {
R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(this.r2dbcEntityTemplate,
"SELECT * FROM person WHERE name='Name'");
r2dbcMessageSource.setPayloadType(Person.class);
r2dbcMessageSource.setUpdateSql("UPDATE Person SET name='SomeOtherName' WHERE id = :id");
r2dbcMessageSource.setBindFunction(
(DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind("id", o.getId()));}
return r2dbcMessageSource;
}
使用 Java DSL 时,此通道适配器的配置如下所示:
@Bean
IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) {
return IntegrationFlow
.from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
(selectCreator) ->
selectCreator.createSelect("person")
.withProjection("*")
.withCriteria(Criteria.where("id").is(1)))
.expectSingleResult(true)
.payloadType(Person.class)
.updateSql("UPDATE Person SET id='2' where id = :id")
.bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) ->
bindSpec.bind("id", o.getId())),
e -> e.poller(p -> p.fixedDelay(100)))
.handle((p, h) -> p)
.channel(MessageChannels.flux())
.get();
}
R2DBC 出站通道适配器
这是一种使用提供的 .
可以静态配置,也可以通过针对请求消息的 SPEL 表达式进行配置。
要执行的查询可以基于 , 和 expression 选项,或者(如果未提供)将整个消息有效负载视为要对其执行 SQL 的实体。
该包注册为导入到 SPEL 评估上下文中,以便直接访问用于和查询的 Fluent API。
该用于 和 并且必须计算为 for 列值对,才能根据请求消息在目标表中执行更改。R2dbcMessageHandler
ReactiveMessageHandler
INSERT
UPDATE
DELETE
R2dbcEntityOperations
R2dbcMessageHandler.Type
tableName
values
criteria
tableName
org.springframework.data.relational.core.mapping.Table
org.springframework.data.relational.core.query
Criteria
UPDATE
DELETE
valuesExpression
INSERT
UPDATE
Map
此 channel adapter 的典型配置可能如下所示:
@Bean
@ServiceActivator(inputChannel = "toR2dbcChannel")
public R2dbcMessageHandler r2dbcMessageHandler(R2dbcEntityTemplate r2dbcEntityTemplate) {
R2dbcMessageHandler messageHandler = new R2dbcMessageHandler(r2dbcEntityTemplate)
messageHandler.setValuesExpression(new FunctionExpression<Message<?>>(Message::getPayload));
messageHandler.setQueryType(R2dbcMessageHandler.Type.UPDATE);
messageHandler.setCriteriaExpression(
EXPRESSION_PARSER.parseExpression("T(Criteria).where('id).is(headers.personId)));
return messageHandler;
}
使用 Java DSL 时,此通道适配器的配置如下所示:
.handleReactive(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
.queryType(R2dbcMessageHandler.Type.UPDATE)
.tableNameExpression("payload.class.simpleName")
.criteria((message) -> Criteria.where("id").is(message.getHeaders().get("personId")))
.values("{age:36}"))