I am doing multiple network requests in parallel and monitoring the result using a StateFlow. Each network request is done in a separate flow, and I use combine to push the latest status to my StateFlow. Here's my code:
Repo class:
fun networkRequest1(id: Int): Flow<Resource<List<Area>>> =
flow {
emit(Resource.Loading())
try {
val areas = retrofitInterface.getAreas(id)
emit(Resource.Success(areas))
} catch (throwable: Throwable) {
emit(Resource.Error())
}
}
fun networkRequest2(id: Int): Flow<Resource<List<Area>>> = //same code as above for simplicity
fun networkRequest3(id: Int): Flow<Resource<List<Area>>> = //same code as above for simplicity
fun networkRequest4(id: Int): Flow<Resource<List<Area>>> = //same code as above for simplicity
ViewModel class:
val getDataCombinedStateFlow: StateFlow<Resource<HashMap<String, Resource<out List<Any>>>>?> =
getDataTrigger.flatMapLatest {
withContext(it) {
combine(
repo.networkRequest1(id),
repo.networkRequest2(id),
repo.networkRequest3(id),
repo.networkRequest4(id)
) { a,
b,
c,
d
->
hashMapOf(
Pair("1", a),
Pair("2",b),
Pair("3", c),
Pair("4", d),
)
}.flatMapLatest {
val progress = it
var isLoading = false
flow<Resource<HashMap<String, Resource<out List<Any>>>>?> {
emit(Resource.Loading())
progress.forEach { (t, u) ->
if (u is Resource.Error) {
emit(Resource.Error(error = u.error!!))
// I want to cancel here, as I no longer care if 1 request fails
return@flow
}
if (u is Resource.Loading) {
isLoading = true
}
}
if (isLoading) {
emit(Resource.Loading())
return@flow
}
if (!isLoading) {
emit(Resource.Success(progress))
}
}
}
}
}.stateIn(viewModelScope, SharingStarted.Lazily, null)
View class:
viewLifecycleOwner.lifecycleScope.launchWhenCreated() {
viewModel.getDataCombinedStateFlow.collect {
val result = it ?: return@collect
binding.loadingErrorState.apply {
if (result is Resource.Loading) {
//show smth
}
if (result is Resource.Error) {
//show error msg
}
if (result is Resource.Success) {
//done
}
}
}
}
I want to be able to cancel all work after a Resource.Error is emitted, as I no longer want to wait or do any related work for the response of other API calls in case one of them fails.
How can I achieve that?
I tried to cancel the collect, but the flows that feed the StateFlow keep working and emitting results. I know those results won't be collected, but I still find this a waste of resources.
CodePudding user response:
I think this whole situation is complicated by the fact that you have source flows just to precede what would otherwise be suspend functions with a Loading state. So then you're having to merge them and filter out various loading states, and your end result flow keeps repeatedly emitting a loading state until all the sources are ready.
If you instead have basic suspend functions for your network operations, for example:
suspend fun networkRequest1(id: Int): List<Area> =
retrofitInterface.getAreas(id)
Then your view model flow becomes simpler. It doesn't make sense to use a specific context just to call a flow builder function, so I left that part out. (I'm also confused as to why you have a flow of CoroutineContexts.)
I also think it's much cleaner if you break out the request call into a separate function.
private suspend fun makeParallelRequests(id: Int): Map<String, Resource<out List<Any>>> = coroutineScope {
    // If any request throws, coroutineScope cancels the remaining async calls and rethrows.
    val results = listOf(
        async { networkRequest1(id) },
        async { networkRequest2(id) },
        async { networkRequest3(id) },
        async { networkRequest4(id) }
    ).awaitAll()
        .map { Resource.Success(it) }
    listOf("1", "2", "3", "4").zip(results).toMap()
}
val dataCombinedStateFlow: StateFlow<Resource<Map<String, Resource<out List<Any>>>>?> =
    getDataTrigger.flatMapLatest {
        flow {
            emit(Resource.Loading())
            try {
                val result = makeParallelRequests(id)
                emit(Resource.Success(result))
            } catch (e: Throwable) {
                emit(Resource.Error(e))
            }
        }
    }.stateIn(viewModelScope, SharingStarted.Lazily, null)
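To see why this approach stops the remaining work after the first failure, here is a minimal standalone sketch. The functions slowRequest, failingRequest, loadAll and runDemo are hypothetical stand-ins for the network calls, not part of the original code; the only assumption is that kotlinx.coroutines is on the classpath. When one async child fails, coroutineScope cancels its siblings and rethrows the failure to the caller.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicBoolean

// Records whether the slow request was cancelled by its sibling's failure.
val slowRequestWasCancelled = AtomicBoolean(false)

// Hypothetical slow network call.
suspend fun slowRequest(): List<String> =
    try {
        delay(5_000)
        listOf("slow result")
    } catch (e: CancellationException) {
        slowRequestWasCancelled.set(true)
        throw e // always rethrow cancellation
    }

// Hypothetical network call that fails quickly.
suspend fun failingRequest(): List<String> {
    delay(100)
    throw IllegalStateException("request failed")
}

// Runs both requests in parallel; a failure in one cancels the other.
suspend fun loadAll(): List<List<String>> = coroutineScope {
    listOf(
        async { slowRequest() },
        async { failingRequest() }
    ).awaitAll()
}

fun runDemo(): String = runBlocking {
    try {
        loadAll()
        "success"
    } catch (e: IllegalStateException) {
        "error: ${e.message}"
    }
}

fun main() {
    println(runDemo())
    println("slow request cancelled: ${slowRequestWasCancelled.get()}")
}
```

The demo should finish after roughly 100 ms rather than 5 seconds, because the slow call is cancelled as soon as the failing one throws.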
CodePudding user response:
I agree with @Tenfour04 that those nested flows are overly complicated and there are several ways to simplify this (@Tenfour04's solution is a good one).
If you don't want to rewrite everything then you can fix that one line that breaks the structured concurrency:
.stateIn(viewModelScope, SharingStarted.Lazily, null)
With this, the whole ViewModel flow is started in the ViewModel's scope, while the view starts the collect from a separate scope (viewLifecycleOwner.lifecycleScope, which would be the Fragment / Activity scope).
If you want to cancel the flow from the view, you need to use either the same scope or expose a cancel function that would cancel the ViewModel's scope.
If you want to cancel the flow from the ViewModel itself (at the return@flow statement), then you can simply add:
viewModelScope.cancel()
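Keep in mind that cancelling a scope cancels everything launched in it, and a cancelled scope cannot start new coroutines afterwards. One possible refinement, which is my own assumption and not something the answer above prescribes, is to pass stateIn a child scope and cancel only that. The sketch below uses plain scopes in place of viewModelScope; parentScope, sharingScope, ticker and runDemo are hypothetical names.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.isActive
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Counts how many values the upstream flow has produced.
val emissions = AtomicInteger(0)

// Hypothetical endless upstream flow standing in for the combined requests.
fun ticker(): Flow<Int> = flow {
    var i = 0
    while (true) {
        emit(i++)
        emissions.incrementAndGet()
        delay(50)
    }
}

// Returns (upstream stopped after cancel, parent scope still active).
fun runDemo(): Pair<Boolean, Boolean> = runBlocking {
    // Stands in for viewModelScope in this sketch.
    val parentScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
    // Child scope used only for sharing, so it can be cancelled on its own.
    val sharingScope = CoroutineScope(
        parentScope.coroutineContext + Job(parentScope.coroutineContext[Job])
    )
    val state = ticker().stateIn(sharingScope, SharingStarted.Eagerly, -1)

    delay(300)
    sharingScope.cancel() // stops the upstream flow feeding the StateFlow
    delay(100)            // give cancellation time to settle
    val countAfterCancel = emissions.get()
    delay(300)
    val countLater = emissions.get()

    val stopped = countAfterCancel > 0 && countAfterCancel == countLater && state.value >= 0
    val parentAlive = parentScope.isActive
    parentScope.cancel() // clean up the demo
    stopped to parentAlive
}

fun main() {
    val (stopped, parentAlive) = runDemo()
    println("upstream stopped: $stopped, parent scope still active: $parentAlive")
}
```

With this layout the upstream work stops once the child scope is cancelled, while other coroutines in the parent scope are left alone.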