I'm trying to wrap a callbackFlow
within an outer flow
- there are items I'd like to emit from the outer flow, but I've got an old callback interface, which I'd like to adapt to Kotlin flow. I've looked at several examples of usage of callbackFlow
but I can't figure out how to properly trigger it within another flow.
Here's an example:
class Processor {
fun start(processProgress: ProcessProgressListener) {
processProgress.onFinished() //finishes as soon as it starts!
}
}
interface ProcessProgressListener {
fun onFinished()
}
//main method here:
fun startProcess(processor: Processor): Flow<String> {
val mainFlow = flow {
emit("STARTED")
emit("IN_PROGRESS")
}
return merge(processProgressFlow(processor), mainFlow)
}
fun processProgressFlow(processor: Processor) = callbackFlow {
val listener = object : ProcessProgressListener {
override fun onFinished() {
trySend("FINISHED")
}
}
processor.start(listener)
}
The Processor
takes a listener, which is triggered when the process has finished. When that happens, I would like to emit the final item FINISHED
.
The way I invoke the whole flow is as follows:
runBlocking {
startProcess(Processor()).collect {
print(it)
}
}
But, I get no output whatsoever. If I don't use the megre
and only return the mainFlow
, however, I do get the STARTED
and IN_PROGRESS
items though.
What am I doing wrong?
CodePudding user response:
You forgot to call awaitClose
in the end of callbackFlow
block:
fun processProgressFlow(processor: Processor) = callbackFlow<String> {
val listener = object : ProcessProgressListener {
override fun onFinished() {
trySend("FINISHED")
channel.close()
}
}
processor.start(listener)
/*
* Suspends until 'channel.close() or cancel()' is invoked
* or flow collector is cancelled (e.g. by 'take(1)' or because a collector's coroutine was cancelled).
* In both cases, callback will be properly unregistered.
*/
awaitClose { /* unregister listener here */ }
}
awaitClose {}
should be used in the end of callbackFlow
block.
Otherwise, a callback/listener may leak in case of external cancellation.
According to the callbackFlow
docs:
awaitClose
should be used to keep the flow running, otherwise the channel will be closed immediately when block completes.awaitClose
argument is called either when a flow consumer cancels the flow collection or when a callback-based API invokesSendChannel.close
manually and is typically used to cleanup the resources after the completion, e.g. unregister a callback. UsingawaitClose
is mandatory in order to prevent memory leaks when the flow collection is cancelled, otherwise the callback may keep running even when the flow collector is already completed. To avoid such leaks, this method throwsIllegalStateException
if block returns, but the channel is not closed yet.