Home > Mobile >  Kotlin Flow: receiving values until emitting timeout
Kotlin Flow: receiving values until emitting timeout

Time:11-24

I want to collect specific amount of values from Flow until value emitting timeout happened. Unfortunately, there are no such operators, so I've tried to implement my own using debounce operator.

The first problem is that producer is too fast and some packages are skipped and not collected at all (they are in onEach of original packages flow, but not in onEach of second flow of merge in withNullOnTimeout).

The second problem - after taking last value according to amount argument orginal flow is closed, but timeout flow still alive and finally produce timeout after last value.

How can I solve this two problems?

My implementations:


suspend fun receive(packages: Flow<ByteArray>, amount: Int): ByteArray {
        val buffer = ByteArrayOutputStream(blockSize.toInt())
        packages
            .take(10)
            .takeUntilTimeout(100) // <-- custom timeout operator
            .collect { pck ->
                buffer.write(pck.data)
            }
    return buffer.toByteArray()
}

fun <T> Flow<T>.takeUntilTimeout(durationMillis: Long): Flow<T> {
    require(durationMillis > 0) { "Duration should be greater than 0" }
    return withNullOnTimeout(durationMillis)
        .takeWhile { it != null }
        .mapNotNull { it }
}

fun <T> Flow<T>.withNullOnTimeout(durationMillis: Long): Flow<T?> {
    require(durationMillis > 0) { "Duration should be greater than 0" }
    return merge(
        this,
        map<T, T?> { null }
            .onStart { emit(null) }
            .debounce(durationMillis)
    )
}

CodePudding user response:

How about:

fun <T> Flow<T>.takeUntilTimeout(timeoutMillis: Long) = channelFlow {
    val collector = launch {
        collect {
            send(it)
        }
        close()
    }
    delay(timeoutMillis)
    collector.cancel()
    close()
}

Using a channelFlow allows you to spawn a second coroutine so you can count the time independently, and quite simply.

CodePudding user response:

This was what initially seemed obvious to me, but as Joffrey points out in the comments, it can cause an unnecessary delay before collection terminates. I'll leave this as an example of a suboptimal, oversimplified solution.

fun <T> Flow<T>.takeUntilTimeout(durationMillis: Long): Flow<T> = flow {
    val endTime = System.currentTimeMillis()   durationMillis
    takeWhile { System.currentTimeMillis() >= endTime }
        .collect { emit(it) }
}

Here's an alternate idea I didn't test.

@Suppress("UNCHECKED_CAST")
fun <T> Flow<T>.takeUntilTimeout(durationMillis: Long): Flow<T> {
    val signal = Any()
    return merge(flow { delay(durationMillis); emit(signal) })
        .takeWhile { it !== signal } as Flow<T>
}
  • Related