Kotlin - How To Collect X Values From a Flow?

Time:10-07

Let's say I have a flow that is constantly sending updates, like the following:

val locationFlow = MutableStateFlow<Location?>(null)

I have a use-case where after a particular event occurs, I want to collect X values from the flow and continue, so something like what I have below. I know that collect is a terminal operator, so I don't think the logic I have below works, but how could I do this in this case? I'd like to collect X items, save them, and then send them to another function for processing/handling.

fun onEventOccurred() {
     launch {
         val locations = mutableListOf<Location?>()
         locationFlow.collect {
               //collect only X locations
               locations.add(it)
         }
         saveLocations(locations)
     }
}

Is there a pre-existing Kotlin function for something like this? I'd like to collect from the flow X times, save the items to a list, and pass that list to another function.

CodePudding user response:

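It doesn't matter that collect is terminal. The upstream StateFlow will keep behaving normally because StateFlows don't care what their collectors are doing. The real problem with your code is that a StateFlow never completes, so collect never returns and saveLocations is never reached. You can use the take function to get a specific number of items (it stops collecting once that many have arrived), and you can use toList() (another terminal function) to concisely copy them into a list once they're all ready.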

fun onEventOccurred() {
     launch {
         saveLocations(locationFlow.take(5).toList())
     }
}
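For anyone who wants to try this outside an app, here is a minimal runnable sketch of the same idea. The Location data class, the collectLocations helper and the simulated producer are hypothetical stand-ins, not part of the question's code. It also assumes you want to skip nulls: a StateFlow hands its current value to every new collector first, so without filterNotNull() the initial null would count as one of the X items.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the question's Location type.
data class Location(val lat: Double, val lng: Double)

// Hypothetical helper: suspends until `count` non-null values have arrived,
// then stops collecting and returns them as a list.
suspend fun collectLocations(source: Flow<Location?>, count: Int): List<Location> =
    source.filterNotNull().take(count).toList()

fun main() = runBlocking {
    val locationFlow = MutableStateFlow<Location?>(null)

    // Simulated producer: publishes a new location every 100 ms.
    val producer = launch {
        repeat(10) { i ->
            delay(100)
            locationFlow.value = Location(i.toDouble(), i.toDouble())
        }
    }

    // Returns after three non-null locations, even though the StateFlow never completes.
    val locations = collectLocations(locationFlow, 3)
    println(locations)

    producer.cancel() // stop the simulation so runBlocking can finish early
}
```

Keep in mind that a StateFlow is conflated: if the producer updates the value faster than the collector can keep up, intermediate values are skipped, so "X items" means X values the collector actually observed.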

CodePudding user response:

If I understood your use case correctly, you want to:

  • discard elements until a specific one is sent (after re-reading your question I don't think this is actually the case, but I'm leaving it in the example as an FYI)
  • when that happens, you want to collect X items for further processing

Assuming that's correct, you can use a combination of dropWhile and take, like so:

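import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.dropWhile
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // Cold flow that emits 0..9, one value every 500 ms.
    val messages = flow {
        repeat(10) {
            println("emitting $it") // logs each upstream emission
            delay(500)
            emit(it)
        }
    }

    messages
        .dropWhile { it < 5 } // skip the leading values below 5
        .take(3)              // stop collecting after 3 values
        .collect { println(it) } // the collector prints 5, 6, 7
}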
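One detail worth seeing in isolation: dropWhile only discards the leading run of matching elements, whereas filter checks every element. The small sketch below contrasts the two on an unordered flow; the helper names dropLeadingSmall and keepLarge are made up for illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.dropWhile
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: drops values only until the first one that is >= 5.
suspend fun dropLeadingSmall(source: Flow<Int>): List<Int> =
    source.dropWhile { it < 5 }.toList()

// Hypothetical helper: drops every value below 5, wherever it appears.
suspend fun keepLarge(source: Flow<Int>): List<Int> =
    source.filter { it >= 5 }.toList()

fun main() = runBlocking {
    val values = flowOf(1, 7, 2, 8)

    println(dropLeadingSmall(values)) // [7, 2, 8]: the 2 survives because 7 ended the dropping
    println(keepLarge(values))        // [7, 8]
}
```

With a strictly increasing flow like the one in the answer, both operators give the same result; the difference only shows up when small values can reappear later.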

You can even have more complex logic, e.g. discard the leading numbers that are less than 5, and then take the first 10 even numbers:

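import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.dropWhile
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // Cold flow that emits 0..99, one value every 500 ms.
    val messages = flow {
        repeat(100) {
            delay(500)
            emit(it)
        }
    }

    messages
        .dropWhile { it < 5 }     // skip the leading values below 5
        .filter { it % 2 == 0 }   // keep only even numbers
        .take(10)                 // stop collecting after 10 values
        .collect { println(it) } // prints even numbers, 6 to 24
}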
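Since the question asks for a list to hand to another function rather than for printing, the same chain can end in toList() instead of collect. A rough sketch, assuming a hypothetical helper named firstEvenAfterFive and using asFlow() on a range in place of the delayed flow so it finishes instantly:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.dropWhile
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: same operator chain as above, but it returns the
// collected values as a list instead of printing them.
suspend fun firstEvenAfterFive(source: Flow<Int>, count: Int): List<Int> =
    source
        .dropWhile { it < 5 }   // skip the leading values below 5
        .filter { it % 2 == 0 } // keep only even numbers
        .take(count)            // stop after `count` values
        .toList()               // terminal operator: gathers them into a List

fun main() = runBlocking {
    val numbers = (0 until 100).asFlow()

    val result = firstEvenAfterFive(numbers, 10)
    println(result) // [6, 8, 10, 12, 14, 16, 18, 20, 22, 24]
}
```

The returned list can then be passed straight to something like the question's saveLocations.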