Is there a way to limit the number of collectors of a function that returns a Flow built with the flow builder?
I have this public method in a ViewModel:
fun fetchAssets(limit: String) {
    viewModelScope.launch {
        withContext(Dispatchers.IO) {
            getAssetsUseCase(AppConfigs.ASSET_PARAMS, limit).onEach {
                when (it) {
                    is RequestStatus.Loading -> {
                        _assetState.tryEmit(AssetState.FetchLoading)
                    }
                    is RequestStatus.Success -> {
                        _assetState.tryEmit(AssetState.FetchSuccess(it.data.assetDataDomain))
                    }
                    is RequestStatus.Failed -> {
                        _assetState.tryEmit(AssetState.FetchFailed(it.message))
                    }
                }
            }.collect()
        }
    }
}
This method is called in the ViewModel's init block, but it can also be called manually from the UI.
The flow emits a value every 10 seconds.
Repository
override fun fetchAssets(
    query: String,
    limit: String
) = flow {
    while (true) {
        try {
            interceptor.baseUrl = AppConfigs.ASSET_BASE_URL
            emit(RequestStatus.Loading())
            val domainModel = mapper.mapToDomainModel(service.getAssetItems(query, limit))
            emit(RequestStatus.Success(domainModel))
        } catch (e: HttpException) {
            emit(RequestStatus.Failed(e))
        } catch (e: IOException) {
            emit(RequestStatus.Failed(e))
        }
        delay(10_000)
    }
}
Unfortunately, every time fetchAssets() is invoked from the UI, it creates another collector, so I can end up with many collectors running at once, which is clearly wrong.
The idea is to have a flow that emits a value every 10 seconds but can also be triggered manually from the UI for an immediate data update, without creating multiple collectors.
CodePudding user response:
You seem to misunderstand what it means to collect a flow, or you are misusing the collect operation. Collecting a flow means observing it for changes. But you are trying to use collect() to introduce changes to the flow, which can't really work. It just starts another flow in the background.
You should collect the flow only once, so keep the collection inside init or wherever is appropriate for your case. Then update the logic of the flow so that reloading can be triggered on demand. There are many ways to do this, and the solution differs depending on whether you need to reset the timer on a manual update or not. For example, we can use a channel to notify the flow that it needs to reload:
val reloadChannel = Channel<Unit>(Channel.CONFLATED)

fun fetchAssets(
    query: String,
    limit: String
) = flow {
    while (true) {
        try {
            ...
        }
        withTimeoutOrNull(10.seconds) { reloadChannel.receive() } // replace `delay()` with this
    }
}

fun reload() {
    reloadChannel.trySend(Unit)
}
Whenever you need to trigger a manual reload, do not start another flow or invoke another collect() operation; just invoke reload(). The flow that is already being collected will then start reloading and emit state changes.
This solution resets the timer on manual reload, which I believe is better for the user experience.
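To see the single-collector idea end to end, here is a minimal runnable sketch using only kotlinx.coroutines. AssetRepository, the String payload and runDemo() are hypothetical stand-ins (the real network call and RequestStatus types are left out); in a ViewModel the one collector would be started in init on viewModelScope.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull
import kotlin.time.Duration.Companion.seconds

// Hypothetical stand-in for the repository; the network call is replaced by a counter.
class AssetRepository {
    // CONFLATED keeps at most one pending reload request, so repeated taps collapse into one.
    private val reloadChannel = Channel<Unit>(Channel.CONFLATED)

    fun fetchAssets(): Flow<String> = flow {
        var fetchCount = 0
        while (true) {
            fetchCount++
            emit("fetch #$fetchCount")
            // Wait up to 10 seconds, or wake up early when reload() is called.
            withTimeoutOrNull(10.seconds) { reloadChannel.receive() }
        }
    }

    fun reload() {
        reloadChannel.trySend(Unit)
    }
}

fun runDemo(): List<String> = runBlocking {
    val repository = AssetRepository()
    val received = mutableListOf<String>()

    // The one and only collector.
    val job = launch { repository.fetchAssets().collect { received += it } }

    delay(100)          // the first fetch happens immediately
    repository.reload() // manual refresh: no new collector is created
    delay(100)
    job.cancelAndJoin()

    received
}

fun main() {
    println(runDemo())
}
```

The manual refresh arrives through the same collector that handles the periodic updates, so the number of collectors stays at one no matter how often reload() is called.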
CodePudding user response:
If _assetState is a StateFlow, you can use its value property to update the UI with the latest state:
val lastAssetState = viewModel.assetState.value
// use lastAssetState to update UI
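As a rough illustration of reading the latest state without collecting, the sketch below uses a simplified, hypothetical AssetState and an AssetStateHolder class in place of the ViewModel; only MutableStateFlow, StateFlow and asStateFlow come from kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow

// Hypothetical, simplified version of the AssetState type from the question.
sealed interface AssetState {
    object FetchLoading : AssetState
    data class FetchSuccess(val assets: List<String>) : AssetState
}

// Hypothetical stand-in for the ViewModel.
class AssetStateHolder {
    private val _assetState = MutableStateFlow<AssetState>(AssetState.FetchLoading)
    val assetState: StateFlow<AssetState> = _assetState.asStateFlow()

    fun onAssetsLoaded(assets: List<String>) {
        _assetState.value = AssetState.FetchSuccess(assets)
    }
}

fun main() {
    val holder = AssetStateHolder()
    holder.onAssetsLoaded(listOf("BTC", "ETH"))
    // Reading .value returns the current state and does not start a collector.
    println(holder.assetState.value)
}
```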
Another approach is to use the first or firstOrNull terminal operators, which return the first element emitted by the flow that matches the given predicate and then cancel the flow's collection:
viewModelScope.launch {
    // Skip Loading and stop collecting as soon as a final status arrives.
    val result = getAssetsUseCase(AppConfigs.ASSET_PARAMS, limit).firstOrNull {
        it is RequestStatus.Success || it is RequestStatus.Failed
    }
    when (result) {
        is RequestStatus.Success -> {
            _assetState.tryEmit(AssetState.FetchSuccess(result.data.assetDataDomain))
        }
        is RequestStatus.Failed -> {
            _assetState.tryEmit(AssetState.FetchFailed(result.message))
        }
        else -> Unit // null: the flow completed without a final status
    }
}
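The cancelling behaviour of firstOrNull can be checked in isolation. In this small sketch, pollingFlow() and firstResult() are hypothetical names and plain strings stand in for RequestStatus; the flow never completes on its own, yet the call returns as soon as the predicate matches.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical polling flow: "loading", then "success #n", repeated every 10 seconds.
fun pollingFlow(): Flow<String> = flow {
    var n = 0
    while (true) {
        emit("loading")
        n++
        emit("success #$n")
        delay(10_000)
    }
}

// Returns the first non-loading value; the collection is cancelled right after the match.
fun firstResult(): String? = runBlocking {
    pollingFlow().firstOrNull { it != "loading" }
}

fun main() {
    println(firstResult())
}
```

Each call starts its own short-lived collection and ends it after one result, so collectors do not pile up, but nothing keeps polling afterwards either.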