Flow<T>.distinctUntilChanged() is not working

Time:10-13

I'm trying to have a coroutine run while a SharedFlow is active (subscriptionCount is greater than zero) and be cancelled when the count drops back to zero. But somehow even something as simple as distinctUntilChanged() is not working as it should, and I'm baffled.

For this I'm making an "onActive" extension like this:

fun <T : Any> MutableSharedFlow<T>.onActive(
    block: suspend CoroutineScope.() -> Unit
): Flow<T> {

    val original = this

    val isActiveFlow: Flow<Boolean> = subscriptionCount
        .map {
            println("Class: Count is $it")
            it > 0
        }
        .distinctUntilChanged()

    return isActiveFlow.flatMapLatest { isActive ->
        println("Class: isActive is $isActive")
        // here would be the code that calls `block`
        // but just this exactly as is, already triggers the error

        original // still emits the original flow, 
                 // that is needed or else subscriptionCount never changes
    }
}

Initially this seems to work, but a test that adds several subscribers prints "isActive is true" several times in a row. Why is distinctUntilChanged() not working? The repeated call breaks the rest of the logic in the redacted area.

The test is like this:

    @Test
    fun `onActive is called only once with multiple subscribers`() = runBlocking {

        val flow = MutableSharedFlow<Int>(
            replay = 2,
            onBufferOverflow = BufferOverflow.DROP_OLDEST
        ).apply {
            repeat(5) { tryEmit(it) }
        }.onActive {

        }

        val jobs = mutableListOf<Job>()
        repeat(3) { count ->
            jobs.add(flow.onEach {
                println("Test:  Listener $count received $it")
            }.launchIn(this))
        }
        delay(100)
        jobs.forEach { it.cancel() }
        jobs.forEach { it.join() }
    }

Running this, the output is:

Class: Count is 0
Class: isActive is false
Class: Count is 1
Class: Count is 1
Class: isActive is true
Class: Count is 2
Class: Count is 2
Class: isActive is true
Class: Count is 3
Test:  Listener 0 received 3
Test:  Listener 0 received 4
Test:  Listener 1 received 3
Test:  Listener 1 received 4
Test:  Listener 2 received 3
Test:  Listener 2 received 4
Class: Count is 2
Class: isActive is true
Class: Count is 3
Class: Count is 3
Class: Count is 3
Test:  Listener 0 received 3
Test:  Listener 0 received 4

So the question is: why is distinctUntilChanged() not working, and how can I fix it?

Answer:

It seems the behaviour you're seeing is actually correct as far as distinctUntilChanged is concerned:

  • the first registered subscriber collects the original 2 replayed elements with the starting isActive=false value
  • then isActive becomes true because of that first subscription, so that first subscriber re-collects the original flow due to flatMapLatest, and thus gets the replayed elements again
  • the other 2 subscribers arrive when the subscriptionCount is already non-zero, so isActive stays true for them until they are cancelled
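
To illustrate that distinctUntilChanged() de-duplicates within one collection rather than across collectors, here is a minimal sketch. The names activeStates and demoTwoCollectors are hypothetical, and the count source is passed in as a plain flow instead of subscriptionCount. Because the flow returned by onActive is cold, each subscriber runs its own copy of the map/distinctUntilChanged/flatMapLatest chain, which is consistent with "isActive is true" being printed once per subscriber in the output above.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper mirroring the question's isActiveFlow,
// with the count source passed in (an assumption for illustration).
fun activeStates(counts: Flow<Int>): Flow<Boolean> =
    counts.map { it > 0 }.distinctUntilChanged()

// Collects the same cold flow twice and records what each collector sees.
fun demoTwoCollectors(): List<String> = runBlocking {
    val states = activeStates(flowOf(0, 1, 2, 3, 0))
    val log = mutableListOf<String>()
    repeat(2) { collector ->
        // Each collect call starts a fresh operator chain, so the
        // de-duplication state is not shared between collectors.
        states.collect { log += "collector $collector: isActive is $it" }
    }
    log
}
```

Within one collection the repeated "true" values for counts 1, 2 and 3 are collapsed into one, but the second collector starts over and sees its own "true".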

If the coroutine you launch "while there are subscribers" is meant to produce elements in the SharedFlow, I would rather define the flow as a channelFlow/callbackFlow initially, and then use shareIn with SharingStarted.WhileSubscribed to get this "run when there are subscribers" behaviour.
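
A minimal sketch of that first suggestion, assuming the producer simply emits increasing integers. The functions numbers and sharedNumbers and the onStart/onStop callbacks are hypothetical and exist only to make the start and stop points visible. The replay = 2 value is taken from the question's test.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical producer: the channelFlow body plays the role of the
// coroutine that should only run while there are subscribers.
fun numbers(onStart: () -> Unit = {}, onStop: () -> Unit = {}): Flow<Int> = channelFlow {
    onStart()
    try {
        var next = 0
        while (true) {
            send(next++)
            delay(10)
        }
    } finally {
        // Runs when the upstream is cancelled, e.g. after the last subscriber leaves.
        onStop()
    }
}

// WhileSubscribed() starts the upstream on the first subscriber and
// stops it once the subscriber count returns to zero.
fun sharedNumbers(
    scope: CoroutineScope,
    onStart: () -> Unit = {},
    onStop: () -> Unit = {}
): SharedFlow<Int> =
    numbers(onStart, onStop).shareIn(scope, SharingStarted.WhileSubscribed(), replay = 2)
```

With this shape the sharing coroutine lives in the scope passed to shareIn, so the subscribers themselves never restart their collection when the count changes.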

If it's "just on the side", you probably want an external scope and just launch a coroutine separately to listen to sharedFlow.subscriptionCount and start/stop the "sidecar" coroutine.
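
A rough sketch of the "sidecar" variant, assuming an externally provided scope. The extension name launchWhileSubscribed is hypothetical. The opt-in annotation is included as a precaution, since the *Latest operators have been marked experimental in some kotlinx.coroutines versions.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: runs `block` in `scope` while this flow has at least
// one subscriber, and cancels it when the subscriber count returns to zero.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> MutableSharedFlow<T>.launchWhileSubscribed(
    scope: CoroutineScope,
    block: suspend CoroutineScope.() -> Unit
): Job = scope.launch {
    subscriptionCount
        .map { it > 0 }
        .distinctUntilChanged()
        .collectLatest { active ->
            // collectLatest cancels the previous action when a new value
            // arrives, so a `false` value stops a running block.
            if (active) coroutineScope(block)
        }
}
```

The watcher collects subscriptionCount rather than the shared flow itself, so it does not count as a subscriber, and there is a single distinctUntilChanged() chain no matter how many subscribers come and go.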
