How can I get a previous emission Kotlin Flow?

Time:03-25

Let me use a simple image to illustrate what I want to get:

(image not included)

I don't want to use SharedFlow's replayCache to achieve this, because a new collector of that SharedFlow would then receive two emissions instead of only the latest one.

Or if I write it in code:

val sharedFlow = MutableSharedFlow<Int>(replay = 1)
val theFlowThatIWant = sharedFlow.unknownOperator { … }
sharedFlow.emit(1)
sharedFlow.emit(2)

sharedFlow.collect {
   println(it)
}

Expected output:
2

theFlowThatIWant.collect {
   println(it)
}

Expected output:
1

Answer:

We can write such an operator ourselves. It can also be generalized to hold back more than just the last item by keeping the postponed items in a circular buffer (an ArrayDeque here):

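import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

suspend fun main() {
    // Upstream flow: emits 0..4, pausing one second after each item.
    val f = flow {
        repeat(5) {
            println("Emitting $it")
            emit(it)
            delay(1000)
        }
    }

    // Each collected item lags one emission behind the upstream.
    f.postponeLast()
        .collect { println("Collecting $it") }
}

// Holds back the most recent `count` items: an item is emitted downstream
// only after `count` newer items have arrived from upstream.
fun <T> Flow<T>.postponeLast(count: Int = 1): Flow<T> = flow {
    require(count > 0) { "count must be positive" }
    val buffer = ArrayDeque<T>(count)
    collect {
        if (buffer.size == count) {
            // Buffer is full: release the oldest item before storing the new one.
            emit(buffer.removeFirst())
        }
        buffer.addLast(it)
    }
}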

Note that this solution never emits the last count items: they are still sitting in the buffer when the upstream flow completes. If you would like to emit them at the end, add this after the collect { } block:

while (buffer.isNotEmpty()) {
    emit(buffer.removeFirst())
}
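
Putting the two pieces together, a minimal sketch of the flushing variant might look like the following. The name postponeLastThenFlush is hypothetical, chosen here only to distinguish it from postponeLast above, and the sketch assumes kotlinx-coroutines is available on the classpath.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical name: delays every item by `count` positions, then emits the
// held-back items once the upstream flow completes.
fun <T> Flow<T>.postponeLastThenFlush(count: Int = 1): Flow<T> = flow {
    require(count > 0) { "count must be positive" }
    val buffer = ArrayDeque<T>(count)
    collect { value ->
        if (buffer.size == count) {
            // Buffer is full: release the oldest item first.
            emit(buffer.removeFirst())
        }
        buffer.addLast(value)
    }
    // Upstream is done: release whatever is still buffered, oldest first.
    while (buffer.isNotEmpty()) {
        emit(buffer.removeFirst())
    }
}

fun main() = runBlocking {
    // A finite flow, so the flush after collect { } is reached.
    val result = flowOf(1, 2, 3).postponeLastThenFlush().toList()
    println(result) // prints [1, 2, 3]
}
```

With the flush in place every upstream item eventually reaches the collector, only later than it was emitted. The flush runs only when the upstream completes, so on a SharedFlow, whose collection never completes normally, the loop after collect { } would not be reached.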