Home > Software engineering >  How to properly have a queue of pending operations using Kotlin Coroutines?
How to properly have a queue of pending operations using Kotlin Coroutines?

Time:07-27

Background

I've found some classes/functions on a large app I work on, that have calls that shouldn't be on the UI thread (such as accessing the storage or DB).

Such operations could cause ANRs, and indeed I can see a percentage of ANRs on the Play Console.

I'd like to change this, and hopefully by using Kotlin Coroutines to also have a bit more order in code.

So, currently I work on a class that extends BroadcastReceiver and so it needs the onReceive callbacks to be handled one after another on the UI thread, each one will have to "wait" for the previous ones to finish.

Inside the onReceive callback, there are sadly calls that should be done on the background thread, and some on the UI thread. Sometimes there are conditions that have them both.

Meaning for example :

if( someCheckOnUiThread() && someDbOperation()) {
  ...
}

The problem

I'm quite new to Kotlin Coroutines, and even though I've found how to handle this, I'm pretty sure there is a more official way to do it, as I've read some tips and comments about this from others (here).

What I've tried

What I did actually works, but it looks more like a workaround:

private val mainScope = MainScope()
private val backgroundWorkDispatcher: CoroutineDispatcher =
        java.util.concurrent.Executors.newFixedThreadPool(1).asCoroutineDispatcher()

And then use them right in the onReceive callback:

@UiThread
override fun onReceive(somcContext: Context, intent: Intent) {
    val context = somcContext.applicationContext
    //use goAsync just because I'm in BroadcastReceiver
    val pendingAsyncResult = goAsync() 
    mainScope.launch {
        runInterruptible(backgroundWorkDispatcher) {
           // <-- some code here
        }
    }.invokeOnCompletion { throwable ->
        // last operation after done with everything here: 
        pendingAsyncResult.finish()
    }
    //done right away here, and yet the handling will be done one after another, freely
}

Inside the runInterruptible, I can reach the UI thread by calling runBlocking(mainScope.coroutineContext) {} , and I can also cancel the task using cancel() even within.

Using runBlocking is important because I need to wait there for a result. Of course I could use the alternatives when it makes sense, but then I could also use a simple Handler as I don't wait for a result.

I also use backgroundWorkDispatcher to make sure all background operations will be on a single thread, to wait for next operations, one after another.

The question

What are the alternatives to this solution? Something more elegant and/or shorter? Something more official?

Note that I need to handle the operations that are queued by the UI one-after-another, each waiting for the previous one to finish. The BroadcastReceiver is only an example. I'm sure there are (sadly) much harder places to fix in code, but I want to know how to properly handle this first.

CodePudding user response:

I also use backgroundWorkDispatcher to make sure all background operations will be on a single thread, to wait for next operations, one after another.

This is not what a single thread enforces in the coroutines world. A single thread prevents parallelism, but doesn't prevent concurrency. If you launch 2 coroutines in a single-threaded dispatcher, the second may very well start before the end of the first one, assuming the first one has at least one suspension point (call to a suspend function). This interlacing of coroutines execution is what concurrency means. See this other question which stems from the same misconception.

Basically, using the launch or async coroutine builders (or, more generally, starting multiple coroutines) expresses concurrency between those snippets of code. While it could be possible to design a custom dispatcher that enforces one coroutine completes before the next one can start, it's sort of the opposite of what coroutines are expected to do.

Now, that doesn't mean you can't do anything in your case, it just means that launching one coroutine per event is probably not the right thing to do (maybe it is ok, though, see EDIT down below). Instead, I suggest creating a channel to represent the queue of events to process, and launching one single coroutine somewhere to process those events (so it correctly expresses that you don't want concurrency). Instead of launching one coroutine per event in onReceive, you would simply send to the channel (probably using sendBlocking in your case, because onReceive is not suspend).

Now about where to launch that "actor" coroutine, I'd say it depends. You can give the channel and the coroutine the scope that you want (I mean "scope" in terms of variable visibility span, not in coroutine terms here). For instance, if you only want to enforce non-concurrency between events of this specific BroadcastReceiver, I'd say declare the channel as a property of this broadcast receiver, and launch the coroutine upon initialization (e.g. in an init block) in a CoroutineScope that's scoped to the broadcast receiver's lifecycle (if you don't have lifecycleScope there, create the scope yourself and cancel it upon destruction of the BroadcastReceiver).

Inside the onReceive callback, there are sadly calls that should be done on the background thread, and some on the UI thread. Sometimes there are conditions that have them both.

This is not a big problem. Coroutines make it easy to switch between threads by using withContext(someDispatcher). Note that withContext waits for whatever code is inside its lambda before it returns. This means that the code is still sequential even if it switches between threads. Synchronization is handled for you.

Here is some sample code about how it could look:

data class MyEvent(
    ... // add properties containing the data you need for processing
    val pendingResult: PendingResult,
)

// in the broadcast receiver
class MyBroadcastReceiver : BroadcastReceiver() {

    private val eventProcessorScope = MainScope(CoroutineName("my-receiver-event-processor"))

    private val events = Channel<MyEvent>()

    init {
        eventProcessorScope.launch {
            for (e in events) {
                try {
                    process(e)
                } finally {
                    e.pendingResult.finish()
                }
            }
        }
    }

    @UiThread
    override fun onReceive(someContext: Context, intent: Intent) {
        val pendingResult = goAsync()
        // extract the necessary data from the context or intent
        val event = YourEvent(..., pendingResult)
        eventChannel.sendBlocking(event)
    }

    suspend fun process(event: MyEvent) {
        // process the event here

        // do something on UI thread

        withContext(Dispatchers.Default) {
            // do some CPU-bound work
        }

        // do something else on UI thread after CPU work

        withContext(Dispatchers.IO) {
            // do some blocking call
        }
    }

    fun close() {
        eventProcessorScope.cancel()
    }
}

EDIT: From the docs of goAsync, I realized that making the event processing async actually prevents other events from being received, which I'm assuming prevents concurrent calls to onReceive:

Keep in mind that the work you do here will block further broadcasts until it completes

This means you actually can launch as many coroutines as you want to process those events, so long as you complete the goAsync pending result at the end of the processing. But then there is no need for runBlocking/runInterruptible, just using withContext here and there should do fine.

CodePudding user response:

Since you were asking about a thread queue in your comments on the other question, here's how I would do a coroutine job queue. Keep in mind, this is if you need each submitted coroutine to run entirely sequentially (no parallel work at all), which I'm not sure is what you're describing above.

class JobQueue {
    private val scope = MainScope()
    private val queue = Channel<Job>(Channel.UNLIMITED)

    init { 
        scope.launch(Dispatchers.Default) {
            for (job in queue) job.join()
        }
    }

    fun submit(
        context: CoroutineContext = EmptyCoroutineContext,
        block: suspend CoroutineScope.() -> Unit
    ) {
        val job = scope.launch(context, CoroutineStart.LAZY, block)
        queue.trySend(job)
    }

    fun cancel() {
        queueChannel.cancel()
        scope.cancel()
    }
}

You can create an instance of this class in an object or at the top level to make it last the lifetime of your app. It depends on how long you need the jobs to run for. I don't have a lot of BroadcastReceiver experience, but I know they are short-lived, so if they receive something while your app is off-screen, and the coroutine takes longer than a few seconds, I'm not sure exactly what happens. For this kind of work, I think you need to quickly pass it off to a WorkManager. But if you are doing stuff while your app is on-screen, you can use coroutines.

The following would prevent any part of the submitted job to run before any previously submitted job to the same JobQueue instance.

val jobQueue = JobQueue() // at top level so shared by all BroadcastReceivers

//...

override fun onReceive(someContext: Context, intent: Intent) {
    jobQueue.submit {
        val x = getSomething(someContext.applicationContext) // on main thread
        val y = withContext(Dispatchers.IO) {
            doSomeBlockingFetch() // not on main thread so safe to call blocking fun
        }
        doSomethingWithResult() // on main thread
    }
    // onReceive returns promptly on the main thread as required, but the JobQueue
    // prevents subsequent queue-submitted jobs from running before this one
    // is *completely* finished, including the final doSomethingWithResult() call
    // on the main thread.
}

Regarding your code in the question:

Creating a single-threaded dispatcher can prevent the code using that dispatcher from running in parallel, which might be all you want. But it doesn't create a queue, and provides no guarantee of execution order. Suppose my above example were done using your solution. Two calls to onReceive are made by the OS in quick succession. The doSomeBlockingFetch() part would not be run in parallel using your single-threaded dispatcher, but there's no guarantee of which order they would be called in, or which order the subsequent doSomethingWithResult() would be called in.

If you want a less hacky way of preventing your blocking code from running in parallel, and if you don't care about the execution order of the post-IO main thread work, I would use a mutex instead of a single-threaded dispatcher:

val receiverIOMutex = Mutex() // at top level so shared by all BroadcastReceivers

//...

override fun onReceive(someContext: Context, intent: Intent) {
    anyCoroutineScope.launch(Dispatchers.Main.immediate) {
        val x = getSomething(someContext.applicationContext) // on main thread
        val y = receiverIOMutex.withLock {
            withContext(Dispatchers.IO) {
                doSomeBlockingFetch() // not on main thread so safe to call blocking fun
            }
        }
        doSomethingWithResult() // on main thread
    }
}
  • Related