Home > Net >  single() and first() terminal operators when producer emits many values
single() and first() terminal operators when producer emits many values

Time:08-12

I need to collect only the first value from two emitted by flow.

I have a function that returns flow:

fun myFlow = flow {
    try {
        emit(localDataSource.fetchData())
    } catch(e: Exception) { 
        // just skip this error
    }
    emit(remoteDataSource.fetchData(1000, 0))
}

In one special case I need only first emitted value, doesn't matter is it from local cache or remote source. I tried this one:

fun getRandomFavoriteItem() = myFlow.first().filter { it.score > 7 }.randomOrNull()

But first() invocation always throws

java.lang.IllegalStateException: Flow exception transparency is violated: Previous 'emit' call has thrown exception kotlinx.coroutines.flow.internal.AbortFlowException: Flow was aborted, no more elements needed, but then emission attempt of value.

What I've tried:

  1. single() -

java.lang.IllegalArgumentException: Flow has more than one element

  1. take(1).first() -

java.lang.IllegalStateException: Flow exception transparency is violated: Previous 'emit' call has thrown exception kotlinx.coroutines.flow.internal.AbortFlowException: Flow was aborted, no more elements needed, but then emission attempt of value

  1. Catch error but it doesn't stop here:
myFlow.catch { e ->
    if (e !is IllegalArgumentException) {
        throw e
    }
}.first().filter { it.score > 7 }.randomOrNull()

My questions are:

  1. What is the point of usage first() if it doesn't work in case of more than 1 emitted values? If I would know that my flow produces only one value I could just use any other terminal operator.
  2. How to avoid those errors and how to collect only first value without adding repeated code?

CodePudding user response:

This isn't an error in first(). It's an error in your flow. You are not permitted to swallow all exceptions in a Flow in the way you have.

Some varying approaches may differ in whether they detect that error, but what you must fix is how you "just skip" all exceptions. Consider catching only the specific exceptions you're concerned about, or at least making sure to catch and rethrow CancellationException or its subclasses.

CodePudding user response:

Lous Wasserman already found the problem, here some more details.

As mentioned in the error message you're also catching the AbortFlowException.

java.lang.IllegalStateException: Flow exception transparency is violated: Previous 'emit' call has thrown exception kotlinx.coroutines.flow.internal.AbortFlowException: Flow was aborted, no more elements needed, but then emission attempt of value.

You're bascically catching an exception which interferes with the way flows work. The problem is not about the first function.

Since AbortFlowException is internal you cannot access it, but you can access its superclass CancellationException. You need to modify your catch block like this:

try {
    emit(localDataSource.fetchData())
} catch (e: Exception) {
    if(e is CancellationException) {
        throw e
    }
}

Now first will work in the way you expect it to.


Edit:

A better solution would be to handle the exception within fetchData (you might return null in case one was thrown). This way you don't get in the way of the flow mechanics.

If that is not possible, you could create a wrapper function which takes care of the exception handling.

  • Related