run several coroutines in parallel (with return value)

Time:04-13

I'm new to Kotlin and I'm trying to run several requests to a web service in parallel.

So far I have:

class HttpClient {
    private val DEFAULT_BASE_URL = "https://someapi"

    fun fetch(endPoint: String, page: Int): String {
        FuelManager.instance.basePath = DEFAULT_BASE_URL
        val (_, response, _) = endPoint.httpGet(listOf("page" to page)).response()
        return String(response.data)
    }

    fun headers(endPoint: String): Headers {
        FuelManager.instance.basePath = DEFAULT_BASE_URL
        val (_, response, _) = endPoint.httpGet(listOf("page" to 1)).response()
        return response.headers
    }
}

And the class that runs the whole process:

class Fetcher(private val page: Int) {
    suspend fun run(): String = coroutineScope {
        async {
            HttpClient().fetch(DEFAULT_ENDPOINT, page)
        }
    }.await()


    companion object {

        private const val DEFAULT_ENDPOINT = "endpoint"

        suspend fun fetchAll(): MutableList<String> {

            val totalThreads = (totalCount() / pageSize()) + 1

            return runBlocking {
                var deck: MutableList<String> = mutableListOf()
                for (i in 1..totalThreads) {
                    deck.add(Fetcher(i).run())
                }

                deck
            }
        }

        private fun pageSize(): Int {
            return HttpClient().headers(DEFAULT_ENDPOINT)["page-size"].first().toInt()
        }

        private fun totalCount(): Int {
            return HttpClient().headers(DEFAULT_ENDPOINT)["total-count"].first().toInt()
        }
    }
}

I'm looking to mirror the Thread.join() from Java. Could you give me some pointers on how to improve my code to achieve that?

Also, if it's not too much to ask, could you suggest a book or set of examples on this subject?

Thanks for your help in advance!

Answer:

A few points:

  1. If you're going to use coroutines in a project, you'll mostly want to expose suspending functions instead of blocking functions. I don't use Fuel, but I see it has a coroutines library with suspend function versions of its blocking functions. Usually, suspend functions that unwrap an asynchronous result have the word "await" in their names. I don't know for sure what response() does, but if I had to guess, you can use awaitResponse() instead and then make your functions suspend functions.

  2. Not related to coroutines, but there's almost no reason to ever use the String constructor to wrap another String, since Strings are immutable. (The only reason you would ever need to copy a String in memory like that is maybe if you were using it in some kind of weird collection that uses identity comparison instead of `==` comparison, and you need it to be treated as a different value.)

  3. Also not related to coroutines, but HttpClient in your case should be a singleton object since it holds no state. Then you won't need to instantiate it when you use it or worry about holding a reference to one in a property.

  4. Never use runBlocking in a suspend function. A suspend function must never block. runBlocking creates a blocking function. The only two places runBlocking should ever appear in an application are at the top level main function of a CLI app, or in an app that has both coroutines and some other thread-management library and you need to convert suspend functions into blocking non-suspend functions so they can be used by the non-coroutine-based code.

  5. There's no reason to immediately follow async() with await() if you aren't doing it in parallel with something else. You could just use withContext instead. If you don't need to use a specific dispatcher to call the code, which you don't if it's a suspend function, then you don't even need withContext. You can just call suspend functions directly in your coroutine.

  6. There's no reason to use coroutineScope { } to wrap a single child coroutine. It's for running multiple child coroutines and waiting for all of them.
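To illustrate points 4 to 6, here is a minimal sketch of wrapping a single blocking call in a suspend function. Both blockingFetch and fetchOnePage are hypothetical names made up for this example, and Thread.sleep only stands in for a blocking HTTP call; none of this is Fuel's API.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext

// Hypothetical stand-in for a blocking HTTP call (not part of Fuel).
fun blockingFetch(page: Int): String {
    Thread.sleep(100) // simulates blocking network I/O
    return "page $page"
}

// Suspending wrapper: the blocking call runs on the IO dispatcher, so the
// caller's thread is not blocked. A single sequential call needs neither
// an async/await pair nor a coroutineScope block.
suspend fun fetchOnePage(page: Int): String =
    withContext(Dispatchers.IO) { blockingFetch(page) }
```

In a CLI app, the only runBlocking call would then sit at the very top, for example fun main() = runBlocking { println(fetchOnePage(1)) }. If the HTTP library already offers a suspend version of the call, the withContext wrapper should not be needed either.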

So, if we change HttpClient's functions into suspend functions, then Fetcher.run becomes very simple.

I also think it's kind of weird that Fetcher is a class with a single property that is only used in a one-off fashion by its only function. It would be more straightforward for Fetcher to be a singleton object and for run to take the parameter it needs. Then you won't need a companion object either, since Fetcher as an object can directly host those functions.

Finally, the part you were actually asking about: to run parallel tasks in a coroutine, use coroutineScope { } and then launch async coroutines inside it and await them. The map function is handy for doing this with something you can iterate, and then you can use awaitAll(). You can also get totalCount and pageSize in parallel.
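As a self-contained sketch of that pattern, independent of any HTTP library: simulatedRequest and fetchPages are hypothetical names, and delay stands in for a real suspending network call.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay

// Hypothetical stand-in for a suspending network request.
suspend fun simulatedRequest(page: Int): String {
    delay(100) // simulates latency without blocking a thread
    return "page $page"
}

// Starts one child coroutine per page, then suspends until all of them
// have finished. awaitAll() returns the results in the same order as the
// list of Deferred values it is called on.
suspend fun fetchPages(pages: IntRange): List<String> = coroutineScope {
    pages.map { page -> async { simulatedRequest(page) } }.awaitAll()
}
```

Because the requests overlap, fetching several pages this way should take roughly as long as the slowest single request rather than the sum of all of them. If any child fails, coroutineScope cancels the remaining children and rethrows the exception.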

Bringing that all together:

object HttpClient {
    private val DEFAULT_BASE_URL = "https://someapi"

    suspend fun fetch(endPoint: String, page: Int): String {
        FuelManager.instance.basePath = DEFAULT_BASE_URL
        val (_, response, _) = endPoint.httpGet(listOf("page" to page)).awaitResponse()
        return response.data
    }

    suspend fun headers(endPoint: String): Headers {
        FuelManager.instance.basePath = DEFAULT_BASE_URL
        val (_, response, _) = endPoint.httpGet(listOf("page" to 1)).awaitResponse()
        return response.headers
    }
}

object Fetcher {
    suspend fun run(page: Int): String = 
        HttpClient.fetch(DEFAULT_ENDPOINT, page)

    private const val DEFAULT_ENDPOINT = "endpoint"

    suspend fun fetchAll(): List<String> {
        val totalThreads = coroutineScope {
            val totalCount = async { totalCount() }
            val pageSize = async { pageSize() }
            (totalCount.await() / pageSize.await()) + 1
        }
        return coroutineScope {
            (1..totalThreads).map { i ->
                async { run(i) }
            }.awaitAll()
        }
    }

    private suspend fun pageSize(): Int {
        return HttpClient.headers(DEFAULT_ENDPOINT)["page-size"].first().toInt()
    }

    private suspend fun totalCount(): Int {
        return HttpClient.headers(DEFAULT_ENDPOINT)["total-count"].first().toInt()
    }

}

I changed MutableList to List, since it's simpler, and usually you don't need a MutableList. If you really need one you can call toMutableList() on it.
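On the Thread.join() part of the question: for coroutines that return a value, await() and awaitAll() play that role. For coroutines started with launch, which return no value, the closest equivalents are Job.join() and joinAll(). A minimal sketch, where recordPage and recordAll are hypothetical names used only for illustration:

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical task that has a side effect and returns nothing.
suspend fun recordPage(page: Int, log: ConcurrentLinkedQueue<Int>) {
    delay(50) // simulated work
    log.add(page)
}

suspend fun recordAll(pages: IntRange): List<Int> {
    val log = ConcurrentLinkedQueue<Int>() // thread-safe collection
    coroutineScope {
        val jobs = pages.map { page -> launch { recordPage(page, log) } }
        // Suspends until every job has completed, much like calling
        // join() on each thread. coroutineScope would also wait for its
        // children on its own, so this call is shown for clarity.
        jobs.joinAll()
    }
    return log.sorted()
}
```

Unlike Thread.join(), join() and joinAll() suspend the calling coroutine instead of blocking its thread.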
