I wonder how flatMapMerge works in this code:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapMerge { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}
Output:
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
After requestFlow suspends for 500 ms, it continues with emit("$i: Second"). Looking at the output, I am confused. My questions are:
- Has flatMapMerge invoked asFlow on (1..3) again? Or
- Does it cache the flow of 1..3 somewhere for later use because requestFlow has told it, 'I'm not done yet'?
CodePudding user response:
The entire sequence is like this:
- 1 is emitted from the original flow and passed to requestFlow, which creates a new flow. That flow emits "1: First" (printed as "1: First at ...") and then suspends for 500 ms.
- 2 is emitted from the original flow and passed to requestFlow, which creates a new flow. That flow emits "2: First" (printed as "2: First at ...") and then suspends for 500 ms. (At this point there are two suspended coroutines in memory.)
- 3 is emitted from the original flow and passed to requestFlow, which creates a new flow. That flow emits "3: First" (printed as "3: First at ...") and then suspends for 500 ms. (At this point there are three suspended coroutines in memory.)
- 500 ms after its first emission, the first suspended coroutine resumes and "1: Second at ..." is printed.
- About 100 ms later, the second coroutine resumes and "2: Second at ..." is printed.
- After another 100 ms, the last suspended coroutine resumes and "3: Second at ..." is printed.
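The sequence above can be modelled with a small sketch. This is not how the library implements flatMapMerge (the real operator also applies a concurrency limit, 16 by default); `naiveFlatMapMerge` and `collectNaive` are hypothetical names made up for illustration. It only shows the idea: the upstream is collected once, and each inner flow is collected in its own coroutine.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // the inner flow suspends here; other coroutines keep running
    emit("$i: Second")
}

// Hypothetical helper, NOT part of kotlinx.coroutines: a simplified model of flatMapMerge.
fun <T, R> Flow<T>.naiveFlatMapMerge(transform: (T) -> Flow<R>): Flow<R> = channelFlow {
    // Collect the upstream exactly once.
    this@naiveFlatMapMerge.collect { value ->
        // Start one coroutine per upstream value; it collects the inner flow
        // and forwards every element to the merged output.
        launch {
            transform(value).collect { send(it) }
        }
    }
}

// Hypothetical helper: runs the pipeline and returns the values in arrival order.
fun collectNaive(): List<String> = runBlocking {
    (1..3).asFlow().onEach { delay(100) }
        .naiveFlatMapMerge { requestFlow(it) }
        .toList()
}

fun main() {
    println(collectNaive())
}
```

Under these assumptions the values should arrive in the same order as in the question's output: the three "First" values, then the three "Second" values.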
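flatMapMerge just applies the transform you provide to each upstream value and merges the resulting flows into a single flow, here a Flow<String>. Note that it is not a suspend function, so the call itself returns immediately; nothing runs until the resulting flow is collected.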
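To see that calling flatMapMerge only builds the pipeline, here is a minimal sketch. `buildThenCollect` and the `log` list are hypothetical names used for illustration; the point is that the "pipeline built" entry is recorded before any upstream value is emitted.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500)
    emit("$i: Second")
}

// Hypothetical helper: records the order in which things happen.
fun buildThenCollect(): List<String> = runBlocking {
    val log = mutableListOf<String>()

    // Building the pipeline does not start the upstream or any inner flow.
    val merged = (1..3).asFlow()
        .onEach { log += "upstream emitted $it" }
        .flatMapMerge { requestFlow(it) }
    log += "pipeline built"

    // Only collect actually runs the flows.
    merged.collect { log += "collected $it" }
    log
}

fun main() {
    buildThenCollect().forEach(::println)
}
```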
Answering your two questions:
- No, the asFlow function is not invoked again.
- It's not caching the flow. Each inner flow is just suspended for 500 ms while other work proceeds, and it resumes from where it left off once the delay is over.
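One way to check the first answer yourself is to count how often the upstream flow body starts. The sketch below swaps `(1..3).asFlow().onEach { delay(100) }` for an equivalent `flow { ... }` builder so a counter can be placed inside it; `countUpstreamStarts` and `upstreamStarts` are hypothetical names.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500)
    emit("$i: Second")
}

// Hypothetical helper: returns how many times the upstream body started,
// together with all merged values.
fun countUpstreamStarts(): Pair<Int, List<String>> = runBlocking {
    var upstreamStarts = 0
    val upstream = flow {
        upstreamStarts++ // runs once per collection of this flow
        for (i in 1..3) {
            delay(100) // a number every 100 ms, as in the question
            emit(i)
        }
    }
    val values = upstream.flatMapMerge { requestFlow(it) }.toList()
    upstreamStarts to values
}

fun main() {
    val (starts, values) = countUpstreamStarts()
    println("upstream started $starts time(s), received ${values.size} values")
}
```

If the upstream were re-invoked for the "Second" emissions, the counter would be greater than 1.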