Home > Net >  Kotlin Coroutine Flow: When does wasting resource happen when using Flow
Kotlin Coroutine Flow: When does wasting resource happen when using Flow

Time:03-21

I am reading this article to fully understand the dos and donts of using Flow while comparing it to my implementation, but I can't grasp clearly how to tell if you are wasting resource when using Flow or flow builder. When is the time a flow is being release/freed in memory and when is the time that you are wasting resource like accidentally creating multiple instances of flow and not releasing them?

I have a UseCase class that invokes a repository function that returns Flow. In my ViewModel this is how it looks like.

class AssetViewModel constructor(private val getAssetsUseCase: GetAssetsUseCase) : BaseViewModel() {

    private var job: Job? = null

    private val _assetState = defaultMutableSharedFlow<AssetState>()

    fun getAssetState() = _assetState.asSharedFlow()

    init {
        job = viewModelScope.launch {
            while(true) {
                if (lifecycleState == LifeCycleState.ON_START || lifecycleState == LifeCycleState.ON_RESUME)
                    fetchAssets()
                delay(10_000)
            }
        }
    }

    fun fetchAssets() {

        viewModelScope.launch {

            withContext(Dispatchers.IO) {
                getAssetsUseCase(
                    AppConfigs.ASSET_BASE_URL,
                    AppConfigs.ASSET_PARAMS,
                    AppConfigs.ASSET_SIZES[AppConfigs.ASSET_LIMIT_INDEX]
                ).onEach {

                    when(it){

                        is RequestStatus.Loading -> {
                            _assetState.tryEmit(AssetState.FetchLoading)
                        }

                        is RequestStatus.Success -> {
                            _assetState.tryEmit(AssetState.FetchSuccess(it.data.assetDataDomain))
                        }

                        is RequestStatus.Failed -> {
                            _assetState.tryEmit(AssetState.FetchFailed(it.message))
                        }

                    }

                }.collect()
            }

        }

    }

    override fun onCleared() {
        job?.cancel()
        super.onCleared()
    }

}

The idea here is we are fetching data from remote every 10 seconds while also allowing on demand fetch of data via UI.

Just a typical useless UseCase class

class GetAssetsUseCase @Inject constructor(
    private val repository: AssetsRepository // Passing interface not implementation for fake test
) {

    operator fun invoke(baseUrl: String, query: String, limit: String): Flow<RequestStatus<AssetDomain>> {
        return repository.fetchAssets(baseUrl, query, limit)
    }

}

The concrete implementation of repository

class AssetsRepositoryImpl constructor(
    private val service: CryptoService,
    private val mapper: AssetDtoMapper
) : AssetsRepository {

    override fun fetchAssets(
        baseUrl: String,
        query: String,
        limit: String
    ) = flow {

        try {
            emit(RequestStatus.Loading())
            val domainModel = mapper.mapToDomainModel(
                service.getAssetItems(
                    baseUrl,
                    query,
                    limit
                )
            )
            emit(RequestStatus.Success(domainModel))
        } catch (e: HttpException) {
            emit(RequestStatus.Failed(e))
        } catch (e: IOException) {
            emit(RequestStatus.Failed(e))
        }

    }

}

After reading this article which says that using stateIn or sharedIn will improve the performance when using a flow, it seems that I am creating new instances of the same flow on-demand. But there is a limitation as the stated approach only works for variable and not function that returns Flow.

CodePudding user response:

To this I would recommend not using flow as the return type of the usecase function and the api call must not be wrapped inside a flow builder.

Why:

The api call actually is happening once and then again after an interval it is triggered by the view model itself, returning flow from the api caller function will be a bad usage of powerful tool that is actually meant to be called once and then it must be self-reliant, it should emit or pump in the data till the moment it has a subscriber/collector.

One usecase you can consider when using flow as return type from the room db query call, it is called only once and then the room emits data into it till the time it has subscriber.

.....

fun fetchAssets() {
        viewModelScope.launch {
//          loading true
            val result=getusecase(.....)
            when(result){..process result and emit on state..}
//            loading false
        }
}

.....

suspend operator fun invoke(....):RequestStatus<AssetDomain>{
    repository.fetchAssets(baseUrl, query, limit)
}

.....

override fun fetchAssets( baseUrl: String, query: String, limit: String ):RequestStatus {

    try {
        //RequestStatus.Loading()//this can be managed in viewmodel itself
        val domainModel = mapper.mapToDomainModel(
            service.getAssetItems(
                baseUrl,
                query,
                limit
            )
        )
        RequestStatus.Success(domainModel)
    } catch (e: HttpException) {
        RequestStatus.Failed(e)
    } catch (e: IOException) {
        RequestStatus.Failed(e)
    }

}

CodePudding user response:

stateIn and shareIn can save resources if there are multiple observers, by avoiding redundant fetching. And in your case, you could set it up to automatically pause the automatic re-fetching when there are no observers. If, on the UI side you use repeatOnLifecycle, then it will automatically drop your observers when the view is off screen and then you will avoid wasted fetches the user will never see.

I think it’s not often described this way, but often the multiple observers are just observers coming from the same Activity or Fragment class after screen rotations or rapidly switching between fragments. If you use WhileSubscribed with a timeout to account for this, you can avoid having to restart your flow if it’s needed again quickly.

Currently you emit to from an external coroutine instead of using shareIn, so there’s no opportunity to pause execution.

I haven't tried to create something that supports both automatic and manual refetching. Here's a possible strategy, but I haven't tested it.

private val refreshRequest = Channel<Unit>(Channel.CONFLATED)

fun fetchAssets() {
    refreshRequest.trySend(Unit)
}

val assetState = flow {
    while(true) {
        getAssetsUseCase(
            AppConfigs.ASSET_BASE_URL,
            AppConfigs.ASSET_PARAMS,
            AppConfigs.ASSET_SIZES[AppConfigs.ASSET_LIMIT_INDEX]
        ).map {
            when(it){
                is RequestStatus.Loading -> AssetState.FetchLoading
                is RequestStatus.Success -> AssetState.FetchSuccess(it.data.assetDataDomain)
                is RequestStatus.Failed -> AssetState.FetchFailed(it.message)
            }
        }.emitAll()
        withTimeoutOrNull(100L) {
             // drop any immediate or pending manual request
             refreshRequest.receive()
        }
        // Wait until a fetch is manually requested or ten seconds pass:
        withTimeoutOrNull(10000L - 100L) {
             refreshRequest.receive()
        }
    }
}.shareIn(viewModelScope, SharingStarted.WhileSubscribed(4000L), replay = 1)
  • Related