Home > Net >  Why is my channel closed whenever I send the data
Why is my channel closed whenever I send the data

Time:10-15

I have my code below

interface Listener {
    fun onGetData(data: Int)
    fun onClose()
}

class MyEmitter {
    var listener: Listener? = null
    fun sendData(data: Int) = listener?.onGetData(data)
    fun close() = listener?.onClose()
}

fun handleInput(myEmitter: MyEmitter) = channelFlow {
    myEmitter.listener = object:Listener {
        override fun onGetData(data: Int) { trySend(data) }
        override fun onClose() { close() }
    }
}

fun main(): Unit = runBlocking {
    val myEmitter = MyEmitter()
    
    handleInput(myEmitter).collect {
        println(it)
    }

    myEmitter.sendData(1)
    myEmitter.sendData(2)
    myEmitter.close()
}

Whenever I send the data e.g. myEmitter.sendData(1), it does get into trySend(data), but the result is closed.

Why is it closed? How can I keep it open?

CodePudding user response:

I think it's not documented terribly clearly, but just like the flow builder, the channelFlow's Flow is considered complete once the suspend lambda returns. Since all you are doing is setting a listener and not waiting around, it will return almost immediately. When a channel Flow is completed, it's channel is also closed.

If you want your channelFlow to stay open until the Flow is canceled, call awaitClose() at the end. This function suspends until the channel is closed, so it will hold your Flow open until it's canceled or the event in your listener closes the Channel.

fun handleInput(myEmitter: MyEmitter) = channelFlow {
    myEmitter.listener = object:Listener {
        override fun onGetData(data: Int) { trySend(data) }
        override fun onClose() { close() }
    }
    awaitClose()
}

If you are familiar with callbackFlow, it is a specialized version of channelFlow and it enforces the awaitClose() call because it is meant for waiting for a listener, so there's no reason you would ever not want to await. It's also where you can deregister any listener you created inside the flow builder.

CodePudding user response:

To get this working, I did 3 things

  1. Add awaitClose to ensure the flow is not terminated
  2. Move the entire flow behind launch, so that it is not blocking the main() function flow
  3. Add a little delay before myEmitter.sendData(1), so that to ensure the launch get triggered first before doing the external sendData.

Full changed code as below

interface Listener {
    fun onGetData(data: Int)
    fun onClose()
}

class MyEmitter {
    var listener: Listener? = null
    fun sendData(data: Int) = listener?.onGetData(data)
    fun close() = listener?.onClose()
}

fun handleInput(myEmitter: MyEmitter) = channelFlow {
    myEmitter.listener = object:Listener {
        override fun onGetData(data: Int) { trySend(data) }
        override fun onClose() { close() }
    }
    awaitClose { myEmitter.listener = null } // Need awaitClose to keep the flow alive
}

fun main(): Unit = runBlocking {
    val myEmitter = MyEmitter()

    launch { // Need to run to avoid it from blocking the main() function flow due to having `awaitClose` there
        handleInput(myEmitter).collect {
            println(it)
        }
    }

    delay(100) // Add some delay to get this triggered after the launch run
    myEmitter.sendData(1)
    myEmitter.sendData(2)
    myEmitter.close()
}

The 3rd step is a little hack I think.

  • Related