I know Flow
is cold, and ShaerdFlow
is hot.
To test cold stream in Kotlin flow, I created below code. My plan is, create a flow that will emit String value and add two collectors on it. I expect that the only one collector will get emitted value, because it is a cold stream.
But the result was different, I found that two values are emitted!
Test code:
val stringFlow: Flow<String> = flow<String> {
for (i in 0..10) {
Timber.d("Integer $i emitted ${currentCoroutineContext()}")
emit("Integer $i")
delay(500)
}
}
// If I uncomment below line, only one flow will be emitted
//.shareIn(coroutineScope, SharingStarted.Lazily)
val job1 = coroutineScope.launch {
stringFlow.collect {
Timber.d("job1 collect 1 $it")
}
}
val job2 = coroutineScope.launch {
stringFlow.collect {
Timber.d("job2 collect 2 $it")
}
}
Result:
stringFlow: Integer 0 produced [StandaloneCoroutine{Active}@c2f624, Dispatchers.IO]
stringFlow: Integer 0 produced [StandaloneCoroutine{Active}@dd9f58d, Dispatchers.IO]
job1: job1 collect 1 Integer 0
job2: job2 collect 2 Integer 0
How can I explain this situation? Can I say "flow will be created once for each collector?"
CodePudding user response:
Yes, it starts emitting a new flow/stream of items whenever we start collecting from it. This is described in the documentation:
The flow being cold means that the block is called every time a terminal operator is applied to the resulting flow.
It is the same for other cold flows as well. For example, flow created with flowOf()
emits the same items whenever collected. Flow created with callbackFlow()
attaches a callback whenever we collect. And so on.
I expect that the only one collector will get emitted value
If you meant to distribute work between multiple consumers, then I believe flows are not a good fit for this. They are more about observing for events or some data, but not for distributing of items between multiple producers and consumers. Use channels instead.