The intention of the code is to set up two flows, each of which completes when the message it is waiting for appears on the incoming flow.
The output for regularFlow is:
1 REC message2
2 REC message2
2 Done
1 REC message1
1 Done
This is what I would expect. However, for receiveChannel the output is:
1 REC message2
2 REC message1
Expected at least one element matching the predicate Function2<java.lang.String, kotlin.coroutines.Continuation<? super java.lang.Boolean>, java.lang.Object>
Can anyone help explain why there is a difference?
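import kotlinx.coroutines.*
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.flow.*
// @Test comes from the test framework in use (for example JUnit or kotlin.test).

@Test
fun receiveChannel() {
    runBlocking {
        doIt {
            // Channel-backed flow: each item is delivered to only one collector.
            produce<String> {
                delay(500)
                send("message2")
                delay(1000)
                send("message1")
            }.receiveAsFlow()
        }
    }
}

@Test
fun regularFlow() {
    runBlocking {
        doIt {
            // Cold flow: the builder block runs once per collector.
            flow {
                delay(500)
                emit("message2")
                delay(1000)
                emit("message1")
            }
        }
    }
}

suspend fun doIt(buildFlow: suspend () -> Flow<String>) {
    coroutineScope {
        val commands = buildFlow()
        // Waits until "message1" appears on the flow.
        val filter1 = async {
            commands.onEach {
                println("1 REC " + it)
            }.first {
                it == "message1"
            }
            println("1 Done")
        }
        // Waits until "message2" appears on the flow.
        val filter2 = async {
            commands.onEach {
                println("2 REC " + it)
            }.first {
                it == "message2"
            }
            println("2 Done")
        }
        filter1.await()
        filter2.await()
    }
}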
Answer:
Each item from a channel can be received only once, no matter how many receivers are subscribed to it: only one of those receivers will see any given item. receiveAsFlow() creates a hot flow that preserves this channel behavior among multiple collectors, so whichever collector happens to get an item from the flow is the only one that ever receives it.
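So, in this case, your first collector is the one that gets message2, so the second collector never sees it, and vice versa for message1. Once the producer finishes, the channel closes, the flow completes without either predicate having matched, and first throws the "Expected at least one element matching the predicate" exception. This may not even be a guaranteed outcome: I think which coroutine's collector gets each next item from the flow is a race.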
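If every collector needs to see every message, one possible approach is to put a shared flow in front of the channel, so that a single upstream collector drains the channel and re-emits each item to all subscribers. The following is only a minimal sketch of that idea, not necessarily the best fit for your code: the function name awaitBothMessages, the separate sharingScope, and the choice of SharingStarted.Eagerly are my own assumptions, and it relies on both waiters subscribing before the first message is sent (here the producer waits 500 ms first).

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.runBlocking

// Hypothetical helper: waits for both messages using a shared (broadcast) flow.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun awaitBothMessages(): Pair<String, String> {
    // Assumption: a separate scope owns the producer and the sharing coroutine,
    // so both can be cancelled once the waiters are done.
    val sharingScope = CoroutineScope(Dispatchers.Default)
    try {
        val commands = sharingScope.produce<String> {
            delay(500)
            send("message2")
            delay(1000)
            send("message1")
        }
            .receiveAsFlow()
            // One upstream collector receives from the channel and re-emits
            // every item to all current subscribers.
            .shareIn(sharingScope, SharingStarted.Eagerly)

        return coroutineScope {
            // A shared flow never completes on its own; first { } stops
            // collecting as soon as its predicate matches.
            val waiter1 = async { commands.first { it == "message1" } }
            val waiter2 = async { commands.first { it == "message2" } }
            waiter1.await() to waiter2.await()
        }
    } finally {
        // Stop the producer and the sharing coroutine.
        sharingScope.cancel()
    }
}

fun main() = runBlocking {
    val (m1, m2) = awaitBothMessages()
    println("$m1 $m2")
}
```

Note that with a shared flow a late subscriber misses anything emitted before it subscribed unless you configure a replay buffer, so whether this fits depends on how your real collectors are started.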