此版本仍在开发中,尚未被视为稳定版本。最新的稳定版本请使用 Spring Framework 6.1.13! |
此版本仍在开发中,尚未被视为稳定版本。最新的稳定版本请使用 Spring Framework 6.1.13! |
Kotlin 协程就是 Kotlin
轻量级线程允许以命令式方式编写非阻塞代码。在语言方面,
挂起函数为异步操作提供了抽象,而在库端,kotlinx.coroutines 提供了 async { }
等函数和 Flow
等类型。
Spring Framework 在以下范围内为 Coroutines 提供支持:
-
Spring MVC 和 WebFlux 中的暂停函数支持
@Controller
-
WebFlux.fn coRouter { } DSL
-
WebFlux
CoWeb过滤器
-
RSocket 注释方法中的暂停函数和支持
Flow
@MessageMapping
-
RSocketRequester
的扩展 -
Spring AOP
依赖
当 and dependencies 位于 classpath 中时,将启用协程支持:kotlinx-coroutines-core
kotlinx-coroutines-reactor
build.gradle.kts
dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
}
支持 版本及以上。1.4.0
Reactive 如何转换为 Coroutines?
对于返回值,从 Reactive API 到 Coroutines API 的转换如下:
-
fun handler(): Mono<Void>
成为suspend fun handler()
-
fun handler(): Mono<T>
变为 或 取决于 是否可以为空(具有更静态类型的优势)suspend fun handler(): T
suspend fun handler(): T?
Mono
-
fun handler(): Flux<T>
成为fun handler(): Flow<T>
对于输入参数:
-
如果不需要 laziness,则可以调用 become since 一个挂起的函数来获取 value 参数。
fun handler(mono: Mono<T>)
fun handler(value: T)
-
如果需要 laziness,则变为 或
fun handler(mono: Mono<T>)
fun handler(supplier: suspend () → T)
fun handler(supplier: suspend () → T?)
Flow
在 Coroutines 世界中是等效的,适用于热流或冷流、有限流或无限流,主要区别如下:Flux
-
Flow
是基于推拉的,而 是推拉混合的Flux
-
背压通过挂起功能实现
-
Flow
只有一个 suspendingcollect
方法,并且运算符作为扩展实现 -
借助协程,运算符易于实现
-
扩展允许向
Flow
-
收集操作正在挂起函数
-
map
运算符支持异步操作(不需要),因为它需要一个 suspending 函数参数flatMap
有关更多详细信息,包括如何与协程并发运行代码,请参阅这篇关于使用 Spring、Coroutines 和 Kotlin Flow 实现反应式的博文。
控制器
下面是一个 Coroutines 示例。@RestController
@RestController
class CoroutinesRestController(client: WebClient, banner: Banner) {
@GetMapping("/suspend")
suspend fun suspendingEndpoint(): Banner {
delay(10)
return banner
}
@GetMapping("/flow")
fun flowEndpoint() = flow {
delay(10)
emit(banner)
delay(10)
emit(banner)
}
@GetMapping("/deferred")
fun deferredEndpoint() = GlobalScope.async {
delay(10)
banner
}
@GetMapping("/sequential")
suspend fun sequential(): List<Banner> {
val banner1 = client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
val banner2 = client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
return listOf(banner1, banner2)
}
@GetMapping("/parallel")
suspend fun parallel(): List<Banner> = coroutineScope {
val deferredBanner1: Deferred<Banner> = async {
client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
}
val deferredBanner2: Deferred<Banner> = async {
client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
}
listOf(deferredBanner1.await(), deferredBanner2.await())
}
@GetMapping("/error")
suspend fun error() {
throw IllegalStateException()
}
@GetMapping("/cancel")
suspend fun cancel() {
throw CancellationException()
}
}
还支持使用 进行视图渲染。@Controller
@Controller
class CoroutinesViewController(banner: Banner) {
@GetMapping("/")
suspend fun render(model: Model): String {
delay(10)
model["banner"] = banner
return "index"
}
}
WebFlux.fn 网站
以下是通过 coRouter { } DSL 和相关处理程序定义的协程路由器示例。
@Configuration
class RouterConfiguration {
@Bean
fun mainRouter(userHandler: UserHandler) = coRouter {
GET("/", userHandler::listView)
GET("/api/user", userHandler::listApi)
}
}
class UserHandler(builder: WebClient.Builder) {
private val client = builder.baseUrl("...").build()
suspend fun listView(request: ServerRequest): ServerResponse =
ServerResponse.ok().renderAndAwait("users", mapOf("users" to
client.get().uri("...").awaitExchange().awaitBody<User>()))
suspend fun listApi(request: ServerRequest): ServerResponse =
ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyAndAwait(
client.get().uri("...").awaitExchange().awaitBody<User>())
}
交易
协程上的事务通过 Reactive 的编程变体支持 事务管理。
对于挂起功能,提供了扩展。TransactionalOperator.executeAndAwait
import org.springframework.transaction.reactive.executeAndAwait
class PersonRepository(private val operator: TransactionalOperator) {
suspend fun initDatabase() = operator.executeAndAwait {
insertPerson1()
insertPerson2()
}
private suspend fun insertPerson1() {
// INSERT SQL statement
}
private suspend fun insertPerson2() {
// INSERT SQL statement
}
}
对于 Kotlin ,提供了一个扩展。Flow
Flow<T>.transactional
import org.springframework.transaction.reactive.transactional
class PersonRepository(private val operator: TransactionalOperator) {
fun updatePeople() = findPeople().map(::updatePerson).transactional(operator)
private fun findPeople(): Flow<Person> {
// SELECT SQL statement
}
private suspend fun updatePerson(person: Person): Person {
// UPDATE SQL statement
}
}