Home > database >  Analogue ignoreElements() in coroutines
Analogue ignoreElements() in coroutines

Time:08-26

I am working on the task of rewriting classes from RxJava to Coroutines and ran into the problem of finding an analogue of the ignoreElements() operator. What can replace it in case of using Coroutines?

For example, in this case. I replace Completable with Flow<Unit>:

    private fun observeHistory(): Completable =
        sensorDataService.observeRangeData()
            .doOnNext {
                historyRepo.add(
                    HistoryData(...)
                )
            }
            .ignoreElements()

And got this

    private fun observeHistory(): Flow<Unit> = flow {
        sensorDataService.observeRangeData()
            .onEach {
                historyRepo.add(
                    HistoryData(...)
                )
            }
    }
    ```

CodePudding user response:

Technically one equivalent of observable.ignoreElements() would be flow.filter { false }: it returns a flow with 0 elements (skipping all elements of the source flow), and completes when the source flow completes. That said, the resulting Flow<T> would not be an ideal representation of what you're actually returning.

A better return type to express this would be Deferred<Unit>, which better matches Rx's Completable, for instance with someScope.async(LAZY) { flow.collect() } - but that would be even less idiomatic.

A more idiomatic approach to solve the problem in your example is to use a suspend function instead, because this is what best represents a single async value (but in a sequential way thanks to coroutines):

private suspend fun observeHistory() {
    sensorDataService.observeRangeData().collect {
        historyRepo.add(HistoryData(...))
    }
}

Then if you want to launch this as a background process from the calling code, you can use launch { observeHistory() } in a proper coroutine scope, which you would need anyway if you had returned a flow (because collecting the flow is suspending too).

CodePudding user response:

It depends on what RxJava code you are migrating. The RxJava ignoreElements() operator return Completable and suppresses all of the items emitted, in other words, it emits nothing.

So, if you have something like this in RxJava:

fun saveData(): Completable {
    return yourObservable
         .doOnComplete {
            // save data
         }
         .ignoreElements()
}

then in Kotlin Coroutines you can use just suspend method without a return value:

suspend fun saveData(data: String) {
    // save data
}

but if you want to keep a reactive approach you can convert it to Kotlin Flow but with Flow, you don't have an alternative to emit nothing, feel free to return your source type (you can return Unit type but it looks pointless then maybe you should reconsider why you use Flow here):

 fun saveData(data: String): Flow<String> {
       return yourFlow
           .onCompletion {
               // save data
           }
    }

and then you can just ignore the value during collecting:

saveData(data)
  .collect()
  • Related