I have difficulties writing an UDP message receive loop for Android.
In the following code, in receiveLoop
, the call to receiveMessages
never returns and I therefore never enter the message treatment loop.
Note that I am still able to receive packets, but it stops when the channel buffer is full.
I would expect receiveMessages
to return immediately, while the blocking IO loop inside it would still run forever.
class MySocketUDP(private val params: SocketParams) {
private val rcvSocket: DatagramSocket by lazy {
val sock = DatagramSocket(params.rcvPort)
sock.reuseAddress = true
sock.soTimeout = 1000
sock
}
suspend fun receiveMessages(channel: SendChannel<Message>) {
withContext(Dispatchers.IO) {
val buf = ByteArray(MAX_MSG_SIZE)
while (true) {
val pkt = DatagramPacket(buf, buf.size)
try {
if (channel.isClosedForSend) {
break
}
rcvSocket.receive(pkt)
val msg = packetToMessage(buf, 0, pkt.length)
Log.d("SOCKET", "filling channel with $msg")
channel.send(msg)
} catch (ex: SocketTimeoutException) {
} catch (ex: CancellationException) {
break
}
}
}
}
}
class MyModel {
private suspend fun receiveLoop(socket: MySocketUDP) {
withContext(Dispatchers.Main) {
val channel = Channel<Message>(16)
socket.receiveMessages(channel)
Log.d("MODEL", "Entering msg loop")
for (msg in channel) {
dispatchRcvMessage(msg)
}
}
}
}
- Why does
receiveMessages
never return while it is running in theIO
dispatcher and called from theMain
dispatcher? - Do I need to actually spawn a thread to such producer/consumer work?
- Can you show how to achieve such long blocking code nicely in a "coroutine-friendly" manner?
Thank you
CodePudding user response:
receiveMessages()
is a suspend function which calls another suspend functionwithContext()
, which in turn has an infinite loop. So callingsocket.receiveMessages(channel)
will suspend code execution while the loop is not finished.You need to launch separate coroutines for consumer and producer, e.g. using
launch
function.Some example of using coroutines:
val someScope = CoroutineScope(Dispatchers.Main) private suspend fun receiveLoop(socket: MySocketUDP) = someScope.launch { val channel = Channel<Message>(16) socket.receiveMessages(channel) Log.d("MODEL", "Entering msg loop") for (msg in channel) { dispatchRcvMessage(msg) } } // In MySocketUDP suspend fun receiveMessages(channel: SendChannel<Message>) { someAnotherScope.launch { // or can use coroutineScope builder function val buf = ByteArray(MAX_MSG_SIZE) while (true) { val pkt = DatagramPacket(buf, buf.size) try { if (channel.isClosedForSend) { break } rcvSocket.receive(pkt) val msg = packetToMessage(buf, 0, pkt.length) Log.d("SOCKET", "filling channel with $msg") channel.send(msg) } catch (ex: SocketTimeoutException) { } catch (ex: CancellationException) { break } } } }