Since coroutine flows are sequential performing long running operations in the collector/consumer will cause the execution time to be the sum of the long running operations. To overcome this behavior and execute the long running operation concurrently in the collector/consumer I was wondering if the following code is conformed with coroutines and structured concurrency and also bug free, since it help me achieve the required behavior:
private suspend fun listenForResponses(
channel: Channel<MyObject>,
longRunningOperation: suspend (data: MyObject) -> Unit
) = withContext(currentCoroutineContext()) {
channel.consumeEach { resultData ->
Timber.i("onResponse: data: $resultData")
launch {
Timber.i("handle response")
longRunningOperation(resultData)
Timber.i("finished handling response")
}
}
}
or for Flows:
private suspend fun listenForResponses(
flow: Flow<MyObject>,
longRunningOperation: suspend (data: MyObject) -> Unit
) = withContext(currentCoroutineContext()) {
flow.onEach { resultData ->
Timber.i("onResponse: data: $resultData")
launch {
Timber.i("handle response")
longRunningOperation(resultData)
Timber.i("finished handling response")
}
}.collect()
}
CodePudding user response:
Yes, your code is structured correctly and will treat all errors from the long running operations. Bear in mind that an error that originates from a long running operation will:
- cancel the other long running operations that are running concurrently
- cancel the channel and its consumption OR cancel the flow collection
- prevent other operations from starting
If this is not the intended behaviour, I suggest using supervisorScope
.
I would also recommend some improvements for readability and efficiency:
private suspend inline fun listenForResponses(
channel: ReceiveChannel<MyObject>,
crossinline longRunningOperation: (data: MyObject) -> Unit
) = coroutineScope {
channel.consumeEach { resultData ->
Timber.i("onResponse: data: $resultData")
launch {
Timber.i("handle response")
longRunningOperation(resultData)
Timber.i("finished handling response")
}
}
}