Home > OS >  Returning the first successful task with Kotiln coroutines without waiting or cancellation of others
Returning the first successful task with Kotiln coroutines without waiting or cancellation of others

Time:07-22

I have a dispatcher that dispatches async tasks and I want it to immediately return the result of the first successful task without waiting or cancellation of the other active tasks.

From running the example code below, I want the output to be:

Expected

Execute task A
Execute task B
Execution result: Result(success=true, value=B)
Result(success=true, value=B)
Execute task C

Instead of the actual output:

Actual

Execute task A
Execute task B
Execution result: Result(success=true, value=B)
Execute task C
Result(success=true, value=B)
data class Result(val success: Boolean, val value: String)

interface Task {
    suspend fun execute(): Result
}

val taskA = object : Task {
    override suspend fun execute(): Result {
        delay(100)
        println("Execute task A")
        return Result(false, "A")
    }
}

val taskB = object : Task {
    override suspend fun execute(): Result {
        delay(200)
        println("Execute task B")
        return Result(true, "B")
    }
}

val taskC = object : Task {
    override suspend fun execute(): Result {
        delay(300)
        println("Execute task C")
        return Result(true, "C")
    }
}

class Dispatcher {

    suspend fun execute(): Result? = coroutineScope {
        val deferred: List<Deferred<Result>> = listOf(taskA, taskB, taskC).map {
            async { it.execute() }
        }

        // Get the first Result that has success == true
        val result = channelFlow { deferred.forEach { launch { send(it.await()) } } }
            .filter { it.success }
            .firstOrNull()

        println("Execution result: $result")

        /*
        Coroutine scope will wait for all children to complete before returning.
        Children can be cancelled to return result immediately, but I want the 
        children to complete (in the background?)
         */

//         deferred.forEach { it.cancel() }
        result
    }
}

fun main() = runBlocking {
    val result = Dispatcher().execute()
    println(result)
    delay(1000)
}

CodePudding user response:

The coroutineScope function suspends and doesn't return until all of its child coroutines return.

Since you want the launched coroutines to continue running even after the current suspend function returns, you must logically use some other CoroutineScope to launch those coroutines, not a child scope created with the coroutineScope function.

For example:

class Dispatcher {
    private val dispatcherScope = SupervisorScope()   CoroutineName("Dispatcher")

    fun cancelAll() = dispatcherScope.cancel()

    suspend fun execute(): Result? {
        val deferred: List<Deferred<Result>> = listOf(taskA, taskB, taskC).map {
            dispatcherScope.async { it.execute() }
        }

        // Get the first Result that has success == true
        val result = channelFlow { deferred.forEach { launch { send(it.await()) } } }
            .filter { it.success }
            .firstOrNull()

        println("Execution result: $result")

        return result
    }
}
  • Related