Kotlin Flows/Channel Concurrency Processing

Time:07-14

Since coroutine flows are sequential, performing long-running operations in the collector/consumer makes the total execution time the sum of those operations. To overcome this and run the long-running operations concurrently in the collector/consumer, I was wondering whether the following code conforms to coroutines and structured concurrency and is bug-free, since it helps me achieve the required behavior:

private suspend fun listenForResponses(
        channel: Channel<MyObject>,
        longRunningOperation: suspend (data: MyObject) -> Unit
    ) = withContext(currentCoroutineContext()) {
        channel.consumeEach { resultData ->
            Timber.i("onResponse: data: $resultData")
            launch {
                Timber.i("handle response")
                longRunningOperation(resultData)
                Timber.i("finished handling response")
            }
        }
    }

or for Flows:

private suspend fun listenForResponses(
        flow: Flow<MyObject>,
        longRunningOperation: suspend (data: MyObject) -> Unit
    ) = withContext(currentCoroutineContext()) {
        flow.onEach { resultData ->
            Timber.i("onResponse: data: $resultData")
            launch {
                Timber.i("handle response")
                longRunningOperation(resultData)
                Timber.i("finished handling response")
            }
        }.collect()
    }
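For context, the sequential behavior described above can be reproduced with a minimal sketch like the following. It assumes kotlinx.coroutines 1.6 or later; `slowOperation` and `sequentialCollectMillis` are hypothetical names introduced only for illustration, and the 200 ms delay is an arbitrary stand-in for real work.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical stand-in for a long-running operation: suspends for 200 ms.
suspend fun slowOperation(value: Int) {
    delay(200)
}

// Collects three items sequentially and returns the elapsed time in milliseconds.
fun sequentialCollectMillis(): Long = runBlocking {
    measureTimeMillis {
        flowOf(1, 2, 3).collect { value ->
            // The next item is not processed until this call returns.
            slowOperation(value)
        }
    }
}

fun main() {
    // Expected to be roughly 3 x 200 ms, because the operations run one after another.
    println("sequential: ${sequentialCollectMillis()} ms")
}
```

Launching each operation in its own coroutine, as in the two functions above, is what lets the delays overlap instead of adding up.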

CodePudding user response:

Yes, your code follows structured concurrency, and errors from the long-running operations will be propagated rather than lost. Bear in mind that an error originating from a long-running operation will:

  • cancel the other long running operations that are running concurrently
  • cancel the channel and its consumption OR cancel the flow collection
  • prevent other operations from starting
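The effects listed above can be observed with a small sketch like this one. It is an illustration under assumptions, not the asker's code: `OperationFailed` and `failurePropagationLog` are hypothetical names, the channel carries plain `Int` values, and everything runs on the single `runBlocking` thread.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical failure type used only for this demonstration.
class OperationFailed(message: String) : Exception(message)

// Runs two operations concurrently; the first fails after 50 ms.
// Returns a log of what happened to the sibling, the caller, and the channel.
fun failurePropagationLog(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val channel = Channel<Int>(Channel.UNLIMITED)
    channel.trySend(1)
    channel.trySend(2)
    // The channel is deliberately left open, so consumption keeps waiting for more items.
    try {
        coroutineScope {
            channel.consumeEach { value ->
                launch {
                    if (value == 1) {
                        delay(50)
                        throw OperationFailed("operation $value failed")
                    }
                    try {
                        delay(10_000) // simulates long-running work
                        log.add("operation $value finished")
                    } catch (e: CancellationException) {
                        log.add("operation $value cancelled")
                        throw e // always rethrow cancellation
                    }
                }
            }
        }
    } catch (e: OperationFailed) {
        log.add("caught: ${e.message}")
    }
    // consumeEach cancels the channel when it exits with an exception.
    log.add("channel closed: ${channel.trySend(3).isClosed}")
    log
}

fun main() {
    failurePropagationLog().forEach(::println)
}
```

The failure of the first operation cancels the scope, which cancels the second operation and the suspended `consumeEach`; the original exception is then rethrown to the caller of `coroutineScope`.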

If this is not the intended behaviour, I suggest using supervisorScope.
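A minimal sketch of the `supervisorScope` variant might look like the following. The names `consumeIndependently`, `onFailure`, and `supervisorDemo` are assumptions made for illustration. Note that inside a `supervisorScope`, an exception that escapes a `launch` is not rethrown to the caller; it goes to a `CoroutineExceptionHandler` if one is installed, otherwise to the thread's uncaught exception handler, so this sketch catches failures inside each child.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope

// Each element is handled in its own child coroutine of a supervisorScope,
// so one failing operation does not cancel its siblings or the consumption.
suspend fun <T> consumeIndependently(
    channel: ReceiveChannel<T>,
    onFailure: (T, Throwable) -> Unit, // hypothetical error callback
    longRunningOperation: suspend (T) -> Unit
) = supervisorScope {
    channel.consumeEach { item ->
        launch {
            try {
                longRunningOperation(item)
            } catch (e: CancellationException) {
                throw e // never swallow cancellation
            } catch (e: Exception) {
                onFailure(item, e)
            }
        }
    }
}

// Demonstration: item 2 fails, items 1 and 3 are still handled.
fun supervisorDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val channel = Channel<Int>(Channel.UNLIMITED)
    listOf(1, 2, 3).forEach { channel.trySend(it) }
    channel.close()
    consumeIndependently(
        channel,
        onFailure = { item, e -> log.add("failed $item: ${e.message}") }
    ) { item ->
        if (item == 2) throw IllegalArgumentException("boom")
        log.add("handled $item")
    }
    log
}

fun main() {
    supervisorDemo().forEach(::println)
}
```

Whether failures should be isolated like this or should tear down the whole listener depends on whether the operations are independent of each other.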

I would also recommend some improvements for readability and efficiency:

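private suspend inline fun listenForResponses(
    channel: ReceiveChannel<MyObject>,
    // Kept as a suspend lambda, as in the question, so that it can call suspending functions.
    crossinline longRunningOperation: suspend (data: MyObject) -> Unit
) = coroutineScope {
    // consumeEach cancels the channel if consumption ends with an exception.
    channel.consumeEach { resultData ->
        Timber.i("onResponse: data: $resultData")
        // Each response is handled in its own child coroutine of this scope.
        launch {
            Timber.i("handle response")
            longRunningOperation(resultData)
            Timber.i("finished handling response")
        }
    }
}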