With the following code sample:
val scope = CoroutineScope(Dispatchers.IO)
val flow = channelFlow {
println("1")
send("1")
println("2")
send("2")
}.buffer(0)
scope.launch {
flow.collect {
println("collect $it")
delay(5000)
}
}
The following output:
1
2 // should be printed after collect 1
collect 1
collect 2 // after 5000ms
Expected: the print 1, then print collect 1, wait 5 seconds, then print 2
it seem that the send function does not suspend, with a buffer set to 0 or RENDEZVOUS, using a standard flow with emit suspend work as expected, is there another operator, or does the channel flow can suspend(have a buffer with 0/1 capacity) ?
CodePudding user response:
As the documentation of Flow.buffer() states, this function creates two coroutines: one producer and one consumer. These coroutines work concurrently. That means that at the time collect()
block is launched to process an item, send()
on the other side is already resumed. I believe there is a race condition between 2
and collect 1
, but in practice the order may be deterministic.
"Normal" flow with emit()
works differently. It works sequentially, so the producer and the consumer don't run at the same time. emit()
suspends until the consumer finishes working on the previous item and requests another one.