Home > Back-end >  Kotlin coroutines multithread dispatcher and thread-safety for local variables
Kotlin coroutines multithread dispatcher and thread-safety for local variables

Time:09-24

Let's consider this simple code with coroutines

import kotlinx.coroutines.*
import java.util.concurrent.Executors

fun main() {
    runBlocking {
        launch (Executors.newFixedThreadPool(10).asCoroutineDispatcher()) {
            var x = 0
            val threads = mutableSetOf<Thread>()
            for (i in 0 until 100000) {
                x  
                threads.add(Thread.currentThread())
                yield()
            }
            println("Result: $x")
            println("Threads: $threads")
        }
    }
}

As far as I understand this is quite legit coroutines code and it actually produces expected results:

Result: 100000
Threads: [Thread[pool-1-thread-1,5,main], Thread[pool-1-thread-2,5,main], Thread[pool-1-thread-3,5,main], Thread[pool-1-thread-4,5,main], Thread[pool-1-thread-5,5,main], Thread[pool-1-thread-6,5,main], Thread[pool-1-thread-7,5,main], Thread[pool-1-thread-8,5,main], Thread[pool-1-thread-9,5,main], Thread[pool-1-thread-10,5,main]]

The question is what makes these modifications of local variables thread-safe (or is it thread-safe?). I understand that this loop is actually executed sequentially but it can change the running thread on every iteration. The changes done from thread in first iteration still should be visible to the thread that picked up this loop on second iteration. Which code does guarantee this visibility? I tried to decompile this code to Java and dig around coroutines implementation with debugger but did not find a clue.

CodePudding user response:

Your question is completely analogous to the realization that the OS can suspend a thread at any point in its execution and reschedule it to another CPU core. That works not because the code in question is "multicore-safe", but because it is a guarantee of the environment that a single thread behaves according to its program-order semantics.

Kotlin's coroutine execution environment likewise guarantees the safety of your sequential code. You are supposed to program to this guarantee without any worry about how it is maintained.

If you want to descend into the details of "how" out of curiosity, the answer becomes "it depends". Every coroutine dispatcher can choose its own mechanism to achieve it.

As an instructive example, we can focus on the specific dispatcher you use in your posted code: JDK's fixedThreadPoolExecutor. You can submit arbitrary tasks to this executor, and it will execute each one of them on a single (arbitrary) thread, but many tasks submitted together will execute in parallel on different threads.

Furthermore, the executor service provides the guarantee that the code leading up to executor.execute(task) happens-before the code within the task, and the code within the task happens-before another thread's observing its completion (future.get(), future.isCompleted(), getting an event from the associated CompletionService).

Kotlin's coroutine dispatcher drives the coroutine through its lifecycle of suspension and resumption by relying on these primitives from the executor service, and thus you get the "sequential execution" guarantee for the entire coroutine. A single task submitted to the executor ends whenever the coroutine suspends, and the dispatcher submits a new task when the coroutine is ready to resume (when the user code calls continuation.resume(result)).

  • Related