Home > Enterprise >  How to cancel a combine of flows when one of them emits a certain value?
How to cancel a combine of flows when one of them emits a certain value?

Time:03-29

I am doing multiple network requests in parallel and monitoring the result using a Stateflow. Each network request is done in a separate flow, and I use combine to push the latest status on my Stateflow. Here's my code:

Repo class:

fun networkRequest1(id: Int): Flow<Resource<List<Area>>> =
        flow {
            emit(Resource.Loading())
            try {
                val areas = retrofitInterface.getAreas(id)
                emit(Resource.Success(areas))
            } catch (throwable: Throwable) {
                emit(
                    Resource.Error()
                    )
                )
            }
        }

fun networkRequest2(id: Int): Flow<Resource<List<Area>>> = //same code as above for simplicity
fun networkRequest3(id: Int): Flow<Resource<List<Area>>> = //same code as above for simplicity
fun networkRequest4(id: Int): Flow<Resource<List<Area>>> = //same code as above for simplicity

ViewModel class:

 val getDataCombinedStateFlow: StateFlow<Resource<HashMap<String, Resource<out List<Any>>>>?> =
        getDataTrigger.flatMapLatest {
            withContext(it) { 
                combine(
                    repo.networkRequest1(id: Int),
                    repo.networkRequest2(id: Int),
                    repo.networkRequest3(id: Int),
                    repo.networkRequest4(id: Int)
                ) { a,
                    b,
                    c,
                    d
                    ->
                    hashMapOf(
                        Pair("1", a),
                        Pair("2",b),
                        Pair("3", c),
                        Pair("4", d),
                    )
                }.flatMapLatest {
                    val progress = it
                    var isLoading = false
                    flow<Resource<HashMap<String, Resource<out List<Any>>>>?> {
                        emit(Resource.Loading())
                        progress.forEach { (t, u) ->
                            if (u is Resource.Error) {
                                emit(Resource.Error(error = u.error!!))
                               // I want to cancel here, as I no longer care if 1 request fails
                                return@flow
                            }
                            if (u is Resource.Loading) {
                                isLoading = true
                            }
                        }
                        if (isLoading) {
                            emit(Resource.Loading())
                            return@flow
                        }
                        if (!isLoading) {
                            emit(Resource.Success(progress))
                        }
                    }
                }
            }
        }.stateIn(viewModelScope, SharingStarted.Lazily, null)

View class:

  viewLifecycleOwner.lifecycleScope.launchWhenCreated() {
            viewModel.getDataCombinedStateFlow.collect {
                val result = it ?: return@collect
                binding.loadingErrorState.apply {
                    if (result is Resource.Loading) {
                       //show smth
                    }

                    if (result is Resource.Error) {
                       //show error msg
                    }

                    if (result is Resource.Success) {
                       //done
                    }
                }
            }
        }

I want to be able to cancel all work after a Resource.Error is emitted, as I no longer want to wait or do any related work for the response of other API calls in case one of them fails.

How can I achieve that?

I tried to cancel the collect, but the flows that build the Stateflow keep working and emmit results. I know that they won't be collected but still, I find this a waste of resources.

CodePudding user response:

I think this whole situation is complicated by the fact that you have source flows just to precede what would otherwise be suspend functions with a Loading state. So then you're having to merge them and filter out various loading states, and your end result flow keeps repeatedly emitting a loading state until all the sources are ready.

If you instead have basic suspend functions for your network operations, for example:

suspend fun networkRequest1(id: Int): List<Area> =
        retrofitInterface.getAreas(id)

Then your view model flow becomes simpler. It doesn't make sense to use a specific context just to call a flow builder function, so I left that part out. (I'm also confused as to why you have a flow of CoroutineContexts.)

I also think it's much cleaner if you break out the request call into a separate function.

private fun makeParallelRequests(id: Int): Map<String, Resource<out List<Any>> = coroutineScope {
    val results = listOf(
        async { networkRequest1(id) },
        async { networkRequest2(id) },
        async { networkRequest2(id) },
        async { networkRequest4(id) }
    ).awaitAll()
        .map { Resource.Success(it) }
    listOf("1", "2", "3", "4").zip(results).toMap()
}

val dataCombinedStateFlow: StateFlow<Resource<Map<String, Resource<out List<Any>>>>?> =
    getDataTrigger.flatMapLatest {
        flow {
            emit(Resource.Loading())
            try {
                val result = makeParallelRequests(id)
                emit(Resource.Success(result))
            catch (e: Throwable) {
                emit(Resource.Error(e))
            }
        }
    }

CodePudding user response:

I agree with @Tenfour04 that those nested flows are overly complicated and there are several ways to simplify this (@Tenfour04's solution is a good one).

If you don't want to rewrite everything then you can fix that one line that breaks the structured concurrency:

.stateIn(viewModelScope, SharingStarted.Lazily, null)

With this the whole ViewModel flow is started in the ViewModel's scope while the view starts the collect from a separate scope (viewLifecycleOwner.lifecycleScope which would be the Fragment / Activity scope).

If you want to cancel the flow from the view, you need to use either the same scope or expose a cancel function that would cancel the ViewModel's scope.

If you want to cancel the flow from the ViewModel itself (at the return@flow statement) then you can simply add:

viewModelScope.cancel()

  • Related