Update list with the results of different threads in Kotlin

Time:10-04

I want to update a list with the result of different threads.

suspend fun mainFunction(): List<A> {
    var x: List<A> = listOf<A>()
    val job = ArrayList<Job>()
    val ans = mainScope.async {
        var i = 0
        for (j in (0..5)) {
            job.add(
                launch {
                    val res = async {
                        func1()
                    }
                    x  = res.await()
                }
            )
        }
        job.joinAll()
    }
    ans.await()
    return x
}

The list "x" is not getting updated properly. Sometimes it appends "res", sometimes it does not. Is there a thread-safe way to modify lists like this?

CodePudding user response:

If you just want to perform some tasks concurrently and get a list of all the finished results at the end, you could do this:

val jobs = (0..5).map { mainScope.async { func1() } }
val results = jobs.awaitAll()
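
A self-contained sketch of this approach, for anyone who wants to run it. The question shows neither func1() nor mainScope, so both are hypothetical stand-ins here: func1() returns a fixed string after a short delay, and mainScope is a plain CoroutineScope on Dispatchers.Default.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Assumed stand-in for the asker's mainScope (not defined in the question).
val mainScope = CoroutineScope(Dispatchers.Default)

// Hypothetical stand-in for the asker's func1().
suspend fun func1(): String {
    delay(10) // simulate some work
    return "done"
}

suspend fun collectResults(): List<String> {
    // Start six coroutines; each Deferred holds its own result,
    // so no shared variable is written from several threads.
    val jobs = (0..5).map { mainScope.async { func1() } }
    // awaitAll() suspends until every Deferred completes and
    // returns the results in the same order as the input list.
    return jobs.awaitAll()
}

fun main() = runBlocking {
    println(collectResults()) // [done, done, done, done, done, done]
}
```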

CodePudding user response:

Short answer

Here is a simpler version of what you're doing, which avoids the synchronization problems you might be running into:

val x = coroutineScope {
    List(6) { async { func1() } }.awaitAll()
}

You can read the long answer if you want to unpack this. I will explain the different things in the original code that are not really idiomatic and could be replaced.

Long answer

There are multiple non-idiomatic things in the code in the question, so I'll try to address each of them.

Indexed for loop for 0-based range

If you only want to repeat an operation several times, repeat(6) is simpler than for (j in 0..5). It's easier to read, especially when you don't need the index variable:

suspend fun mainFunction(): List<A> {
    var x: List<A> = listOf<A>()
    val job = ArrayList<Job>()
    val ans = mainScope.async {
        repeat(6) {
            job.add(
                launch {
                    val res = async {
                        func1()
                    }
                    x  = res.await()
                }
            )
        }
        job.joinAll()
    }
    ans.await()
    return x
}

Creating lists with a loop

If what you want is to create a list out of that loop, you can also use List(size) { computeElement() } instead of repeat (or for), which makes use of the List factory function:

suspend fun mainFunction(): List<A> {
    var x: List<A> = listOf<A>()
    val ans = mainScope.async {
        val jobs = List(6) {
            launch {
                val res = async {
                    func1()
                }
                x  = res.await()
            }
        }
        jobs.joinAll()
    }
    ans.await()
    return x
}

Extra async

There is no need to wrap your launches here with an extra async; you can just use your scope on the launches directly:

suspend fun mainFunction(): List<A> {
    var x: List<A> = listOf<A>()
    val jobs = List(6) {
        mainScope.launch {
            val res = async {
                func1()
            }
            x  = res.await()
        }
    }
    jobs.joinAll()
    return x
}

async immediate await

Using async { someFun() } and then immediately awaiting the Deferred result is equivalent to just calling someFun() directly (unless you're using a different scope or context, which you aren't doing here for the innermost logic).

So you can replace the innermost part:

val res = async {
    func1()
}
x  = res.await()

with just x = func1(), which gives:

suspend fun mainFunction(): List<A> {
    var x: List<A> = listOf<A>()
    val jobs = List(6) {
        mainScope.launch {
            x  = func1()
        }
    }
    jobs.joinAll()
    return x
}
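
The equivalence claimed in this section can be checked with a small sketch. someFun() is a hypothetical suspend function, and viaAsync() and direct() are illustrative names, not part of the question's code.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical suspend function standing in for func1().
suspend fun someFun(): Int {
    delay(10)
    return 42
}

// async followed immediately by await: the caller suspends until
// someFun() finishes, so nothing else runs concurrently with it here.
suspend fun viaAsync(): Int = coroutineScope {
    val res = async { someFun() }
    res.await()
}

// The direct call suspends in the same way and yields the same value.
suspend fun direct(): Int = someFun()

fun main() = runBlocking {
    println(viaAsync() == direct()) // true
}
```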

launch vs async

If you want results, it is usually more practical to use async instead of launch. When you use launch, you have to store the result somewhere manually, which is what causes synchronization problems like the one you have now. With async, you get a Deferred<T> value which you can then await(), and when you have a list of Deferred values there are no synchronization issues when you await them all.

So the general idea of the previous code is bad practice and might bite you because it requires manual synchronization. You can replace it with:

suspend fun mainFunction(): List<A> {
    val deferredValues = List(6) {
        mainScope.async {
            func1()
        }
    }
    val x = deferredValues.awaitAll()
    return x
}

Or simpler:

suspend fun mainFunction(): List<A> {
    return List(6) {
        mainScope.async {
            func1()
        }
    }.awaitAll()
}
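
For comparison, here is a sketch of what the manual synchronization looks like if you keep launch: every coroutine appends to a shared mutable list, and the append is guarded by a Mutex from kotlinx.coroutines.sync. The func1() below is a hypothetical stand-in that returns a single Int, and collectWithLaunch() is an illustrative name.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical stand-in for func1(); here it returns a single value.
suspend fun func1(): Int {
    delay(10) // simulate some work
    return 1
}

suspend fun collectWithLaunch(): List<Int> {
    val mutex = Mutex()
    val results = mutableListOf<Int>()
    coroutineScope {
        repeat(6) {
            launch(Dispatchers.Default) {
                val value = func1()                   // do the work outside the lock
                mutex.withLock { results.add(value) } // guard only the shared append
            }
        }
    } // coroutineScope returns once all six children have finished
    return results
}

fun main() = runBlocking {
    println(collectWithLaunch().size) // 6
}
```

This works, but the mutex and the shared list are extra moving parts that the async/awaitAll version does not need, and the order of the elements depends on which coroutine finishes first.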

Manual joins vs coroutineScope

It is usually a smell to join() jobs manually. If you want to wait for some coroutines to finish, it is more idiomatic to launch all those coroutines within a coroutineScope { ... } block, which will suspend until all child coroutines finish.

Here we have already replaced all launch that we join() with async calls that we await, so this doesn't really apply anymore, because we still need to await() the deferred values in order to get the results. However, since we are in a suspend function already, we can still use coroutineScope instead of an external scope like mainScope to ensure that we don't leak any coroutines:

val x = coroutineScope {
    List(6) { async { func1() } }.awaitAll()
}
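
Put together as a runnable sketch: the question assigns the result of func1() to a List<A>, which suggests that func1() itself returns a list. Under that assumption awaitAll() yields a List<List<A>>, so the sketch adds flatten() to merge the six results; if your func1() returns a single A, drop that call. The data class A and the body of func1() are hypothetical.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical element type; the question does not show its definition.
data class A(val id: Int)

// Hypothetical func1(), assumed to return a list as in the question.
suspend fun func1(): List<A> {
    delay(10) // simulate some work
    return listOf(A(1), A(2))
}

suspend fun mainFunction(): List<A> = coroutineScope {
    List(6) { async { func1() } } // start six concurrent calls as children of this scope
        .awaitAll()               // List<List<A>>, in the order the calls were started
        .flatten()                // merge into a single List<A>
}

fun main() = runBlocking {
    println(mainFunction().size) // 12
}
```

Because the coroutines are children of coroutineScope, a failure or cancellation in one of them cancels the others, and mainFunction() cannot return while any of them is still running.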