Home > Software design >  How can you convert a kotlin flow to a mutable flow?
How can you convert a kotlin flow to a mutable flow?

Time:11-11

I am trying to keep a mutable state flow in my class, but when I apply any methods to it, it will be converted to an immutable Flow<T>:

class MyClass : Listener<String> {

       private val source = Source()

       val flow: Flow<String?>
          get() = _flow

       // region listener
        override fun onUpdate(value: String?) {
            if (value!= null) {
                // emit object changes to the flow
               // not possible, because the builder operators on the flow below convert it to a `Flow` and it doesn't stay as a `MutableSharedFlow` :(
                _flow.tryEmit(value) 
            }
        }
        // end-region

        @OptIn(ExperimentalCoroutinesApi::class)
        private val _flow by lazy {
            MutableStateFlow<String?>(null).onStart {
                emitAll(
                    flow<String?> {
                        val initialValue = source.getInitialValue()
                        emit(initialValue)
                    }.flowOn(MyDispatchers.background)
                )
            }.onCompletion { error ->
                // when the flow is cancelled, stop listening to changes
                if (error is CancellationException) {
                    // is was cancelled
                    source.removeListener(this@MyClass)
                }
            }.apply {
                // listen to changes and send them to the flow
                source.addListener(this@MyClass)
            }
        }
}

Is there a way to keep the flow as a MutableStateFlow even after I apply the onCompletion/onStart methods to it?

CodePudding user response:

If you apply transformations to a mutable state flow, the resulting flow becomes read-only because the original flow acts as its source. If you want to manually emit events, you need to emit them to the initial source flow.

That being said, it seems what you want to achieve here is quite simple: bridging a callback based API to a Flow API. There is a built-in function in Kotlin coroutines to do that, which is called callbackFlow.

I'm not sure how your source API handles backpressure, but it would look something like this:

@OptIn(ExperimentalCoroutinesApi::class)
fun Source.asFlow(): Flow<String?> = callbackFlow {
    send(getInitialValue())

    val listener = object : Listener<String> {
        override fun onUpdate(value: String?) {
            if (value != null) {
                trySend(value)
            }
        }
    }
    addListener(listener)
    awaitClose {
        removeListener(listener)
    }
}

Or maybe with runBlocking { send(value) } instead of trySend(), depending on how Source handles backpressure and blocking in its own thread pool.

Note that flowOn might be used on top of this flow, but it would only really matter for getInitialValue(), because the thread that executes the callback is controlled by the Source anyway.

If adding many listeners is expensive for the Source, you might also consider sharing this flow using the shareIn() operator, so multiple subsccribers share the same listener subscription.

  • Related