How does asyncio.Queue synchronization work?

Time:11-23

I have to build an application where my computer receives information from different serial ports.

My plan is to use one thread per port to read the data and another thread, common to all ports, to parse and save it. Communication between threads is done through an asyncio.Queue, but I have a problem with my implementation.

I have made a simple example with a single read thread and when I tried it I found a problem in my implementation.

import asyncio
import serial
from enum import Enum

EOL = b'\x17\x00'

class IdMessage(Enum):
    ACK = 0xa0
    PLS = 0xa1

async def read(port: serial.Serial, queue: asyncio.Queue):
    print('Reading')
    while True:
        if port.in_waiting > 0:
            data = port.read_until(EOL)
            id, code, *load, end_b1, end_b2 = data
            print("[MSG]", hex(id), hex(code), [*map(chr, load)], sep = ', ')
            opcode = IdMessage(code)
            if opcode is IdMessage.ACK:
                print(f'Device ID: {id}')
            elif opcode is IdMessage.PLS:
                print("Put:", load)
                await queue.put(load)
            else:
                print('Error')

async def save_data(queue: asyncio.Queue):
    print('Saving data.')
    while True:
        data = await queue.get()
        queue.task_done()
        print('Get:', data)
        n, a_msb, a_lsb, *_  = data
        a = (a_msb << 8) | a_lsb
        with open('out.csv', 'a') as fdata:
            print(n, a, sep=',', file=fdata)

async def main():
    queue = asyncio.Queue()
    port = serial.Serial('/dev/ttyACM0', baudrate=115200)
    await asyncio.sleep(2)
    print('Sending Information.')
    port.write(bytearray([0x01]))
    await asyncio.sleep(0.01)
    t1 = asyncio.create_task(read(port, queue))
    t2 = asyncio.create_task(save_data(queue))

    await asyncio.gather(t1, t2)

if __name__ == '__main__':
    asyncio.run(main())

Only read is executed. But after adding an await at the end of the loop:

async def read(port: serial.Serial, queue: asyncio.Queue):
    print('Reading')
    while True:
        if port.in_waiting > 0:
            data = port.read_until(EOL)
            id, code, *load, end_b1, end_b2 = data
            print("[MSG]", hex(id), hex(code), [*map(chr, load)], sep = ', ')
            opcode = IdMessage(code)
            if opcode is IdMessage.ACK:
                print(f'Device ID: {id}')
            elif opcode is IdMessage.PLS:
                print("Put:", load)
                await queue.put(load)
            else:
                print('Error')
        await queue.join() ########## This
        # await asyncio.sleep(0.001) <- This works too

Everything works correctly. Why do I have to add an await for the other thread to work? Aren't they concurrent? Could it be a problem with how the queue is synced?

CodePudding user response:

Your basic idea of using one thread per serial port is a possible approach. However, in your test program, the main thread does nothing but write the data to a file. It does not share execution with a second Task, so there is no need for asyncio. If your real program is indeed that simple, you don't need asyncio at all - just an ordinary multi-threaded program will do the job. But if the program needs to do something else in the main thread, like interact with the user, then a hybrid design with an asyncio main thread and a bunch of secondary threads might be the right solution. Since that was your question, I will describe how that can be done.

Since the function you call to read data from the port (data = port.read_until(EOL)) is a blocking I/O call, it cannot effectively share a thread with asyncio. The test program you wrote doesn't work because read_until blocks the main thread until the message is complete, and when no data is waiting the while True: loop in read spins without ever reaching an await. Either way the asyncio event loop is prevented from running, which blocks the activity in the other asyncio Tasks. Note that await queue.put(load) does not help: on an unbounded asyncio.Queue, put() never has to wait, so it returns without handing control to the event loop. That is why adding await queue.join() or await asyncio.sleep(0.001) makes your example work: each is a point where read can actually suspend, so save_data gets a turn.
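A small hardware-free experiment makes the starvation visible. It is only a sketch: producer, consumer, and run_demo are made-up names, and the integers stand in for serial frames. On an unbounded asyncio.Queue, await queue.put(...) completes without suspending, so the consumer only gets a turn once the producer reaches some other await that really yields.

```python
import asyncio


async def producer(queue, log, yield_control):
    for i in range(3):
        await queue.put(i)  # unbounded queue: completes without suspending
        log.append(f"put {i}")
        if yield_control:
            await asyncio.sleep(0)  # explicitly hand control to the event loop


async def consumer(queue, log):
    while True:
        item = await queue.get()
        log.append(f"get {item}")
        queue.task_done()


async def run_demo(yield_control):
    queue = asyncio.Queue()
    log = []
    consumer_task = asyncio.create_task(consumer(queue, log))
    await producer(queue, log, yield_control)
    await queue.join()  # wait until every item has been marked done
    consumer_task.cancel()
    return log


if __name__ == "__main__":
    print(asyncio.run(run_demo(yield_control=False)))
    print(asyncio.run(run_demo(yield_control=True)))
```

With yield_control=False, all three puts should be logged before the first get; with yield_control=True, the puts and gets interleave.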

You cannot use a single-threaded version of the program to develop and understand this. You've got to tackle the multi-threading problem right up front. A simple rule of thumb: don't mix blocking I/O calls with asyncio in the same thread. But asyncio has methods to handle the multithreading issues.
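One of those methods is loop.run_in_executor, which runs a blocking function in a worker thread and lets a coroutine await the result. The sketch below is an alternative to the hand-written thread described in the steps that follow, not part of them. blocking_read is a hypothetical stand-in for port.read_until(EOL), and None is an assumed end-of-stream marker that the original program does not have.

```python
import asyncio
import time


def blocking_read(n):
    """Hypothetical stand-in for port.read_until(EOL): blocks, then returns a load."""
    time.sleep(0.01)
    return [n, 0x01, 0x02]


async def read(queue, count):
    loop = asyncio.get_running_loop()
    for n in range(count):
        # The blocking call runs in the default thread pool, so the event
        # loop is free to run other tasks while this coroutine is suspended.
        load = await loop.run_in_executor(None, blocking_read, n)
        await queue.put(load)
    await queue.put(None)  # assumed end-of-stream marker


async def save_data(queue):
    rows = []
    while True:
        load = await queue.get()
        if load is None:
            return rows
        n, a_msb, a_lsb = load
        rows.append((n, (a_msb << 8) | a_lsb))


async def main(count=3):
    queue = asyncio.Queue()
    _, rows = await asyncio.gather(read(queue, count), save_data(queue))
    return rows


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Here read stays a coroutine, so gather can still be used; the price is one thread-pool hop per message.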

Step 1. Convert read to a thread, not a Task. Pass the event loop to it as well as the queue. Use a threadsafe way of putting an item into the queue (see comment). Note that the function queue.put_nowait now runs in the main thread.

def read(port: serial.Serial, queue: asyncio.Queue, loop):
    print('Reading')
    while True:
        if port.in_waiting > 0:
            data = port.read_until(EOL)
            id, code, *load, end_b1, end_b2 = data
            print("[MSG]", hex(id), hex(code), [*map(chr, load)], sep = ', ')
            opcode = IdMessage(code)
            if opcode is IdMessage.ACK:
                print(f'Device ID: {id}')
            elif opcode is IdMessage.PLS:
                print("Put:", load)
                loop.call_soon_threadsafe(queue.put_nowait, load)  # Change this line
            else:
                print('Error')

Step 2. Now modify main appropriately. See the inline comments.

import threading  # add alongside the other imports

async def main():
    queue = asyncio.Queue()
    port = serial.Serial('/dev/ttyACM0', baudrate=115200)
    await asyncio.sleep(2)  # not necessary in my experience
    print('Sending Information.')
    port.write(bytearray([0x01]))  # blocking, but fast.  So OK
    await asyncio.sleep(0.01)
    # CHANGE HERE - Launch the read thread, handing it the running event loop
    loop = asyncio.get_running_loop()
    # daemon=True so the endless read loop does not keep the process alive on exit
    t1 = threading.Thread(target=read, args=(port, queue, loop), daemon=True)
    t1.start()
    await save_data(queue)  # CHANGE HERE - no need for gather

I can't test the script so please let me know if I've made a mistake.

I don't see a need to change save_data.
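Since the steps above need a serial device to run, here is a hardware-free sketch of the same hand-off that can be tried anywhere. The names reader and consume are made up for the illustration, time.sleep stands in for the blocking read, and the None sentinel is an assumption added only so the demo can terminate.

```python
import asyncio
import threading
import time


def reader(queue, loop, n_items):
    """Runs in a worker thread; time.sleep stands in for a blocking serial read."""
    for i in range(n_items):
        time.sleep(0.01)
        # asyncio.Queue is not thread-safe, so ask the event loop to run
        # put_nowait in its own thread instead of calling it from here.
        loop.call_soon_threadsafe(queue.put_nowait, i)
    loop.call_soon_threadsafe(queue.put_nowait, None)  # assumed sentinel


async def consume(queue):
    received = []
    while True:
        item = await queue.get()
        if item is None:
            return received
        received.append(item)


async def main(n_items=5):
    queue = asyncio.Queue()
    loop = asyncio.get_running_loop()
    thread = threading.Thread(target=reader, args=(queue, loop, n_items), daemon=True)
    thread.start()
    received = await consume(queue)
    thread.join()  # the thread has already finished its loop at this point
    return received


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With several ports, one such thread per port could feed the same queue, since every put is executed by the event loop thread.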

CodePudding user response:

asyncio.Queue is for communication between coroutines that run in the same thread.

To communicate between different threads, as in your use case, use queue.Queue from the standard library's queue module.
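A minimal all-threads sketch of that suggestion, assuming the parser also runs in its own thread. The names reader, saver, and run are hypothetical, the frames list replaces the serial port, and None is an assumed end-of-stream marker.

```python
import queue
import threading


def reader(q, frames):
    """Stand-in for a per-port read thread; frames replaces the serial port."""
    for load in frames:
        q.put(load)  # queue.Queue is thread-safe
    q.put(None)  # assumed end-of-stream marker


def saver(q, rows):
    while True:
        load = q.get()  # blocks this thread only
        if load is None:
            break
        n, a_msb, a_lsb = load
        rows.append((n, (a_msb << 8) | a_lsb))


def run(frames):
    q = queue.Queue()
    rows = []
    threads = [
        threading.Thread(target=reader, args=(q, frames)),
        threading.Thread(target=saver, args=(q, rows)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return rows


if __name__ == "__main__":
    print(run([[1, 0x01, 0x02], [2, 0x00, 0xFF]]))
```

If the consumer stays a coroutine instead, calling q.get() directly would block the event loop; awaiting loop.run_in_executor(None, q.get) is one way around that.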
