Let me use a simple image to illustrate what I want to get:
I don't want to use SharedFlow
's replayCache
to achieve this because if a new observer observes that SharedFlow
, it will get 2 emissions instead of one latest emission.
Or if I write it in code:
val sharedFlow = MutableSharedFlow(replay = 1)
val theFlowThatIWant = sharedFlow.unknownOperator { … }
sharedFlow.emit(1)
sharedFlow.emit(2)
sharedFlow.collect {
println(it)
}
Expected output:
2
theFlowThatIWant.collect {
println(it)
}
Expected output:
1
CodePudding user response:
We can create such operator by ourselves. We can generalize it to more items than only the last one and use circular buffer to keep postponed items:
suspend fun main() {
val f = flow {
repeat(5) {
println("Emitting $it")
emit(it)
delay(1000)
}
}
f.postponeLast()
.collect { println("Collecting $it") }
}
fun <T> Flow<T>.postponeLast(count: Int = 1): Flow<T> = flow {
val buffer = ArrayDeque<T>(count)
collect {
if (buffer.size == count) {
emit(buffer.removeFirst())
}
buffer.addLast(it)
}
}
Note that this solution never emits postponed items. If you like to emit them at the end, just add this after collect { }
:
while (buffer.isNotEmpty()) {
emit(buffer.removeFirst())
}