Home > OS >  Kotlin - Achieving Executors.newSingleThreadExecutor behaviour using coroutines
Kotlin - Achieving Executors.newSingleThreadExecutor behaviour using coroutines

Time:07-21

Executors.newSingleThreadExecutor queues the tasks registered to it and then executes them sequentially. The following code:

val singleThreadedExecutor = Executors.newSingleThreadExecutor()

(0..10).forEach { i ->

    singleThreadedExecutor.execute {

        if (i % 2 == 0) {
            Thread.sleep(2000)
        } else {
            Thread.sleep(1000)
        }

        println(i)
    }
}

outputs this:

I/System.out: 0
I/System.out: 1
I/System.out: 2
I/System.out: 3
I/System.out: 4
I/System.out: 5
I/System.out: 6
I/System.out: 7
I/System.out: 8
I/System.out: 9
I/System.out: 10

I want to achieve this behaviour using Kotlin's Coroutines. I have tried using limitedParallelism but it didn't work as I was expecting. See the code below:

val singleThreadedCoroutine = Dispatchers.Default.limitedParallelism(1)

(0..10).forEach { i ->

    lifecycleScope.launch(singleThreadedCoroutine) {

        if (i % 2 == 0) {
            delay(2000)
        } else {
            delay(1000)
        }

        println(i)
    }
}

But its output was:

I/System.out: 1
I/System.out: 3
I/System.out: 5
I/System.out: 7
I/System.out: 9
I/System.out: 0
I/System.out: 2
I/System.out: 4
I/System.out: 6
I/System.out: 8
I/System.out: 10

Am I missing something? How can I queue tasks in a coroutine, so that it executes them sequentially?

Thanks.

CodePudding user response:

The difference here is not really about the dispatcher, it's about the fact that suspending functions don't block threads, and coroutines are concurrent.

The reason why delay() is called "non-blocking" is because it doesn't block the thread it's called from. This means that the thread is free to go execute other coroutines during that time. On the other hand, using Thread.sleep() really blocks the thread, so it will prevent it from doing anything else during that time and other coroutines (or tasks in your case) will have to wait. If you used Thread.sleep() in your coroutines approach, you should see the same result, but that kinda defeats the purpose of coroutines.

Dispatching coroutines is usually done in order, but I don't believe this is guaranteed documented behaviour, and it probably depends on the dispatcher. However, in any case, when they suspend, they are allowed to interleave (in general) - this is almost the definition of concurrency.

If you don't want concurrency, you have several options:

  1. do all the work on-the-spot in one coroutine: run your loop inside a single launch:
lifecycleScope.launch {
    repeat(11) { i ->

        if (i % 2 == 0) {
            delay(2000)
        } else {
            delay(1000)
        }

        println(i)
    }
}
  1. do all the work in one coroutine, but elsewhere: use a Channel as a queue to send the events to, and instead of using launch on each item, just send the items through the channel. Then, spawn a single coroutine that polls elements from the channel for processing.

  2. if what you really want is to protect only certain parts from running in parallel, but you're ok with concurrency otherwise, you can also use a Mutex. You don't even have to use a single-threaded dispatcher in that case

val mutex = Mutex()

repeat(11) { i ->

    lifecycleScope.launch {

        mutex.withLock {

            if (i % 2 == 0) {
                delay(2000)
            } else {
                delay(1000)
            }

            println(i)
        }
    }
}
  • Related