Android Kotlin flow operator - wait until all flows have emitted

Time:12-18

I'm currently learning Kotlin flow operators on Android. I want to make a network request or a DB operation, then run several follow-up requests in parallel and wait until all of those flows have emitted.

    data class Category(var id: Int, var name: String, var parentCategoryId: Int?) {
        var subCategories: List<Category> = listOf()
    }

    data class Catalogue(var categories: List<Category>) {}

    // request to fetch top level categories
    fun getTopCats(): Flow<List<Category>> {
        return flowOf(
            listOf(
                Category(0, "Dairy", null),
                Category(1, "Fruits", null),
                Category(2, "Vegetables", null)
            )
        )
    }

    // request to fetch sub categories
    fun getSubCats(catId: Int): Flow<List<Category>> {
        return when (catId) {
            0 -> flowOf(listOf(Category(3, "Milk", 0), Category(4, "Butter", 0)))
                .onEach { delay(1000) }
            // onEach is applied to the flow (not to the list), so the
            // delay happens once per emission while collecting
            1 -> flowOf(
                listOf(
                    Category(5, "Banana", 1),
                    Category(6, "Mandarin", 1),
                    Category(7, "Orange", 1)
                )
            ).onEach { delay(2000) }
            2 -> flowOf(
                listOf(
                    Category(8, "Carrot", 2),
                    Category(9, "Asparagus", 2),
                    Category(10, "Lettuce", 2)
                )
            ).onEach { delay(3000) }
            else -> flowOf()
        }
    }

First attempt - I think I'm doing something wrong as sub categories are not fetched. Where should I place the combine operator?

    viewModelScope.launch {
        val catalogue = Catalogue(listOf())
        getTopCats().map { categories ->
            catalogue.categories = categories // assign top level category

            val flows = arrayListOf<Flow<List<Category>>>()
            categories.onEach { cat ->
                flows.add(getSubCats(cat.id))
            }
            combine(flows) { array ->
                array.onEach { list ->
                    catalogue.categories[list.first().id].subCategories = list
                }
            }
            catalogue
        }.flowOn(Dispatchers.Default).collect() {
            Timber.d("Received catalogue object")
        }
    }

CodePudding user response:

You could use the combine function to combine multiple flows and collect the latest results from all the flows. From the docs for combine:

Returns a Flow whose values are generated with transform function by combining the most recently emitted values by each flow.

For example:

// Some individual flows to return a single value
val getA = flowOf(1)
val getB = flowOf(2)
val getC = flowOf(3)

// Combine into a single flow that emits a list of the individual
// flow latest results
val combined = combine(getA, getB, getC) { a, b, c ->
    // Combine the results into whatever data structure
    // you want - here I made a list
    listOf(a,b,c)
}

MainScope().launch {
    combined.collect { results ->
        println("Got $results") // prints Got [1, 2, 3]
    }
}
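To make the "most recently emitted values" part of the docs concrete, here is a minimal sketch. The flows `numbers` and `letter` and the function `latestPairs` are made-up names for illustration, and the delays are arbitrary. The combined flow stays silent until both inputs have emitted at least once, then emits again whenever either input emits a new value.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical source that emits twice: 1 immediately, 2 after 200 ms.
val numbers: Flow<Int> = flow {
    emit(1)
    delay(200)
    emit(2)
}

// Hypothetical source that emits once, after 100 ms.
val letter: Flow<String> = flow {
    delay(100)
    emit("a")
}

// Collects every value the combined flow produces.
suspend fun latestPairs(): List<String> =
    combine(numbers, letter) { n, s -> "$n$s" }.toList()

fun main() = runBlocking {
    // Nothing is emitted for the lone 1; the first value appears
    // once "a" arrives, the second when 2 replaces 1.
    println(latestPairs()) // prints [1a, 2a]
}
```

If every input flow emits exactly once, as in a one-shot request, the combined flow therefore emits exactly once, after the slowest input.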

combine can also take a list of flows of arbitrary length if you have a lot of them. In that case the transform receives an array holding the latest value of each flow, in the same order as the list, and all the flows must emit the same type.

val manyFlows = listOf(getA, getB, getC)
val combined = combine(manyFlows) { result ->
    // result is an Array<T> where manyFlows is List<Flow<T>>
    result.toList()
}
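As a rough check that the list version really waits for the slowest flow, here is a small sketch. The helper `delayed` and the function `combinedEmissions` are hypothetical names, and the delays are arbitrary. Each flow emits a single value at a different time, and the combined flow produces one array once the last of them has arrived.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits `value` once after `delayMs` milliseconds.
fun delayed(value: Int, delayMs: Long): Flow<Int> = flow {
    delay(delayMs)
    emit(value)
}

// Collects every emission of the combined flow into a list.
suspend fun combinedEmissions(): List<List<Int>> {
    // The first flow is the slowest, yet its value still comes first
    // in the array because the array follows the order of the list.
    val flows = listOf(delayed(1, 300), delayed(2, 100), delayed(3, 200))
    return combine(flows) { values -> values.toList() }.toList()
}

fun main() = runBlocking {
    println(combinedEmissions()) // prints [[1, 2, 3]]
}
```

The flows are collected concurrently, so the wait is roughly the longest delay rather than the sum of the delays.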

Edit

As a more complete example, here is how you might fetch the list of top categories, map each category to its own flow, and then run all of those flows at once with combine:

suspend fun getData() {
    val top = getTopCats()
    top.collect { result ->
        // result is the list emitted by the first flow. Create one
        // flow per top category to retrieve its count integer
        val subcatFlows = result.map { getCatCount(it) }

        // Combine those flows into a single flow that emits
        // once every count is available
        val allSubCats = combine(subcatFlows) { counts ->
            // produce a map of categories to counts
            result.zip(counts.toList()).toMap()
        }

        // Collect the new combined flow to receive the
        // counts all at once
        allSubCats.collect { results ->
            println("Got $results") // prints Got {A=1, B=2, C=3}
        }
    }
}

// request to fetch the count for a given
// category by name
private fun getCatCount(name: String): Flow<Int> {
    return when(name) {
        "A" -> flowOf(1)
        "B" -> flowOf(2)
        "C" -> flowOf(3)
        else -> flowOf(-1)
    }
}

// request to fetch top level categories
private fun getTopCats(): Flow<List<String>> {
    return flowOf(listOf("A","B","C"))
}

You can then call getData() from inside a coroutine.
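Applied to the Category and Catalogue types from the question, the same idea might look like the sketch below. Flows are cold, so the combine(flows) call in the first attempt only builds a flow: nothing collects it, its transform never runs, and map returns the catalogue without sub categories. The sketch makes a few assumptions that are not in the question: the classes are rewritten as immutable so copy() can be used, the delays are shortened, and getCatalogue is a made-up name. It uses transform with emitAll so the combined flow is actually collected, and it matches sub category lists to parents by position instead of indexing with list.first().id, which would be 3, 5 or 8 in a three-element list.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking

// Assumption: an immutable variant of the question's classes, so each
// category can be copied with its sub categories filled in.
data class Category(
    val id: Int,
    val name: String,
    val parentCategoryId: Int?,
    val subCategories: List<Category> = emptyList()
)

data class Catalogue(val categories: List<Category>)

fun getTopCats(): Flow<List<Category>> = flowOf(
    listOf(
        Category(0, "Dairy", null),
        Category(1, "Fruits", null),
        Category(2, "Vegetables", null)
    )
)

// Delays are shortened from the question's 1-3 s to keep the sketch quick.
fun getSubCats(catId: Int): Flow<List<Category>> = when (catId) {
    0 -> flowOf(listOf(Category(3, "Milk", 0), Category(4, "Butter", 0)))
        .onEach { delay(100) }
    1 -> flowOf(listOf(Category(5, "Banana", 1), Category(6, "Mandarin", 1), Category(7, "Orange", 1)))
        .onEach { delay(200) }
    2 -> flowOf(listOf(Category(8, "Carrot", 2), Category(9, "Asparagus", 2), Category(10, "Lettuce", 2)))
        .onEach { delay(300) }
    else -> flowOf(emptyList<Category>())
}

// Hypothetical function: emitAll collects the inner combined flow,
// so its transform lambda actually runs.
fun getCatalogue(): Flow<Catalogue> = getTopCats().transform { topCats ->
    val subCatFlows = topCats.map { getSubCats(it.id) }
    emitAll(
        combine(subCatFlows) { subLists ->
            // subLists[i] is the latest value of subCatFlows[i],
            // which belongs to topCats[i]
            Catalogue(topCats.mapIndexed { i, cat -> cat.copy(subCategories = subLists[i]) })
        }
    )
}

fun main() = runBlocking {
    val catalogue = getCatalogue().first()
    catalogue.categories.forEach { cat ->
        println("${cat.name}: ${cat.subCategories.map { it.name }}")
    }
}
```

In a ViewModel you would collect getCatalogue() inside viewModelScope.launch instead of calling first() in runBlocking. Returning a new Catalogue from the transform, rather than mutating one created outside the flow, also avoids sharing mutable state across dispatchers.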
