I'm trying to have a coroutine run while a SharedFlow is active (subscriptionCount is greater than zero) and be cancelled when the count drops back to zero. But somehow even something as simple as distinctUntilChanged() is not working as it should, and I'm baffled.
For this I'm making an "onActive" extension like this:
fun <T : Any> MutableSharedFlow<T>.onActive(
    block: suspend CoroutineScope.() -> Unit
): Flow<T> {
    val original = this
    val isActiveFlow: Flow<Boolean> = subscriptionCount
        .map {
            println("Class: Count is $it")
            it > 0
        }
        .distinctUntilChanged()
    return isActiveFlow.flatMapLatest { isActive ->
        println("Class: isActive is $isActive")
        // here would be the code that calls `block`
        // but just this exactly as is, already triggers the error
        original // still emits the original flow,
        // that is needed or else subscriptionCount never changes
    }
}
Initially this seems to work, but running a test that adds several subscribers prints "isActive is true" several times in a row. Why is distinctUntilChanged() not working? This repeated call messes up the rest of the logic in the redacted area.
The test is like this:
@Test
fun `onActive is called only once with multiple subscribers`() = runBlocking {
    val flow = MutableSharedFlow<Int>(
        replay = 2,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    ).apply {
        repeat(5) { tryEmit(it) }
    }.onActive {
    }
    val jobs = mutableListOf<Job>()
    repeat(3) { count ->
        jobs.add(flow.onEach {
            println("Test: Listener $count received $it")
        }.launchIn(this))
    }
    delay(100)
    jobs.forEach { it.cancel() }
    jobs.forEach { it.join() }
}
Running this, the output is:
Class: Count is 0
Class: isActive is false
Class: Count is 1
Class: Count is 1
Class: isActive is true
Class: Count is 2
Class: Count is 2
Class: isActive is true
Class: Count is 3
Test: Listener 0 received 3
Test: Listener 0 received 4
Test: Listener 1 received 3
Test: Listener 1 received 4
Test: Listener 2 received 3
Test: Listener 2 received 4
Class: Count is 2
Class: isActive is true
Class: Count is 3
Class: Count is 3
Class: Count is 3
Test: Listener 0 received 3
Test: Listener 0 received 4
So the question is: why is distinctUntilChanged() not working, and how can I fix it?
CodePudding user response:
It seems the behaviour you're seeing is actually correct as far as distinctUntilChanged is concerned:
- the first registered subscriber collects the original 2 replayed elements with the starting isActive=false value
- then isActive becomes true because of that first subscription, so that first subscriber recollects the original flow due to flatMapLatest, and thus gets the replayed elements again
- the other 2 subscribers arrive when the subscriptionCount is already non-0, so isActive stays true for them until they are cancelled
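One way to see why "isActive is true" is printed once per subscriber: the flow returned by onActive is cold, so every collector runs its own copy of the map/distinctUntilChanged chain, and distinctUntilChanged only deduplicates within a single collection. A minimal sketch, where firstValuePerCollector is a hypothetical helper and a MutableStateFlow stands in for subscriptionCount:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper for illustration only: collects the same cold chain
// `collectors` times and records the first value each collection sees.
suspend fun firstValuePerCollector(collectors: Int): List<Boolean> {
    // Stand-in for subscriptionCount (assumption: the count is already non-zero)
    val count = MutableStateFlow(1)
    val isActive: Flow<Boolean> = count
        .map { it > 0 }
        .distinctUntilChanged() // deduplicates per collection, not across collectors
    // Each first() call is a separate collection with its own operator state
    return List(collectors) { isActive.first() }
}

fun main() = runBlocking {
    println(firstValuePerCollector(3))
}
```

Every collection observes `true` independently, which matches the repeated "isActive is true" lines in the question's output.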
If the coroutine you launch "while there are subscribers" is meant to produce elements in the SharedFlow, I would rather define the flow as a channelFlow/callbackFlow initially, and then use shareIn with SharingStarted.WhileSubscribed to get this "run when there are subscribers" behaviour.
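A rough sketch of that first option, under stated assumptions: sharedCounter, its onStart callback and the counting producer are hypothetical and only stand in for whatever the real coroutine would emit. The replay value of 2 mirrors the test in the question.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical producer: emits increasing integers while it is being collected.
// `onStart` is an illustrative hook that runs each time the upstream is (re)started.
fun sharedCounter(scope: CoroutineScope, onStart: () -> Unit = {}): SharedFlow<Int> =
    channelFlow {
        onStart()
        var next = 0
        while (true) {
            send(next++)
            delay(10)
        }
    }.shareIn(
        scope = scope,
        // Upstream starts with the first subscriber and is cancelled after the last one leaves
        started = SharingStarted.WhileSubscribed(),
        replay = 2
    )

fun main() = runBlocking {
    // Separate scope so the sharing coroutine can be stopped explicitly
    val sharingScope = CoroutineScope(coroutineContext + Job())
    val shared = sharedCounter(sharingScope) { println("producer started") }
    val jobs = List(3) { id ->
        shared.onEach { println("Listener $id received $it") }.launchIn(this)
    }
    delay(50)
    jobs.forEach { it.cancelAndJoin() }
    sharingScope.cancel()
}
```

With this shape the producer body runs once for all three listeners, because shareIn collects the upstream a single time and fans the values out.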
If it's "just on the side", you probably want an external scope and just launch a coroutine separately to listen to sharedFlow.subscriptionCount and start/stop the "sidecar" coroutine.
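The "sidecar" variant could look roughly like the sketch below. launchWhileSubscribed is a hypothetical name, not part of kotlinx.coroutines, and the caller is assumed to own the scope. Unlike the onActive extension in the question, subscriptionCount is collected exactly once here, so distinctUntilChanged sees every transition.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: runs `block` in `scope` while this flow has at least one
// subscriber, and cancels it when the subscription count drops back to zero.
fun <T> MutableSharedFlow<T>.launchWhileSubscribed(
    scope: CoroutineScope,
    block: suspend CoroutineScope.() -> Unit
): Job = scope.launch {
    var sidecar: Job? = null
    subscriptionCount
        .map { count -> count > 0 }
        .distinctUntilChanged()
        .collect { active ->
            // Stop the previous sidecar (if any) before reacting to the new state
            sidecar?.cancelAndJoin()
            sidecar = if (active) launch(block = block) else null
        }
}

fun main() = runBlocking {
    val shared = MutableSharedFlow<Int>(replay = 2)
    val watcher = shared.launchWhileSubscribed(this) {
        println("sidecar started")
        try {
            awaitCancellation()
        } finally {
            println("sidecar stopped")
        }
    }
    val jobs = List(3) { shared.onEach { }.launchIn(this) }
    delay(50)
    jobs.forEach { it.cancelAndJoin() }
    delay(50)
    watcher.cancelAndJoin() // the watcher never completes on its own
}
```

The returned Job has to be cancelled by the caller (or by cancelling the scope), since subscriptionCount is a StateFlow and its collection never finishes.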