Home > Blockchain >  Does flatMapMerge cache flows?
Does flatMapMerge cache flows?

Time:10-29

I wonder how flatMapMerge works in this code:

fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

    fun main() = runBlocking<Unit> { 
        val startTime = System.currentTimeMillis() // remember the start time 
        (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
            .flatMapMerge { requestFlow(it) }                                                                           
            .collect { value -> // collect and print 
                println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
            } 
    }

    1: First at 136 ms from start
    2: First at 231 ms from start
    3: First at 333 ms from start
    1: Second at 639 ms from start
    2: Second at 732 ms from start
    3: Second at 833 ms from start

After requestFlow sleeps for 500 ms, it continues with emit("$i: **Second**"). Looking at the output, I am confused. My questions are

  1. Has flatMapMerge invoked asFlow on (1..3) again. Or
  2. Does it cache the flow of 1..3 somewhere for later use because requestFlow has told it, ' I'm not done yet'

CodePudding user response:

The entire sequence is like this:

  • 1 is emitted from original flow and and passed to requestFlow which creates a new flow builder which prints "1: First at..." and then suspends for 500ms.
  • 2 is emitted from original flow and and passed to requestFlow which creates a new flow builder which prints "2: First at..." and then suspends for 500ms. (here we have two suspended functions in memory).
  • 3 is emitted from original flow and and passed to requestFlow which creates a new flow builder which prints "3: First at..." and then suspends for 500ms. (here we have three suspended functions in memory).
  • After 500ms from 1st emission, the first suspended function resumes and prints "1: Second at ..."
  • After 100ms, the second function resumes and prints "2: Second at ..."
  • After another 100ms, the last suspended function resumes and prints "3: Second at ..."

flatternMapMerge just applies the transform you provide and returns a Flow<Int>. Note that this is not a suspend function and so returns immediately.

Answering your two questions,

  • No, the asFlow function is not invoked again.
  • It's not caching the flow, just suspending the functions for 500ms, doing other stuff till then, and resuming from where it left off after the delay is over.
  • Related