R2DBC(“反应式关系数据库连接”)是一个社区驱动的 使用反应式模式标准化对 SQL 数据库的访问的规范工作。

包层次结构

Spring Framework 的 R2DBC 抽象框架由两个不同的包组成:

使用 R2DBC 核心类控制基本的 R2DBC 处理和错误处理

本节介绍如何使用 R2DBC 核心类来控制基本的 R2DBC 处理。 包括错误处理。它包括以下主题:

DatabaseClient

DatabaseClient是 R2DBC 核心包中的中心类。它处理 创建和发布资源,这有助于避免常见错误,例如 忘记关闭连接。它执行核心 R2DBC 的基本任务 工作流(如语句创建和执行),将应用程序代码留给提供 SQL 并提取结果。班级:DatabaseClient

  • 运行 SQL 查询

  • 更新语句和存储过程调用

  • 对实例进行迭代Result

  • 捕获 R2DBC 异常,并将它们转换为通用的、信息量更大的 包中定义的异常层次结构。 (请参阅一致的异常层次结构org.springframework.dao

客户端具有一个功能齐全的流畅 API,使用响应式类型进行声明性组合。

当你在代码中使用 时,你只需要实现接口,给它们一个明确定义的协定。 给定类提供的 a,回调将创建一个 .映射函数也是如此 提取结果。DatabaseClientjava.util.functionConnectionDatabaseClientFunctionPublisherRow

您可以通过直接实例化在 DAO 实现中使用 使用引用,也可以在 Spring IoC 容器中配置它 并将其作为 Bean 参考提供给 DAO。DatabaseClientConnectionFactory

创建对象的最简单方法是通过静态工厂方法,如下所示:DatabaseClient

  • Java

  • Kotlin

DatabaseClient client = DatabaseClient.create(connectionFactory);
val client = DatabaseClient.create(connectionFactory)
应始终在 Spring IoC 中配置为 Bean 容器。ConnectionFactory

前面的方法创建一个具有默认设置的设置。DatabaseClient

您也可以从 中获取实例。 可以通过调用以下方法自定义客户端:BuilderDatabaseClient.builder()

  • ….bindMarkers(…):提供特定配置的命名 参数到数据库绑定标记的转换。BindMarkersFactory

  • ….executeFunction(…):设置对象获取方式 跑。ExecuteFunctionStatement

  • ….namedParameters(false):禁用命名参数扩展。默认启用。

方言由 BindMarkersFactoryResolver 从 解析,通常通过检查 .
你可以让 Spring 通过注册一个 通过实现的类。 发现绑定标记提供程序实现 使用 Spring 的 .
ConnectionFactoryConnectionFactoryMetadataBindMarkersFactoryorg.springframework.r2dbc.core.binding.BindMarkersFactoryResolver$BindMarkerFactoryProviderMETA-INF/spring.factoriesBindMarkersFactoryResolverSpringFactoriesLoader

目前支持的数据库包括:

  • H2型

  • 玛丽亚数据库

  • Microsoft SQL Server

  • MySQL的

  • Postgres

此类发出的所有 SQL 都记录在类别下的级别 对应于客户端实例的完全限定类名(通常为 )。此外,每个执行都会在 用于辅助调试的反应序列。DEBUGDefaultDatabaseClient

以下各节提供了一些用法示例。这些例子 不是 公开的所有功能的详尽列表。 有关这一点,请参阅随附的 javadocDatabaseClientDatabaseClient

执行语句

DatabaseClient提供运行语句的基本功能。 以下示例显示了需要包含的内容,以实现最小但功能齐全的功能 创建新表的代码:

  • Java

  • Kotlin

Mono<Void> completion = client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
        .then();
client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
        .await()

DatabaseClient专为方便、流畅的使用而设计。 它揭示了每个阶段的中间、延续和终端方法。 执行规范。上面的示例用于返回一个完成,该完成在查询(或查询,如果 SQL 查询包含 多个语句)完成。then()Publisher

execute(…)接受 SQL 查询字符串或查询,以将实际查询创建推迟到执行。Supplier<String>

查询 (SELECT)

SQL 查询可以通过对象或受影响的行数返回值。 可以返回更新的行数或行本身, 取决于发出的查询。RowDatabaseClient

以下查询从表中获取 and 列:idname

  • Java

  • Kotlin

Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person")
        .fetch().first();
val first = client.sql("SELECT id, name FROM person")
        .fetch().awaitSingle()

以下查询使用 bind 变量:

  • Java

  • Kotlin

Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
        .bind("fn", "Joe")
        .fetch().first();
val first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
        .bind("fn", "Joe")
        .fetch().awaitSingle()

您可能已经注意到上面示例中的用法。 是一个 continuation 运算符,用于指定要使用的数据量。fetch()fetch()

调用返回结果中的第一行并丢弃剩余的行。 您可以使用以下运算符使用数据:first()

  • first()返回整个结果的第一行。其 Kotlin 协程变体 以不可为 null 的返回值命名,如果该值是可选的。awaitSingle()awaitSingleOrNull()

  • one()只返回一个结果,如果结果包含更多行,则返回失败。 使用 Kotlin 协程,正好针对一个值,或者如果该值可能是 。awaitOne()awaitOneOrNull()null

  • all()返回结果的所有行。使用 Kotlin 协程时,请使用 .flow()

  • rowsUpdated()返回受影响的行数 (// 计数)。其 Kotlin 协程变体被命名为 。INSERTUPDATEDELETEawaitRowsUpdated()

如果不指定进一步的映射详细信息,查询将返回表格结果 因为其键是映射到其列值的不区分大小写的列名。Map

您可以通过提供 调用每个值,以便它可以返回任意值(奇异值, 集合和映射,以及对象)。Function<Row, T>Row

下面的示例提取该列并发出其值:name

  • Java

  • Kotlin

Flux<String> names = client.sql("SELECT name FROM person")
        .map(row -> row.get("name", String.class))
        .all();
val names = client.sql("SELECT name FROM person")
        .map{ row: Row -> row.get("name", String.class) }
        .flow()

或者,有一个快捷方式可以映射到单个值:

	Flux<String> names = client.sql("SELECT name FROM person")
			.mapValue(String.class)
			.all();

或者,您可以映射到具有 Bean 属性或记录组件的结果对象:

	// assuming a name property on Person
	Flux<Person> persons = client.sql("SELECT name FROM person")
			.mapProperties(Person.class)
			.all();
怎么样?null

关系数据库结果可以包含值。 反应式流规范禁止发射值。 该要求要求在提取器功能中进行正确处理。 虽然可以从 中获取值,但不得发出值。必须包装对象中的任何值(例如,对于奇异值),以确保永远不会直接返回值 通过您的提取器功能。nullnullnullnullRownullnullOptionalnull

将 (, 和 ) 更新为INSERTUPDATEDELETEDatabaseClient

修改语句的唯一区别是,这些语句通常 不要返回用于使用结果的表格数据。rowsUpdated()

下面的示例演示返回数字的语句 更新的行数:UPDATE

  • Java

  • Kotlin

Mono<Integer> affectedRows = client.sql("UPDATE person SET first_name = :fn")
        .bind("fn", "Joe")
        .fetch().rowsUpdated();
val affectedRows = client.sql("UPDATE person SET first_name = :fn")
        .bind("fn", "Joe")
        .fetch().awaitRowsUpdated()

将值绑定到查询

典型的应用程序需要参数化的 SQL 语句来选择或 根据某些输入更新行。这些是典型的语句 受子句约束或接受的语句 输入参数。参数化语句存在以下情况,即存在SQL注入风险 参数未正确转义。 利用 R2DBC 的 API 来消除查询参数的 SQL 注入风险。 您可以使用运算符提供参数化的 SQL 语句 并将参数绑定到实际的 .然后运行 R2DBC 驱动程序 使用预处理语句和参数替换的语句。SELECTWHEREINSERTUPDATEDatabaseClientbindexecute(…)Statement

参数绑定支持两种绑定策略:

  • 按索引,使用从零开始的参数索引。

  • 按名称,使用占位符名称。

以下示例演示查询的参数绑定:

    db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
	    	.bind("id", "joe")
	    	.bind("name", "Joe")
			.bind("age", 34);

或者,您可以传入名称和值的映射:

	Map<String, Object> params = new LinkedHashMap<>();
	params.put("id", "joe");
	params.put("name", "Joe");
	params.put("age", 34);
	db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
			.bindValues(params);

或者,您可以传入一个带有 Bean 属性或记录组件的参数对象:

	// assuming id, name, age properties on Person
	db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
			.bindProperties(new Person("joe", "Joe", 34);
R2DBC 本机绑定标记

R2DBC 使用数据库本机绑定标记,这些标记依赖于实际的数据库供应商。 例如,Postgres 使用索引标记,例如 、 、 。 另一个示例是 SQL Server,它使用以 . 为前缀的命名绑定标记。$1$2$n@

这与JDBC不同,JDBC需要作为绑定标记。 在 JDBC 中,实际驱动程序将绑定标记转换为数据库本机标记 标记作为其语句执行的一部分。??

Spring Framework 的 R2DBC 支持允许您使用本机绑定标记或命名绑定 带有语法的标记。:name

命名参数支持利用实例扩展命名 参数添加到查询执行时的本机绑定标记,这为您提供了 在各种数据库供应商之间具有一定程度的查询可移植性。BindMarkersFactory

查询预处理器将命名参数展开到一系列绑定中 标记,以消除基于参数数创建动态查询的需要。 嵌套对象数组被扩展为允许使用(例如)选择列表。Collection

请考虑以下查询:

SELECT id, name, state FROM table WHERE (name, age) IN (('John', 35), ('Ann', 50))

前面的查询可以参数化并按如下方式运行:

  • Java

  • Kotlin

List<Object[]> tuples = new ArrayList<>();
tuples.add(new Object[] {"John", 35});
tuples.add(new Object[] {"Ann",  50});

client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
	    .bind("tuples", tuples);
val tuples: MutableList<Array<Any>> = ArrayList()
tuples.add(arrayOf("John", 35))
tuples.add(arrayOf("Ann", 50))

client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
	    .bind("tuples", tuples)
选择列表的使用取决于供应商。

以下示例显示了使用谓词的更简单变体:IN

  • Java

  • Kotlin

client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
	    .bind("ages", Arrays.asList(35, 50));
client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
	    .bind("ages", arrayOf(35, 50))
R2DBC 本身不支持类似 Collection 的值。不过 在上面的示例中扩展给定适用于命名参数 在 Spring 的 R2DBC 支持中,例如用于如上所示的子句。 但是,插入或更新数组类型的列(例如在 Postgres 中) 需要基础 R2DBC 驱动程序支持的数组类型: 通常是一个 Java 数组,例如 以更新列。 不要将等作为数组参数传递。ListINString[]text[]Collection<String>

语句筛选器

有时,您需要在实际运行之前对实际选项进行微调。为此,请注册过滤器 () 与 to intercept 和 修改语句的执行,如以下示例所示:StatementStatementStatementFilterFunctionDatabaseClient

  • Java

  • Kotlin

client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
	    .filter((s, next) -> next.execute(s.returnGeneratedValues("id")))
	    .bind("name", …)
	    .bind("state", …);
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
		.filter { s: Statement, next: ExecuteFunction -> next.execute(s.returnGeneratedValues("id")) }
		.bind("name", …)
		.bind("state", …)

DatabaseClient还公开了一个简化的重载,该重载接受 一个:filter(…)Function<Statement, Statement>

  • Java

  • Kotlin

client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
	    .filter(statement -> s.returnGeneratedValues("id"));

client.sql("SELECT id, name, state FROM table")
	    .filter(statement -> s.fetchSize(25));
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
	    .filter { statement -> s.returnGeneratedValues("id") }

client.sql("SELECT id, name, state FROM table")
	    .filter { statement -> s.fetchSize(25) }

StatementFilterFunction实现允许对对象进行筛选和筛选。StatementResult

DatabaseClient最佳实践

一旦配置,该类的实例是线程安全的。这是 这很重要,因为这意味着您可以配置单个实例,然后安全地将此共享引用注入多个 DAO(或存储库)。 是有状态的,因为它保持对 , 但这种状态不是会话状态。DatabaseClientDatabaseClientDatabaseClientConnectionFactory

使用该类时的常见做法是在 Spring 配置文件中配置一个,然后进行依赖注入 将 Bean 共享到您的 DAO 类中。创建于 . 的 setter这导致了类似于以下内容的 DAO:DatabaseClientConnectionFactoryConnectionFactoryDatabaseClientConnectionFactory

  • Java

  • Kotlin

public class R2dbcCorporateEventDao implements CorporateEventDao {

	private DatabaseClient databaseClient;

	public void setConnectionFactory(ConnectionFactory connectionFactory) {
		this.databaseClient = DatabaseClient.create(connectionFactory);
	}

	// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao {

	private val databaseClient = DatabaseClient.create(connectionFactory)

	// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}

显式配置的替代方法是使用组件扫描和注释 支持依赖注入。在这种情况下,您可以对类进行注释(这使它成为组件扫描的候选对象)并注释 setter 方法。以下示例演示如何执行此操作:@ComponentConnectionFactory@Autowired

  • Java

  • Kotlin

@Component (1)
public class R2dbcCorporateEventDao implements CorporateEventDao {

	private DatabaseClient databaseClient;

	@Autowired (2)
	public void setConnectionFactory(ConnectionFactory connectionFactory) {
		this.databaseClient = DatabaseClient.create(connectionFactory); (3)
	}

	// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
1 用 批注类。@Component
2 用 注释 setter 方法。ConnectionFactory@Autowired
3 使用 .DatabaseClientConnectionFactory
@Component (1)
class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao { (2)

	private val databaseClient = DatabaseClient(connectionFactory) (3)

	// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
1 用 批注类。@Component
2 构造函数注入 .ConnectionFactory
3 使用 .DatabaseClientConnectionFactory

无论您选择使用上述哪种模板初始化样式(或 不是),很少需要为每个类创建一个新实例 要运行 SQL 的时间。配置后,实例是线程安全的。 如果应用程序访问多个 数据库,您可能需要多个实例,这需要多个实例,随后需要多个不同配置的实例。DatabaseClientDatabaseClientDatabaseClientConnectionFactoryDatabaseClient

检索自动生成的密钥

INSERT在表中插入行时,语句可能会生成键 它定义了自动增量或标识列。要完全控制 要生成的列名,只需注册一个 请求为所需列生成的键。StatementFilterFunction

  • Java

  • Kotlin

Mono<Integer> generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
		.filter(statement -> s.returnGeneratedValues("id"))
		.map(row -> row.get("id", Integer.class))
		.first();

// generatedId emits the generated key once the INSERT statement has finished
val generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
		.filter { statement -> s.returnGeneratedValues("id") }
		.map { row -> row.get("id", Integer.class) }
		.awaitOne()

// generatedId emits the generated key once the INSERT statement has finished

控制数据库连接

本节涵盖:

ConnectionFactory

Spring 通过 . A 是 R2DBC 规范的一部分,是一个常见的入口点 对于司机。它允许容器或框架隐藏连接池 以及应用程序代码中的事务管理问题。作为开发人员, 您无需了解有关如何连接到数据库的详细信息。那就是 设置 .你 在开发和测试代码时,很可能会同时担任这两个角色,但事实并非如此 必须知道生产数据源是如何配置的。ConnectionFactoryConnectionFactoryConnectionFactory

当您使用 Spring 的 R2DBC 层时,您可以使用 由第三方提供的连接池实现。一个受欢迎的 实现是 R2DBC 池 ()。春季实施 分发仅用于测试目的,不提供池化。r2dbc-pool

要配置:ConnectionFactory

  1. 获取连接,就像您通常获取 R2DBC 一样。ConnectionFactoryConnectionFactory

  2. 提供 R2DBC URL (有关正确的值,请参阅驱动程序的文档)。

以下示例演示如何配置:ConnectionFactory

  • Java

  • Kotlin

ConnectionFactory factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");
val factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");

ConnectionFactoryUtils

该类是一个方便而强大的辅助类 它提供了从中获取连接和关闭连接(如有必要)的方法。ConnectionFactoryUtilsstaticConnectionFactory

例如,它支持订阅者绑定连接。ContextR2dbcTransactionManager

SingleConnectionFactory

该类是接口的实现,用于包装每次使用后未关闭的单个接口。SingleConnectionFactoryDelegatingConnectionFactoryConnection

如果任何客户端代码在假定池连接时调用(如使用 持久性工具),则应将该属性设置为 。此设置 返回包装物理连接的 Close-suppressing 代理。请注意,您可以 不再将其转换为本机对象或类似对象。closesuppressClosetrueConnection

SingleConnectionFactory主要是一个测试类,可用于特定要求 例如,如果您的 R2DBC 驱动程序允许此类使用,则使用流水线。 与池化 相比,它始终重用相同的连接,从而避免 过度创建物理连接。ConnectionFactory

TransactionAwareConnectionFactoryProxy

TransactionAwareConnectionFactoryProxy是目标的代理。 代理包装该目标以增加对 Spring 管理事务的感知。ConnectionFactoryConnectionFactory

如果使用未集成的 R2DBC 客户端,则需要使用此类 支持 Spring 的 R2DBC。在这种情况下,您仍然可以使用此客户端,并且在 同时,让这个客户端参与 Spring 管理事务。它通常是 最好将 R2DBC 客户端与适当的资源管理权限集成在一起。ConnectionFactoryUtils

有关更多详细信息,请参阅 TransactionAwareConnectionFactoryProxy javadoc。

R2dbcTransactionManager

该类是 单个 R2DBC .它将一个R2DBC从指定的绑定绑定到订阅者,可能允许每个订阅者有一个订阅者。R2dbcTransactionManagerReactiveTransactionManagerConnectionFactoryConnectionConnectionFactoryContextConnectionConnectionFactory

需要应用程序代码才能通过 检索 R2DBC,而不是 R2DBC 的标准。所有框架类(例如 )都使用这个 策略隐含。如果不与事务管理器一起使用,则查找策略的行为如下 完全一样,因此可以在任何情况下使用。ConnectionConnectionFactoryUtils.getConnection(ConnectionFactory)ConnectionFactory.create()DatabaseClientConnectionFactory.create()