Home > Software design >  Synchronize asyncio Queue
Synchronize asyncio Queue

Time:10-04

I am planning to have an asyncio Queue based producer-consumer implementation for a processing of realtime data where sending out data in correct time order is vital. So here is the code snippet of it :

async def produce(Q, n_jobs):
    for i in range(n_jobs):
        
        print(f"Producing :{i}")
        await Q.put(i)


async def consume(Q):
    while True:
        n = await Q.get()
        
        print(f"Consumed :{n}")
       
       x = do_sometask_and_return_the_result(n)
       print(f"Finished :{n} and Result: {x}")


async def main(loop):
    Q = asyncio.Queue(loop=loop, maxsize=3)
    await asyncio.wait([produce(Q, 10), consume(Q), consume(Q), consume(Q)])
    print("Done")

Here the producer produces data and puts it into the asyncio Queue. I have multiple consumers to consume and process the data. While seeing the outputs, the order is maintained while printing "Consumed :{n}" (as in 1,2,3,4... and so on) , this is completely fine. but, since the function do_sometask_and_return_the_result(n) takes variable time to return the result, the order is not maintained in the next print of n "Finished :{n}" (as in 2,1,4,3,5,...).

Is there any way to synchronize this data as I need to maintain the order of printing the results? I want to see 1,2,3,4,.. sequential prints for 'n' even after do_sometask_and_return_the_result(n).

CodePudding user response:

You could use a priority queue system (using the python heapq library) to reorder your jobs after they are complete. Something like this.

# add these variables at class/global scope
priority_queue = []
current_job_id = 1
job_id_dict = {}

async def produce(Q, n_jobs):
    # same as above

async def consume(Q):
    while True:
        n = await Q.get()
        
        print(f"Consumed :{n}")
       
       x = do_sometask_and_return_the_result(n)
       await process_result(n, x)


async def process_result(n, x):
    heappush(priority_queue, n)
    job_id_dict[n] = x
    while current_job_id == priority_queue[0]:
        job_id = heappop(priority_queue)
        print(f"Finished :{job_id} and Result: {job_id_dict[job_id]}")
        current_job_id  = 1
     


async def main(loop):
    Q = asyncio.Queue(loop=loop, maxsize=3)
    await asyncio.wait([produce(Q, 10), consume(Q), consume(Q), consume(Q)])
    print("Done")

For more information on the heapq module: https://docs.python.org/3/library/heapq.html

  • Related