I tried to combine several SharedFlow
into one StateFlow
using stateIn
. But my StateFlow
seems not updating after I emit new value to it's SharedFlow
source. I found out that the problem is from how I used stateIn
.
Here is the simplified code I am using (you can run it from kotlin playground).
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch
fun main() = runBlocking {
val sourceFlow = MutableSharedFlow<Int>()
val stateFlow = sourceFlow.stateIn(GlobalScope, SharingStarted.Lazily, 0)
val job = launch { stateFlow.collect() }
sourceFlow.emit(99)
println(stateFlow.value)
job.cancel()
}
println(stateFlow.value)
prints out 0
when it should prints 99
. I already follow through this documentation about stateIn but still can not find the issue. Someone knows where I did wrong?
CodePudding user response:
The short answer is that no collection has started when 99 is emitted, and thus the value is simply discarded. This is the expected behaviour of unbuffered shared flow described in the docs of SharedFlow
:
A default implementation of a shared flow that is created with MutableSharedFlow() constructor function without parameters has no replay cache nor additional buffer. emit call to such a shared flow suspends until all subscribers receive the emitted value and returns immediately if there are no subscribers.
The reason there are no subscribers on sourceFlow
(the shared flow) is because you called stateIn()
on this flow with SharingStarted.Lazily
:
Sharing is started when the first subscriber appears and never stops
That means the state flow's coroutine only starts collecting the source when the first subscriber starts collecting the state flow. And the reason there is no subscriber on the state flow is because your launch hasn't had time to call collect
before you reach the emit.
When you call launch
, you're starting an asynchronous operation. The code inside the launch
runs concurrently with the code that follows it. Since you're running this launch
inside runBlocking
's scope, it means it runs on the same thread as the rest of main
's code, which means that the body of the launch
cannot start running until the main function's body actually suspends. And here it only suspends when reaching emit()
.