此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.4.3spring-doc.cadn.net.cn

Kotlin 支持

该框架也得到了改进,以支持函数的 Kotlin lambda,因此现在您可以结合使用 Kotlin 语言和 Spring 集成流定义:spring-doc.cadn.net.cn

@Bean
@Transformer(inputChannel = "functionServiceChannel")
fun kotlinFunction(): (String) -> String {
    return { it.toUpperCase() }
}

@Bean
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
fun kotlinConsumer(): (Message<Any>) -> Unit {
    return { print(it) }
}

@Bean
@InboundChannelAdapter(value = "counterChannel",
        poller = Poller(fixedRate = "10", maxMessagesPerPoll = "1"))
fun kotlinSupplier(): () -> String {
    return { "baz" }
}

Kotlin 协程

从版本 6.0 开始, Spring 集成提供了对 Kotlin 协程的支持。 现在suspendfunctions 和kotlinx.coroutines.Deferred & kotlinx.coroutines.flow.Flow返回类型可用于服务方法:spring-doc.cadn.net.cn

@ServiceActivator(inputChannel = "suspendServiceChannel", outputChannel = "resultChannel")
suspend fun suspendServiceFunction(payload: String) = payload.uppercase()

@ServiceActivator(inputChannel = "flowServiceChannel", outputChannel = "resultChannel", async = "true")
fun flowServiceFunction(payload: String) =
    flow {
        for (i in 1..3) {
            emit("$payload #$i")
        }
    }

该框架将它们视为 Reactive Streams 交互,并使用ReactiveAdapterRegistry转换为相应的MonoFlux反应器类型。 这样的函数 reply 将在 reply channel 中处理,如果它是一个ReactiveStreamsSubscribableChannel或作为CompletableFuture在相应的回调中。spring-doc.cadn.net.cn

具有Flowresult 不是async默认情况下,在@ServiceActivator所以Flow实例作为回复消息有效负载生成。 目标应用程序负责将此对象作为协程处理或将其转换为Flux分别。

@MessagingGatewayinterface 方法也可以用suspend修饰符。 该框架利用Monointernally 使用下游流执行请求-回复。 这样的Monoresult 由MonoKt.awaitSingleOrNull()API 来实现kotlin.coroutines.Continuationargument fo the calledsuspend网关的功能:spring-doc.cadn.net.cn

@MessagingGateway(defaultRequestChannel = "suspendRequestChannel")
interface SuspendFunGateway {

    suspend fun suspendGateway(payload: String): String

}

根据 Kotlin 语言要求,此方法必须作为协程调用:spring-doc.cadn.net.cn

@Autowired
private lateinit var suspendFunGateway: SuspendFunGateway

fun someServiceMethod() {
    runBlocking {
        val reply = suspendFunGateway.suspendGateway("test suspend gateway")
    }
}