Home > Enterprise >  How to use callbackFlow within a flow?
How to use callbackFlow within a flow?

Time:12-18

I'm trying to wrap a callbackFlow within an outer flow - there are items I'd like to emit from the outer flow, but I've got an old callback interface, which I'd like to adapt to Kotlin flow. I've looked at several examples of usage of callbackFlow but I can't figure out how to properly trigger it within another flow.

Here's an example:

class Processor {
    fun start(processProgress: ProcessProgressListener) {
        processProgress.onFinished() //finishes as soon as it starts!
    }
}

interface ProcessProgressListener {
    fun onFinished()
}

//main method here:
 fun startProcess(processor: Processor): Flow<String> {
     val mainFlow = flow {
         emit("STARTED")
         emit("IN_PROGRESS")
     }

     return merge(processProgressFlow(processor), mainFlow)
 }

 fun processProgressFlow(processor: Processor) = callbackFlow {
     val listener = object : ProcessProgressListener {
         override fun onFinished() {
             trySend("FINISHED")
         }
     }

     processor.start(listener)
 }

The Processor takes a listener, which is triggered when the process has finished. When that happens, I would like to emit the final item FINISHED.

The way I invoke the whole flow is as follows:

     runBlocking {
         startProcess(Processor()).collect {
             print(it)
         }
     }

But, I get no output whatsoever. If I don't use the megre and only return the mainFlow, however, I do get the STARTED and IN_PROGRESS items though.

What am I doing wrong?

CodePudding user response:

You forgot to call awaitClose in the end of callbackFlow block:

fun processProgressFlow(processor: Processor) = callbackFlow<String> {
    val listener = object : ProcessProgressListener {
        override fun onFinished() {
            trySend("FINISHED")
            channel.close()
        }
    }

    processor.start(listener)

    /*
     * Suspends until 'channel.close() or cancel()' is invoked
     * or flow collector is cancelled (e.g. by 'take(1)' or because a collector's coroutine was cancelled).
     * In both cases, callback will be properly unregistered.
     */
    awaitClose { /* unregister listener here */ }
}

awaitClose {} should be used in the end of callbackFlow block. Otherwise, a callback/listener may leak in case of external cancellation.

According to the callbackFlow docs:

awaitClose should be used to keep the flow running, otherwise the channel will be closed immediately when block completes. awaitClose argument is called either when a flow consumer cancels the flow collection or when a callback-based API invokes SendChannel.close manually and is typically used to cleanup the resources after the completion, e.g. unregister a callback. Using awaitClose is mandatory in order to prevent memory leaks when the flow collection is cancelled, otherwise the callback may keep running even when the flow collector is already completed. To avoid such leaks, this method throws IllegalStateException if block returns, but the channel is not closed yet.

  • Related