I'm currently learning Kotlin Flow operators on Android. I want to make a network request (or a DB operation), then fire several requests in parallel based on its result, and wait until all of those flows have returned.
/**
 * A node in the category tree.
 *
 * @property id identifier of this category
 * @property name display name
 * @property parentCategoryId id of the parent category, or `null` when there is no parent
 * @property subCategories child categories; declared in the primary constructor (with an
 *   empty default) so that the generated `equals`, `hashCode`, `toString` and `copy`
 *   take the children into account instead of silently ignoring them
 */
data class Category(
    var id: Int,
    var name: String,
    var parentCategoryId: Int?,
    var subCategories: List<Category> = listOf()
)
/** Root of the catalogue: holds the (reassignable) list of categories. */
data class Catalogue(var categories: List<Category>)
/**
 * Simulated request for the top-level categories.
 *
 * @return a flow that emits a single list of parentless categories, with ids
 *   assigned in list order starting from 0
 */
fun getTopCats(): Flow<List<Category>> {
    val topLevel = listOf("Dairy", "Fruits", "Vegetables")
        .mapIndexed { index, label -> Category(index, label, null) }
    return flowOf(topLevel)
}
/**
 * Simulated request for the sub-categories of one top-level category.
 *
 * The artificial latency is attached to the returned flow with `Flow.onEach`, so it is
 * paid when the flow is collected and several of these flows can wait concurrently
 * (for example inside `combine`). Previously the delay for ids 1 and 2 was applied to
 * the list inside `flowOf(...)`, which suspended this call once per element and made the
 * resulting flow emit immediately.
 *
 * The `suspend` modifier is kept only so the signature stays unchanged for callers;
 * building the flow itself does not suspend.
 *
 * @param catId id of the parent category
 * @return a flow emitting the sub-category list once, or an empty flow for an unknown id
 */
suspend fun getSubCats(catId: Int): Flow<List<Category>> {
    return when (catId) {
        0 -> flowOf(listOf(Category(3, "Milk", 0), Category(4, "Butter", 0)))
            .onEach { delay(1000) }
        1 -> flowOf(
            listOf(
                Category(5, "Banana", 1),
                Category(6, "Mandarin", 1),
                Category(7, "Orange", 1)
            )
        ).onEach { delay(2000) } // delay the emission, not each list element
        2 -> flowOf(
            listOf(
                Category(8, "Carrot", 2),
                Category(9, "Asparagus", 2),
                Category(10, "Lettuce", 2)
            )
        ).onEach { delay(3000) } // delay the emission, not each list element
        // NOTE(review): an empty flow never emits, so a combine() that includes it
        // completes without producing a value — confirm this is what callers want.
        else -> flowOf()
    }
}
First attempt: I think I'm doing something wrong, because the sub-categories are never fetched. Where should I place the `combine` operator?
// First attempt from the question: builds a Catalogue from the top-level
// categories and tries to attach each category's sub-categories via combine.
// NOTE(review): this snippet is the code being asked about and does not
// populate the sub-categories; the notes below mark the suspicious spots.
viewModelScope.launch {
val catalogue = Catalogue(listOf())
getTopCats().map {
// NOTE(review): `categories` is not declared anywhere in this snippet; the
// lambda's implicit parameter `it` looks like what was intended — confirm.
catalogue.categories = categories // assign top level category
val flows = arrayListOf<Flow<List<Category>>>()
categories.onEach { cat ->
// Request one sub-category flow per top-level category.
flows.add(getSubCats(cat.id))
}
// NOTE(review): the Flow returned by combine(...) is neither assigned nor
// collected, so its transform block is not expected to run.
combine(flows) { array ->
array.onEach { list ->
// NOTE(review): indexes the top-level list by the first sub-category's own
// id rather than its parentCategoryId — verify which one is intended.
catalogue.categories[list.first().id].subCategories = list
}
}
catalogue
}.flowOn(Dispatchers.Default).collect() {
Timber.d("Received catalogue object")
}
}
CodePudding user response:
You could use the combine function to combine multiple flows and collect the latest results from all the flows. From the docs for combine:
Returns a Flow whose values are generated with transform function by combining the most recently emitted values by each flow.
For example:
// Three single-value flows standing in for independent requests
val getA: Flow<Int> = flowOf(1)
val getB: Flow<Int> = flowOf(2)
val getC: Flow<Int> = flowOf(3)

// One flow that emits the latest value of each input, gathered into a list
// (any other data structure would work just as well)
val combined = combine(getA, getB, getC) { a, b, c -> listOf(a, b, c) }

MainScope().launch {
    combined.collect { results -> println("Got $results") } // prints "Got [1, 2, 3]"
}
`combine` can also take a list of flows of arbitrary length if you have a lot of them; in that case the transform receives an array of the latest values (this requires all the flows to emit the same type):
// Any number of flows with the same element type can be combined as a list
val manyFlows = listOf(getA, getB, getC)

// The transform receives an Array<T> (manyFlows being a List<Flow<T>>),
// which is converted to a List here
val combined = combine(manyFlows) { it.toList() }
Edit
As a more complete example, here is how you might fetch the list of top categories, map each category to its own flow, and then collect all of those flows at once with `combine`:
/**
 * Collects the top-level categories and, for every emitted list, looks up each
 * category's count via [getCatCount] and prints the resulting category-to-count map.
 */
suspend fun getData() {
    getTopCats().collect { topCats ->
        // `topCats` is the result of the first flow; build one count flow
        // per top-level category from it
        val countFlows = topCats.map { name -> getCatCount(name) }

        // Combined flow that pairs every category with the latest count
        // emitted by its flow, producing a map of categories to counts
        val countsByCategory = combine(countFlows) { counts ->
            topCats.zip(counts).toMap()
        }

        // Collecting the combined flow gathers all the counts at once
        countsByCategory.collect { results ->
            println("Got $results") // prints "Got {A=1, B=2, C=3}"
        }
    }
}
/**
 * Simulated request for the count belonging to a category.
 *
 * @param name category name
 * @return a flow emitting the count once: 1, 2 or 3 for "A", "B" or "C", and -1 for
 *   any other name
 */
private fun getCatCount(name: String): Flow<Int> {
    val count = when (name) {
        "A" -> 1
        "B" -> 2
        "C" -> 3
        else -> -1
    }
    return flowOf(count)
}
/** Simulated request that emits the names of the top-level categories once. */
private fun getTopCats(): Flow<List<String>> = flowOf(listOf("A", "B", "C"))
Then you can call `getData()` from inside a coroutine.