I want to collect specific amount of values from Flow until value emitting timeout happened. Unfortunately, there are no such operators, so I've tried to implement my own using debounce
operator.
The first problem is that producer is too fast and some packages are skipped and not collected at all (they are in onEach
of original packages
flow, but not in onEach
of second flow of merge
in withNullOnTimeout
).
The second problem - after taking last value according to amount
argument orginal flow is closed, but timeout flow still alive and finally produce timeout after last value.
How can I solve this two problems?
My implementations:
suspend fun receive(packages: Flow<ByteArray>, amount: Int): ByteArray {
val buffer = ByteArrayOutputStream(blockSize.toInt())
packages
.take(10)
.takeUntilTimeout(100) // <-- custom timeout operator
.collect { pck ->
buffer.write(pck.data)
}
return buffer.toByteArray()
}
fun <T> Flow<T>.takeUntilTimeout(durationMillis: Long): Flow<T> {
require(durationMillis > 0) { "Duration should be greater than 0" }
return withNullOnTimeout(durationMillis)
.takeWhile { it != null }
.mapNotNull { it }
}
fun <T> Flow<T>.withNullOnTimeout(durationMillis: Long): Flow<T?> {
require(durationMillis > 0) { "Duration should be greater than 0" }
return merge(
this,
map<T, T?> { null }
.onStart { emit(null) }
.debounce(durationMillis)
)
}
CodePudding user response:
How about:
fun <T> Flow<T>.takeUntilTimeout(timeoutMillis: Long) = channelFlow {
val collector = launch {
collect {
send(it)
}
close()
}
delay(timeoutMillis)
collector.cancel()
close()
}
Using a channelFlow
allows you to spawn a second coroutine so you can count the time independently, and quite simply.
CodePudding user response:
This was what initially seemed obvious to me, but as Joffrey points out in the comments, it can cause an unnecessary delay before collection terminates. I'll leave this as an example of a suboptimal, oversimplified solution.
fun <T> Flow<T>.takeUntilTimeout(durationMillis: Long): Flow<T> = flow {
val endTime = System.currentTimeMillis() durationMillis
takeWhile { System.currentTimeMillis() >= endTime }
.collect { emit(it) }
}
Here's an alternate idea I didn't test.
@Suppress("UNCHECKED_CAST")
fun <T> Flow<T>.takeUntilTimeout(durationMillis: Long): Flow<T> {
val signal = Any()
return merge(flow { delay(durationMillis); emit(signal) })
.takeWhile { it !== signal } as Flow<T>
}