This version is still in development and is not considered stable yet. For the latest stable version, please use Spring Integration 6.3.4!

This version is still in development and is not considered stable yet. For the latest stable version, please use Spring Integration 6.3.4!

The Framework also has been improved to support Kotlin lambdas for functions, so now you can use a combination of the Kotlin language and Spring Integration flow

@Transformer(inputChannel = "functionServiceChannel")
fun kotlinFunction(): (String) -> String {
    return { it.toUpperCase() }

@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
fun kotlinConsumer(): (Message<Any>) -> Unit {
    return { print(it) }

@InboundChannelAdapter(value = "counterChannel",
        poller = Poller(fixedRate = "10", maxMessagesPerPoll = "1"))
fun kotlinSupplier(): () -> String {
    return { "baz" }

Kotlin Coroutines

Starting with version 6.0, Spring Integration provides support for Kotlin Coroutines. Now suspend functions and kotlinx.coroutines.Deferred & kotlinx.coroutines.flow.Flow return types can be used for service

@ServiceActivator(inputChannel = "suspendServiceChannel", outputChannel = "resultChannel")
suspend fun suspendServiceFunction(payload: String) = payload.uppercase()

@ServiceActivator(inputChannel = "flowServiceChannel", outputChannel = "resultChannel", async = "true")
fun flowServiceFunction(payload: String) =
    flow {
        for (i in 1..3) {
            emit("$payload #$i")

The framework treats them as Reactive Streams interactions and uses ReactiveAdapterRegistry to convert to respective Mono and Flux reactor types. Such a function reply is processed then in the reply channel, if it is a ReactiveStreamsSubscribableChannel, or as a result of CompletableFuture in the respective

The functions with Flow result are not async by default on the @ServiceActivator, so Flow instance is produced as a reply message payload. It is the target application’s responsibility to process this object as a coroutine or convert it to Flux, respectively.

The @MessagingGateway interface methods also can be marked with a suspend modifier when declared in Kotlin. The framework utilizes a Mono internally to perform request-reply using the downstream flow. Such a Mono result is processed by the MonoKt.awaitSingleOrNull() API internally to fulfil a kotlin.coroutines.Continuation argument fo the called suspend function of the

@MessagingGateway(defaultRequestChannel = "suspendRequestChannel")
interface SuspendFunGateway {

    suspend fun suspendGateway(payload: String): String


This method has to be called as a coroutine according to Kotlin language

private lateinit var suspendFunGateway: SuspendFunGateway

fun someServiceMethod() {
    runBlocking {
        val reply = suspendFunGateway.suspendGateway("test suspend gateway")
The functions with Flow result are not async by default on the @ServiceActivator, so Flow instance is produced as a reply message payload. It is the target application’s responsibility to process this object as a coroutine or convert it to Flux, respectively.