Suspend IO function never returns

Time:01-02

I am having difficulty writing a UDP message receive loop for Android.

In the following code, the call to receiveMessages inside receiveLoop never returns, so I never enter the message-processing loop. Packets are still received at first, but reception stops once the channel buffer is full.

I would expect receiveMessages to return immediately, while the blocking IO loop inside it would still run forever.

class MySocketUDP(private val params: SocketParams) {


    private val rcvSocket: DatagramSocket by lazy {
        val sock = DatagramSocket(params.rcvPort)
        sock.reuseAddress = true
        sock.soTimeout = 1000
        sock
    }

    suspend fun receiveMessages(channel: SendChannel<Message>) {
        withContext(Dispatchers.IO) {
            val buf = ByteArray(MAX_MSG_SIZE)
            while (true) {
                val pkt = DatagramPacket(buf, buf.size)
                try {
                    if (channel.isClosedForSend) {
                        break
                    }
                    rcvSocket.receive(pkt)
                    val msg = packetToMessage(buf, 0, pkt.length)
                    Log.d("SOCKET", "filling channel with $msg")
                    channel.send(msg)
                } catch (ex: SocketTimeoutException) {
                } catch (ex: CancellationException) {
                    break
                }
            }
        }
    }
}

class MyModel {

    private suspend fun receiveLoop(socket: MySocketUDP) {
        withContext(Dispatchers.Main) {
            val channel = Channel<Message>(16)
            socket.receiveMessages(channel)
            Log.d("MODEL", "Entering msg loop")
            for (msg in channel) {
                dispatchRcvMessage(msg)
            }
        }
    }

}

  1. Why does receiveMessages never return, even though it runs on the IO dispatcher and is called from the Main dispatcher?
  2. Do I actually need to spawn a thread to do such producer/consumer work?
  3. Can you show how to write such long-running blocking code nicely in a "coroutine-friendly" manner?

Thank you

CodePudding user response:

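  1. receiveMessages() is a suspend function that calls another suspend function, withContext(), whose block contains an infinite loop. withContext() only changes the dispatcher the block runs on; it still suspends the caller until the block completes. So socket.receiveMessages(channel) suspends receiveLoop until the loop finishes, which never happens.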

  2. You do not need to spawn a thread yourself. Launch separate coroutines for the consumer and the producer, e.g. with the launch function.

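  3. An example using coroutines (someScope and someAnotherScope are placeholder names):

     val someScope = CoroutineScope(Dispatchers.Main)

     // launch returns a Job immediately, so receiveLoop does not need to be a suspend function.
     private fun receiveLoop(socket: MySocketUDP) = someScope.launch {
         val channel = Channel<Message>(16)
         socket.receiveMessages(channel) // returns at once; the producer runs in its own coroutine
         Log.d("MODEL", "Entering msg loop")
         for (msg in channel) {
             dispatchRcvMessage(msg)
         }
     }

     // In MySocketUDP
     // Use Dispatchers.IO here because rcvSocket.receive() blocks the calling thread.
     private val someAnotherScope = CoroutineScope(Dispatchers.IO)

     fun receiveMessages(channel: SendChannel<Message>) {
         // Use launch, not the coroutineScope builder: coroutineScope would wait for the loop to finish.
         someAnotherScope.launch {
             val buf = ByteArray(MAX_MSG_SIZE)
             while (isActive) { // re-checked after every soTimeout, so cancellation is noticed
                 val pkt = DatagramPacket(buf, buf.size)
                 try {
                     if (channel.isClosedForSend) {
                         break
                     }
                     rcvSocket.receive(pkt)
                     val msg = packetToMessage(buf, 0, pkt.length)
                     Log.d("SOCKET", "filling channel with $msg")
                     channel.send(msg)
                 } catch (ex: SocketTimeoutException) {
                     // No packet arrived within soTimeout; loop around and check again.
                 } catch (ex: CancellationException) {
                     break
                 }
             }
         }
     }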
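
To see the difference between launch and withContext in isolation, here is a minimal sketch that runs on a plain JVM with kotlinx.coroutines on the classpath. It is not the Android code from the question: produceNumbers and runPipeline are hypothetical names, and the UDP socket is replaced by a producer that sends three integers. The channel capacity is deliberately smaller than the number of messages, as in the question's "buffer is full" situation.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the UDP receive loop: sends three numbers, then closes the channel.
suspend fun produceNumbers(channel: SendChannel<Int>) {
    for (i in 1..3) channel.send(i)
    channel.close()
}

// Runs producer and consumer as separate coroutines and returns what the consumer received.
fun runPipeline(): List<Int> = runBlocking {
    val channel = Channel<Int>(capacity = 1) // smaller than the number of messages
    val received = mutableListOf<Int>()

    // launch returns immediately; the producer runs concurrently on the IO dispatcher.
    launch(Dispatchers.IO) { produceNumbers(channel) }

    // The consumer loop starts right away and ends when the channel is closed.
    for (n in channel) received.add(n)
    received
}

fun main() {
    println(runPipeline()) // prints [1, 2, 3]
}
```

If the launch(Dispatchers.IO) { ... } line were replaced by withContext(Dispatchers.IO) { produceNumbers(channel) }, the caller would be suspended until the producer finished. The producer would in turn suspend on its second send because the buffer is full and nobody is reading yet, so the consumer loop would never start. That is the same situation as in the question.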
    