Collect flow but only any new values, not the currently existing value

Time:07-26

I am currently struggling with this one, and so far no combination of SharedFlow and StateFlow has worked.

I have a flow that might have already started with a value, or not.

Using that flow I want to collect any new values that are emitted after I start collecting.

So far all my attempts have failed: no matter what I try, the collector always receives the current value as soon as I start collecting.

An example of what I am trying to achieve:

  • Given a Flow (could be any type, Int is just for simplification) with the following timeline: value 4 is emitted | value 2 is emitted | value 10 is emitted

I want to be able to do the following:

  • If I start collecting after value 4 has already been emitted, I want to only receive anything after that, in this case it would collect 2 and 10 once emitted
  • If I start collecting after value 2 then it would only receive the 10
  • If I start collecting before 4 then it would receive 4, 2 and 10

I tried SharedFlow and StateFlow, with replay = 0 and WhileSubscribed, but no combination I could find does what I am looking for.

The only workaround I have found so far is to record the time at which I start my .collect{ } and compare it with the origin time of each item I receive in the collect. That works here because the object I am using carries a specific origin time, but it will not work in general, for example with the plain Integers above.

EDIT: Adding implementation example as requested for SharedFlow

This is tied to a Room database call that returns a Flow<MyObject>

MyFragment.kt

lifecycleScope.launch(Dispatchers.IO) {
    viewModel.getMyObjectFlow.shareIn(
        viewModel.viewModelScope, // also tried with fragment lifecyclescope
        SharingStarted.WhileSubscribed(), // also tried with the other 2 options
        replay = 0,
    ).collect{
        ...
    }
}

CodePudding user response:

It doesn't make sense to use shareIn at the use site like that. You're creating a shared Flow that cannot be shared because you don't store the reference for other classes to access and use.

Anyway, the problem is that you are creating the SharedFlow at the use site, so your shared flow only begins collecting from upstream when the fragment calls this code. If the upstream flow is cold, then you will be getting the first value emitted by the cold flow.

The SharedFlow should be created in the ViewModel and put in a property so each Fragment can collect from the same instance. You'll want to use SharingStarted.Eagerly to prevent the cold upstream flow from restarting from the beginning when there are new subscribers after a break.
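
As a rough sketch of that arrangement (not the asker's actual code): `ObjectHolder` below is a hypothetical stand-in for the ViewModel, `Int` replaces `MyObject`, and the two `CompletableDeferred` signals exist only to make the demo deterministic. The shared flow is created once with `SharingStarted.Eagerly` and `replay = 0`, so a collector that subscribes after 4 was emitted should receive only 2 and 10.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the ViewModel: it creates the shared flow once
// and exposes it, so every collector subscribes to the same instance.
class ObjectHolder(scope: CoroutineScope, upstream: Flow<Int>) {
    val updates: SharedFlow<Int> =
        upstream.shareIn(scope, SharingStarted.Eagerly, replay = 0)
}

fun lateCollectorSees(): List<Int> = runBlocking {
    // Demo-only signals that pin down the ordering of events.
    val firstEmitted = CompletableDeferred<Unit>()
    val subscribed = CompletableDeferred<Unit>()

    // Cold upstream, standing in for the Room query flow.
    val upstream = flow {
        emit(4)                      // nobody is subscribed yet, so this is dropped
        firstEmitted.complete(Unit)
        subscribed.await()           // wait for the late collector
        emit(2)
        emit(10)
    }

    val sharingScope = CoroutineScope(Dispatchers.Default)
    val holder = ObjectHolder(sharingScope, upstream)

    firstEmitted.await()             // 4 is already gone at this point
    val received = holder.updates
        .onSubscription { subscribed.complete(Unit) }
        .take(2)
        .toList()

    sharingScope.cancel()            // stop the eagerly started sharing coroutine
    received
}

fun main() {
    println(lateCollectorSees()) // prints [2, 10]
}
```

In an Android app the scope passed to `shareIn` would presumably be `viewModelScope`, and the fragment would collect `updates` instead of calling `shareIn` itself.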

CodePudding user response:

You have a misconception of how flows work. They always emit only after you start collecting. They emit on demand. Consider this example:

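import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // Cold flow: this block runs only once a collector is attached.
    val flow1 = flow {
        println("Emitting 1")
        emit(1)
        delay(10.seconds)
        println("Emitting 2")
        emit(2)
    }

    // Nothing is emitted during this wait, because nobody is collecting yet.
    delay(5.seconds)

    println("Start collecting")
    flow1.collect {
        println("Collected: $it")
    }
}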

The output starts with:

Start collecting
Emitting 1
Collected: 1

not:

Emitting 1
Start collecting
Collected: 1

This is because a flow starts emitting only after you start collecting it. Otherwise, it would have nowhere to emit.

Of course, there are flows that emit from some kind of cache, queue or buffer. Shared flows do this, for example. In that case it looks as if you collect after the item was emitted, but this is not really the case. Conceptually, it works like this:

val buffer = listOf(1, 2, 3)
val flow1 = flow {
    buffer.forEach {
        println("Emitting $it")
        emit(it)
    }
}

It still emits after you start collecting; it just emits from the cache. Of course, the item was added to the cache before you started collecting, but this is entirely abstracted from you. You can't know why a flow emitted an item. From the collector's perspective it was always emitted just now, not in the past. Similarly, you can't know whether a web server read the data from the DB or from a cache: this is abstracted from you.

Summing up: it is not possible to collect only new items from just any flow in a universal way. Flows in general don't understand the concept of "new items". They just emit, but you don't know why they do this. Maybe they generate items on the fly, maybe they passively observe external events, or maybe they re-transmit items that they collected from another flow. You don't know that.

While developing your solution, you need to understand what the source of the items is and write your code accordingly. For example, if the source is a regular cold flow, then it never starts doing anything before you start collecting. If the source is a state flow, you can just drop the first item (for example with drop(1)). If it is a shared flow or a flow with some replay buffer, then the situation is more complicated.
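
For the state flow case, a minimal sketch could look like the following. The names `state` and `seen` are made up for the demo, and the `seen` channel is only there so the function can report what the collector received. It assumes the collector has already read the current value before any update arrives, which `CoroutineStart.UNDISPATCHED` arranges here.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.drop
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun newValuesOnly(): List<Int> = runBlocking {
    val state = MutableStateFlow(4)  // 4 is the value that already exists
    val seen = Channel<Int>()        // demo-only: hands collected values back

    // UNDISPATCHED runs the collector up to its first suspension right away,
    // so the current value (4) is read and dropped before the next statement.
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        state.drop(1).collect { seen.send(it) }
    }

    val received = mutableListOf<Int>()
    state.value = 2
    received += seen.receive()       // wait until the collector got 2
    state.value = 10
    received += seen.receive()       // wait until the collector got 10

    job.cancel()                     // a state flow never completes on its own
    received
}

fun main() {
    println(newValuesOnly()) // prints [2, 10]
}
```

Keep in mind that a state flow conflates: if several updates happen before the collector gets to run, only the latest one is delivered, and setting a value equal to the current one emits nothing.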

One possible approach would be to start collecting earlier than we need, initially ignore all collected items and at some point in time start processing them. But this is still far from perfect and it may not work as we expect.
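
A rough sketch of that idea, under several assumptions: the source is a `MutableSharedFlow` without replay, the collector is attached before anything is emitted, and `gateOpen` and `handled` are hypothetical names introduced for the demo (`handled` only acknowledges each item so the ordering is deterministic).

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun gatedCollection(): List<Int> = runBlocking {
    val source = MutableSharedFlow<Int>()
    val gateOpen = AtomicBoolean(false)  // false: collected items are ignored
    val handled = Channel<Unit>()        // demo-only acknowledgement per item
    val processed = mutableListOf<Int>()

    // Subscribe early; UNDISPATCHED makes sure the subscription exists
    // before the first emit below.
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        source.collect { value ->
            if (gateOpen.get()) processed += value
            handled.send(Unit)
        }
    }

    source.emit(4)
    handled.receive()                    // 4 was collected but ignored

    gateOpen.set(true)                   // from here on, items are processed

    source.emit(2)
    handled.receive()
    source.emit(10)
    handled.receive()

    job.cancel()
    processed
}

fun main() {
    println(gatedCollection()) // prints [2, 10]
}
```

The weak spot is the moment the gate opens: without some acknowledgement like the one used here, an item that is in flight at that moment may land on either side of the gate.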
