Home > Blockchain >  Collect kotlin flow until condition
Collect kotlin flow until condition

Time:10-22

I need to collect infinite Kotlin Flow, check for its data and stop until I get desired String or once timeout pass. I need to emit expected String when collected desired string or throw an exception in case of timeout. I simulate infinite Kotlin flow in this way:

    fun String.toInfiniteByteArrayFlow(): Flow<ByteArray> {
        return callbackFlow {
            trySend([email protected]())
            awaitClose()
        }
    }

I wrote test which should throw an exception, because stream contains string "ok" and we expect "error", so it should throw an exception after 1 second, because desired string "ok" was not collected. Here it is:

        @Test
    fun `should not collect unexpected data within given duration for infinite flow`() = runTest {
        assertThrows<NotCollectedExpectedMessageException> {
            runCatching {
                ExpectedFlowCommandWithRemovingTrailingCharacters().expectOn("ok".toInfiniteByteArrayFlow(), "error").collect()
                advanceTimeBy(2000L)
            }.onSuccess {
                fail("Should not be here")
            }.onFailure {
                assertThat(it).isInstanceOf(RuntimeException::class.java)
            }
        }
    }

And here is my implementation:

class ExpectedFlowCommandWithRemovingTrailingCharacters {

    suspend fun expectOn(flow: Flow<ByteArray>, expectedCommand: String): Flow<String> {
        val collectedSoFar = StringBuilder()
        return try {
            withTimeout(1000L) {
                val emptyFlow = flow {
                    while (true) {
                        kotlinx.coroutines.delay(200L)
                        emit("")
                    }
                }
                flow {
                    flowOf(flow.map { String(it) }, emptyFlow).flattenConcat().transformWhile {
                        ensureActive()
                        if (it != "") {
                            collectedSoFar.append(it)
                            if (collectedSoFar.toString() == expectedCommand) {
                                emit(collectedSoFar.toString())
                                false
                            } else {
                                true
                            }
                        } else {
                            true
                        }
                    }
                }

            }
        } catch (e: CancellationException) {
            throw RuntimeException(
                "Timeout during collecting data. Collected $collectedSoFar, expected $expectedCommand", e
            )
        }
    }

I'm concatenating two flows: one flow with real data and second flow with "simulated" data only for checking for timeout on each 200ms (so my flow is cancellable). Unfortunately, flow ends immediately instead of throwing exception. What am I doing wrong?

CodePudding user response:

I think I understand now what you're trying to do. A Flow that concatenates byte arrays into a single String until the total string matches expectedCommand, and then emits that single value. But the Flow throws a RuntimeException if it doesn't emit within one second. (This is maybe a little odd, since it's a Flow that is known to only ever return one item. Might make more sense as a Deferred.)

What I can see is that you wrap a cold flow in withTimeout, which doesn't really make sense. A cold Flow will be returned immediately, so there's no opportunity for anything to time out. You haven't called any suspend functions inside the withTimeout block.

Likewise, with the try/catch, you will never catch a CancellationException here, since this function immediately returns the cold flow without ever having called a suspend function that cooperates with cancellation.

So, I think in both cases you missed that wrapping the task of creating a cold flow in withTimeout or try/catch doesn't do anything to what happens when that Flow is eventually collected. You need to implement these using Flow operators and/or builders because these must happen when the returned flow is being collected.

Also, you mention you create another fake flow to concatenate with so your flow is cancellable, but all the terminal Flow operators are cancellable anyway, so you don't need that or ensureActive().

Would this work for your purposes?

suspend fun expectOn(flow: Flow<ByteArray>, expectedCommand: String): Flow<String> = flow {
    val collectedSoFar = StringBuilder()
    try {
        withTimeout(1000L) {
            flow.map { String(it) }
                .filter { it.isNotEmpty() }
                .onEach { collectedSoFar.append(it) }
                .first { collectedSoFar.toString() == expectedCommand }
                .let { emit(it) }
        }
    } catch (e: TimeoutCancellationException) {
        throw RuntimeException(
            "Timeout during collecting data. Collected $collectedSoFar, expected $expectedCommand", e
        )
    }
}
  • Related