Kotlin 协程就是 Kotlin 轻量级线程允许以命令式方式编写非阻塞代码。在语言方面, 挂起函数为异步操作提供了抽象,而在库端,kotlinx.coroutines 提供了 async { } 等函数和 Flow 等类型。spring-doc.cn

Spring Framework 在以下范围内为 Coroutines 提供支持:spring-doc.cn

依赖

当 and dependencies 位于 classpath 中时,将启用协程支持:kotlinx-coroutines-corekotlinx-coroutines-reactorspring-doc.cn

build.gradle.ktsspring-doc.cn

dependencies {

	implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
	implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
}

支持 版本及以上。1.4.0spring-doc.cn

Reactive 如何转换为 Coroutines?

对于返回值,从 Reactive API 到 Coroutines API 的转换如下:spring-doc.cn

  • fun handler(): Mono<Void>成为suspend fun handler()spring-doc.cn

  • fun handler(): Mono<T>变为 或 取决于 是否可以为空(具有更静态类型的优势)suspend fun handler(): Tsuspend fun handler(): T?Monospring-doc.cn

  • fun handler(): Flux<T>成为fun handler(): Flow<T>spring-doc.cn

对于输入参数:spring-doc.cn

  • 如果不需要 laziness,则可以调用 become since 一个挂起的函数来获取 value 参数。fun handler(mono: Mono<T>)fun handler(value: T)spring-doc.cn

  • 如果需要 laziness,则变为 或fun handler(mono: Mono<T>)fun handler(supplier: suspend () → T)fun handler(supplier: suspend () → T?)spring-doc.cn

Flow 在 Coroutines 世界中是等效的,适用于热流或冷流、有限流或无限流,主要区别如下:Fluxspring-doc.cn

有关更多详细信息,包括如何与协程并发运行代码,请参阅这篇关于使用 Spring、Coroutines 和 Kotlin Flow 实现反应式的博文。spring-doc.cn

控制器

下面是一个 Coroutines 示例。@RestControllerspring-doc.cn

@RestController
class CoroutinesRestController(client: WebClient, banner: Banner) {

	@GetMapping("/suspend")
	suspend fun suspendingEndpoint(): Banner {
		delay(10)
		return banner
	}

	@GetMapping("/flow")
	fun flowEndpoint() = flow {
		delay(10)
		emit(banner)
		delay(10)
		emit(banner)
	}

	@GetMapping("/deferred")
	fun deferredEndpoint() = GlobalScope.async {
		delay(10)
		banner
	}

	@GetMapping("/sequential")
	suspend fun sequential(): List<Banner> {
		val banner1 = client
				.get()
				.uri("/suspend")
				.accept(MediaType.APPLICATION_JSON)
				.awaitExchange()
				.awaitBody<Banner>()
		val banner2 = client
				.get()
				.uri("/suspend")
				.accept(MediaType.APPLICATION_JSON)
				.awaitExchange()
				.awaitBody<Banner>()
		return listOf(banner1, banner2)
	}

	@GetMapping("/parallel")
	suspend fun parallel(): List<Banner> = coroutineScope {
		val deferredBanner1: Deferred<Banner> = async {
			client
					.get()
					.uri("/suspend")
					.accept(MediaType.APPLICATION_JSON)
					.awaitExchange()
					.awaitBody<Banner>()
		}
		val deferredBanner2: Deferred<Banner> = async {
			client
					.get()
					.uri("/suspend")
					.accept(MediaType.APPLICATION_JSON)
					.awaitExchange()
					.awaitBody<Banner>()
		}
		listOf(deferredBanner1.await(), deferredBanner2.await())
	}

	@GetMapping("/error")
	suspend fun error() {
		throw IllegalStateException()
	}

	@GetMapping("/cancel")
	suspend fun cancel() {
		throw CancellationException()
	}

}

还支持使用 进行视图渲染。@Controllerspring-doc.cn

@Controller
class CoroutinesViewController(banner: Banner) {

	@GetMapping("/")
	suspend fun render(model: Model): String {
		delay(10)
		model["banner"] = banner
		return "index"
	}
}

WebFlux.fn 网站

以下是通过 coRouter { } DSL 和相关处理程序定义的协程路由器示例。spring-doc.cn

@Configuration
class RouterConfiguration {

	@Bean
	fun mainRouter(userHandler: UserHandler) = coRouter {
		GET("/", userHandler::listView)
		GET("/api/user", userHandler::listApi)
	}
}
class UserHandler(builder: WebClient.Builder) {

	private val client = builder.baseUrl("...").build()

	suspend fun listView(request: ServerRequest): ServerResponse =
			ServerResponse.ok().renderAndAwait("users", mapOf("users" to
			client.get().uri("...").awaitExchange().awaitBody<User>()))

	suspend fun listApi(request: ServerRequest): ServerResponse =
				ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyAndAwait(
				client.get().uri("...").awaitExchange().awaitBody<User>())
}

交易

协程上的事务通过 Reactive 的编程变体支持 从 Spring Framework 5.2 开始提供的事务 Management。spring-doc.cn

对于挂起功能,提供了扩展。TransactionalOperator.executeAndAwaitspring-doc.cn

import org.springframework.transaction.reactive.executeAndAwait

class PersonRepository(private val operator: TransactionalOperator) {

    suspend fun initDatabase() = operator.executeAndAwait {
        insertPerson1()
        insertPerson2()
    }

    private suspend fun insertPerson1() {
        // INSERT SQL statement
    }

    private suspend fun insertPerson2() {
        // INSERT SQL statement
    }
}

对于 Kotlin ,提供了一个扩展。FlowFlow<T>.transactionalspring-doc.cn

import org.springframework.transaction.reactive.transactional

class PersonRepository(private val operator: TransactionalOperator) {

    fun updatePeople() = findPeople().map(::updatePerson).transactional(operator)

    private fun findPeople(): Flow<Person> {
        // SELECT SQL statement
    }

    private suspend fun updatePerson(person: Person): Person {
        // UPDATE SQL statement
    }
}