How to run my code asynchronously when using asyncio queue.put() and queue.get() as part of task are

Time:01-28

I am writing code to simulate getting data from a website, which takes 3-5 seconds per request, using multiple Selenium drivers. I intend for it to take a batch of data and pass it through the retrieve() function asynchronously to speed up the process. I am trying to use an asyncio queue to achieve this, with queue.put() and queue.get(). The result is incorrect: the items with current_value 11 to 15 somehow disappear, as if the while loop never ran. Is there anything missing in my code, or any advice on how to improve it?

import asyncio
import random
import time


async def retrieve(queue, lock_browser1,lock_browser2,lock_browser3,lock_browser4,lock_browser5): 

    while True:

        item = await queue.get()


        if lock_browser1.locked() is False:
            async with lock_browser1:   
                await asyncio.sleep(random.uniform(3, 5)) #simulate only one browser connect to the website at a time
                print(item)
            break
        else:
            await asyncio.sleep(0)

        if lock_browser2.locked() is False:
            async with lock_browser2:   
                await asyncio.sleep(random.uniform(3, 5)) #simulate only one browser connect to the website at a time
                print(item)
            break
        else:
            await asyncio.sleep(0)
        
        if lock_browser3.locked() is False:
            async with lock_browser3:   
                await asyncio.sleep(random.uniform(3, 5)) #simulate only one browser connect to the website at a time
                print(item)
            break
        else:
            await asyncio.sleep(0)
        
        if lock_browser4.locked() is False:
            async with lock_browser4:   
                await asyncio.sleep(random.uniform(3, 5)) #simulate only one browser connect to the website at a time
                print(item)
            break
        else:
            await asyncio.sleep(0)

        if lock_browser5.locked() is False:
            async with lock_browser5:   
                await asyncio.sleep(random.uniform(3, 5)) #simulate only one browser connect to the website at a time
                print(item)
            break
        else:
            await asyncio.sleep(0)        
        
        if item is None:
            break


async def process(queue,x,y):
    start_value  = 10
    current_value = 0
    stop_value = 15    
    current_value = start_value
    while current_value <= stop_value:
        await queue.put([x,y, current_value])          
        current_value = current_value + 1
        await asyncio.sleep(0)

    await queue.put(None)


    
    
async def main(x,y):
    
    lock_browser1 = asyncio.Lock()
    lock_browser2 = asyncio.Lock()
    lock_browser3 = asyncio.Lock()
    lock_browser4 = asyncio.Lock()
    lock_browser5 = asyncio.Lock()    
    
    queue = asyncio.Queue()    
    await asyncio.gather(process(queue, x, y),retrieve(queue,lock_browser1,lock_browser2,lock_browser3,lock_browser4,lock_browser5))
  

if __name__ == "__main__":
    start = time.perf_counter()
    for x in range(2):   
        for y in range(5):
                asyncio.run(main(x,y))
    end = time.perf_counter()
    duration=end-start
    print(f'Time elapsed:{duration:.2f}s')

INCORRECT RESULT:

[0, 0, 10]
[0, 1, 10]
[0, 2, 10]
[0, 3, 10]
[0, 4, 10]
[1, 0, 10]
[1, 1, 10]
[1, 2, 10]
[1, 3, 10]
[1, 4, 10]

INTENDED RESULT:

[0, 0, 10]
[0, 0, 11]
[0, 0, 12]
[0, 0, 13]
[0, 0, 14]
[0, 0, 15]
[0, 1, 10]
[0, 1, 11]
[0, 1, 12]
   ...
[1, 4, 13]
[1, 4, 14]
[1, 4, 15]

Answer 1:

TL;DR: remove the break from the if-statements and find another way to stop more than one of them from running for the same item. The fix is at the end.

The code does exactly what you tell it to do. Let's break it down into smaller pieces.

Part 1 - if __name__ ...

if __name__ == "__main__":
    start = time.perf_counter()
    for x in range(2):
        for y in range(5):
            asyncio.run(main(x, y))
    end = time.perf_counter()
    duration = end - start
    print(f'Time elapsed:{duration:.2f}s')

We have nested loops: the outer one (x) goes from 0 to 1 and the inner one (y) goes from 0 to 4, and each iteration calls asyncio.run(main(x, y)). Keep in mind that calling asyncio.run repeatedly does not make the calls run concurrently. It first runs main(0, 0) and blocks everything else until it finishes, and only then moves on to the next call (i.e. main(0, 1)).
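To make the difference concrete, here is a small timing sketch. The names fake_fetch, run_sequentially and run_concurrently are hypothetical, and asyncio.sleep stands in for a Selenium request. Calling asyncio.run once per job adds the delays together, while a single asyncio.run over asyncio.gather lets them overlap:

```python
import asyncio
import time


async def fake_fetch(delay: float) -> float:
    # Stand-in for one slow request; the delay value is arbitrary.
    await asyncio.sleep(delay)
    return delay


def run_sequentially(n: int, delay: float) -> float:
    """Call asyncio.run() once per job, like the original __main__ loop."""
    start = time.perf_counter()
    for _ in range(n):
        asyncio.run(fake_fetch(delay))  # blocks until this job has finished
    return time.perf_counter() - start


async def _all_at_once(n: int, delay: float) -> None:
    await asyncio.gather(*(fake_fetch(delay) for _ in range(n)))


def run_concurrently(n: int, delay: float) -> float:
    """Call asyncio.run() once and let gather() overlap the jobs."""
    start = time.perf_counter()
    asyncio.run(_all_at_once(n, delay))
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"sequential: {run_sequentially(5, 0.2):.2f}s")  # roughly 5 * 0.2 s
    print(f"concurrent: {run_concurrently(5, 0.2):.2f}s")  # roughly 0.2 s
```

With five jobs of 0.2 s each, the first version should take about 1 s and the second about 0.2 s; exact timings vary by machine.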

Part 2 - The main function:

So we call main(0, 0), which brings us to the main function:

async def main(x, y):

    lock_browser1 = asyncio.Lock()
    lock_browser2 = asyncio.Lock()
    lock_browser3 = asyncio.Lock()
    lock_browser4 = asyncio.Lock()
    lock_browser5 = asyncio.Lock()

    queue = asyncio.Queue()
    await asyncio.gather(process(queue, x, y),
                         retrieve(queue, lock_browser1, lock_browser2, lock_browser3, lock_browser4, lock_browser5))

Setting the locks aside for now: we create a new queue, queue, and call asyncio.gather(), which, unlike the sequential asyncio.run calls above, runs the coroutines passed to it concurrently. Leaving retrieve aside for the moment as well, note that process is still called for one (x, y) pair at a time, because main itself is called one at a time.

Part 3 - The process function

async def process(queue, x, y):
    start_value = 10
    stop_value = 15
    current_value = start_value
    while current_value <= stop_value:
        await queue.put([x, y, current_value])
        current_value = current_value + 1
    await queue.put(None)

Here we iterate through the values 10 to 15 with the given x and y, put each list into the queue, and finish by putting a None sentinel. That is all this function does.
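A quick way to see this is to run the producer on its own and then inspect the queue. This is a sketch, and fill_queue is a hypothetical helper. `await queue.put()` on an unbounded asyncio.Queue never has to wait, so process can enqueue all six items plus the None sentinel even when nothing is consuming them:

```python
import asyncio


async def process(queue: asyncio.Queue, x: int, y: int) -> None:
    # Same producer as above: values 10..15, then a None sentinel.
    current_value = 10
    while current_value <= 15:
        await queue.put([x, y, current_value])
        current_value += 1
    await queue.put(None)


async def fill_queue() -> list:
    queue: asyncio.Queue = asyncio.Queue()  # maxsize=0 means unbounded
    await process(queue, 0, 0)
    # Nothing has consumed anything yet, so every item is still in the queue.
    items = []
    while not queue.empty():
        items.append(queue.get_nowait())
    return items


if __name__ == "__main__":
    for item in asyncio.run(fill_queue()):
        print(item)
```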

Part 4 - The retrieve function

async def retrieve(queue, lock_browser1, lock_browser2, lock_browser3, lock_browser4, lock_browser5):
    while True:

        item = await queue.get()
        if lock_browser1.locked() is False:

            async with lock_browser1:
                await asyncio.sleep(random.uniform(0.2, 0.3))  # simulate only one browser connect to the website at a time
                print(item)
            break
        else:
            await asyncio.sleep(0)

I haven't included the full function because it is long, and most of it never runs. lock_browser1.locked() is always False at that check: retrieve is the only coroutine that acquires lock_browser1, and it releases the lock before the next check. So we always enter the first if-statement and then break out of the while True loop, which makes the other if/else blocks unreachable (note that).
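The effect of that break can be reproduced with a reduced, single-lock version of retrieve. In this sketch, retrieve_with_break and demo are hypothetical names, and the sleep is shortened from 3-5 s to 0.01 s. It consumes exactly one item and leaves the rest in the queue:

```python
import asyncio


async def retrieve_with_break(queue: asyncio.Queue, lock: asyncio.Lock) -> list:
    """Reduced version of the question's retrieve(): one lock, same break."""
    seen = []
    while True:
        item = await queue.get()
        if not lock.locked():  # always true here: nothing else holds the lock
            async with lock:
                await asyncio.sleep(0.01)  # shortened stand-in for the 3-5 s request
                seen.append(item)
            break  # leaves the while loop after the very first item
    return seen


async def demo() -> tuple:
    queue: asyncio.Queue = asyncio.Queue()
    for value in range(10, 16):
        queue.put_nowait([0, 0, value])
    queue.put_nowait(None)
    seen = await retrieve_with_break(queue, asyncio.Lock())
    return seen, queue.qsize()


if __name__ == "__main__":
    seen, left_over = asyncio.run(demo())
    print(seen)       # [[0, 0, 10]]
    print(left_over)  # 6 (five data items plus the None sentinel)
```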


Summary:

We "pinch" together two functions, process and retrieve, and run them concurrently. process executes a fast while loop that increments the last number of the list, producing [0, 0, 10], [0, 0, 11], and so on, while retrieve takes the first item from the queue, prints it, and breaks out of its while loop. Then the whole process starts over with a fresh, empty queue. So the fix is to remove the break from the if/else statements while making sure that only one of them runs for each item (removing the break alone would print every item once per free lock).
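A minimal sketch of the corrected loop, assuming a single lock and a shortened sleep (retrieve_until_sentinel, producer and demo are hypothetical names): keep reading items until the None sentinel arrives instead of breaking after the first one.

```python
import asyncio


async def retrieve_until_sentinel(queue: asyncio.Queue, lock: asyncio.Lock) -> list:
    """Keep consuming until the None sentinel instead of breaking after one item."""
    seen = []
    while True:
        item = await queue.get()
        if item is None:  # producer signals "no more work"
            break
        async with lock:
            await asyncio.sleep(0.01)  # shortened stand-in for the 3-5 s request
            seen.append(item)
    return seen


async def producer(queue: asyncio.Queue, x: int, y: int) -> None:
    for value in range(10, 16):
        await queue.put([x, y, value])
        await asyncio.sleep(0)  # give the consumer a chance to run
    await queue.put(None)


async def demo(x: int, y: int) -> list:
    queue: asyncio.Queue = asyncio.Queue()
    # gather() returns results in argument order: (producer's None, consumer's list).
    _, seen = await asyncio.gather(
        producer(queue, x, y), retrieve_until_sentinel(queue, asyncio.Lock())
    )
    return seen


if __name__ == "__main__":
    for item in asyncio.run(demo(0, 0)):
        print(item)
```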

I know this was a long dig for a small change, which is why I added a TL;DR, but I wanted to point out some things I found important in case you missed them in your code.

Fix

I would add a flag that becomes True once we have entered one of the if-statements and stays False otherwise. I've also moved the check for item is None to the top of the loop, so that None is never printed:

async def retrieve(queue, lock_browser1, lock_browser2, lock_browser3, lock_browser4, lock_browser5):
    while True:
        has_entered_if = False
        item = await queue.get()

        if item is None:
            break

        if lock_browser1.locked() is False and not has_entered_if:
            has_entered_if = True
            async with lock_browser1:
                await asyncio.sleep(random.uniform(3, 5))  # simulate only one browser connect to the website at a time
                print(item)

        else:
            await asyncio.sleep(0)

        if lock_browser2.locked() is False and not has_entered_if:
            has_entered_if = True
            async with lock_browser2:
                await asyncio.sleep(random.uniform(3, 5))  # simulate only one browser connect to the website at a time
                print(item)
        else:
            await asyncio.sleep(0)

        if lock_browser3.locked() is False and not has_entered_if:
            has_entered_if = True
            async with lock_browser3:
                await asyncio.sleep(random.uniform(3, 5))  # simulate only one browser connect to the website at a time
                print(item)
        else:
            await asyncio.sleep(0)

        if lock_browser4.locked() is False and not has_entered_if:
            has_entered_if = True
            async with lock_browser4:
                await asyncio.sleep(random.uniform(3, 5))  # simulate only one browser connect to the website at a time
                print(item)
        else:
            await asyncio.sleep(0)

        if lock_browser5.locked() is False and not has_entered_if:
            has_entered_if = True
            async with lock_browser5:
                await asyncio.sleep(random.uniform(3, 5))  # simulate only one browser connect to the website at a time
                print(item)
        else:
            await asyncio.sleep(0)

Output:

[0, 0, 10]
[0, 0, 11]
[0, 0, 12]
[0, 0, 13]
[0, 0, 14]
[0, 0, 15]
[0, 1, 10]
[0, 1, 11]
[0, 1, 12]
...
[1, 4, 10]
[1, 4, 11]
[1, 4, 12]
[1, 4, 13]
[1, 4, 14]
[1, 4, 15]
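Since at most one branch should run per item, the has_entered_if flag can also be expressed as a for loop over the locks with a break, where the break now exits only the for loop. This is a sketch of an equivalent rewrite, not the answer's code. It takes the locks as *locks, so the original call retrieve(queue, lock_browser1, lock_browser2, lock_browser3, lock_browser4, lock_browser5) still works. It also shortens the sleep and returns the processed items instead of printing them so the result can be checked. demo is a hypothetical helper.

```python
import asyncio
import random


async def retrieve(queue: asyncio.Queue, *locks: asyncio.Lock) -> list:
    """Use the first free browser lock for each item, like the has_entered_if version."""
    seen = []
    while True:
        item = await queue.get()
        if item is None:  # sentinel from the producer
            break
        for lock in locks:
            if not lock.locked():
                async with lock:
                    # Shortened stand-in for the 3-5 s request.
                    await asyncio.sleep(random.uniform(0.01, 0.02))
                    seen.append(item)
                break  # leaves the for loop only; the while loop keeps going
        else:
            # Reached only if every lock was busy. As in the flag version, the
            # item is skipped; with a single retrieve coroutine this never happens.
            await asyncio.sleep(0)
    return seen


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    for value in range(10, 16):
        queue.put_nowait([0, 0, value])
    queue.put_nowait(None)
    locks = [asyncio.Lock() for _ in range(5)]
    return await retrieve(queue, *locks)


if __name__ == "__main__":
    for item in asyncio.run(demo()):
        print(item)
```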

Answer 2:

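The retrieve function (both in the question and in the fix above) only ever uses one browser (Selenium driver) at a time. Because it is the only coroutine touching the locks, lock_browser1 is always free, and every item is handled through it in turn.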

A more reasonable implementation, which simulates one request per browser at a time while all five browsers work concurrently:

async def retrieve(queue, lock_browser1, lock_browser2, lock_browser3, lock_browser4, lock_browser5):
    async def _retrieve(lock_browser):
        while True:
            async with lock_browser:
                await asyncio.sleep(random.uniform(3, 5))  # simulate only one request per browser at a time
                item = await queue.get()
                if item is None:
                    await queue.put(None)  # allow others to exit
                    break
                print(item)

    await asyncio.gather(
        _retrieve(lock_browser1),
        _retrieve(lock_browser2),
        _retrieve(lock_browser3),
        _retrieve(lock_browser4),
        _retrieve(lock_browser5),
    )
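The `await queue.put(None)` line is what lets all five _retrieve coroutines stop with a single sentinel: each one that sees None puts it back for the next one before exiting. Here is a stripped-down sketch of just that pattern. worker and demo are hypothetical names, the locks are omitted, and the sleep is shortened.

```python
import asyncio


async def worker(name: int, queue: asyncio.Queue, done: list) -> None:
    while True:
        item = await queue.get()
        if item is None:
            await queue.put(None)  # pass the sentinel on so the next worker stops too
            break
        await asyncio.sleep(0.01)  # shortened stand-in for one browser request
        done.append((name, item))


async def demo(n_workers: int, n_items: int) -> tuple:
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(n_items):
        queue.put_nowait(i)
    queue.put_nowait(None)  # one sentinel shared by all workers
    done: list = []
    await asyncio.gather(*(worker(w, queue, done) for w in range(n_workers)))
    return done, queue.qsize()


if __name__ == "__main__":
    done, left_over = asyncio.run(demo(5, 20))
    print(len(done), left_over)  # 20 1
```

Without the re-put, only one worker would receive None and the other four would wait on queue.get() forever, so gather would never return. After all workers exit, exactly one None is left in the queue.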

The process function and a suitable process_main function:

async def process(queue, x, y):
    start_value = 10
    stop_value = 15
    current_value = start_value
    while current_value <= stop_value:
        await queue.put([x, y, current_value])
        current_value = current_value + 1
        await asyncio.sleep(0)


async def process_main(queue):
    for x in range(2):
        for y in range(5):
            await process(queue, x, y)
    await queue.put(None)

The main function should own the lock_browser instances, and the if __name__ == "__main__": block should call main only once:

async def main():
    lock_browser1 = asyncio.Lock()
    lock_browser2 = asyncio.Lock()
    lock_browser3 = asyncio.Lock()
    lock_browser4 = asyncio.Lock()
    lock_browser5 = asyncio.Lock()

    queue = asyncio.Queue()

    await asyncio.gather(
        process_main(queue),
        retrieve(queue, lock_browser1, lock_browser2, lock_browser3, lock_browser4, lock_browser5),
    )


if __name__ == "__main__":
    start = time.perf_counter()
    asyncio.run(main())
    end = time.perf_counter()
    duration = end - start
    print(f'Time elapsed: {duration:.2f}s')

This runs in about 53 seconds on my machine.
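As a rough sanity check of that figure: 60 items (2 * 5 * 6) spread over 5 browsers is about 12 requests each. Each browser also sleeps once more before it reads the None sentinel, which gives roughly 13 * 4 s, or about 52 s on average. The scaled-down sketch below uses delays of 0.03-0.05 s instead of 3-5 s, which is an assumption made so it runs quickly. It adapts _retrieve to collect results in a list, inlines process into process_main, and adds a hypothetical timed helper to compare 5 browsers against 1:

```python
import asyncio
import random
import time

# Assumption: seconds instead of the 3-5 s used in the answer, so the demo runs quickly.
DELAY = (0.03, 0.05)


async def _retrieve(queue: asyncio.Queue, lock: asyncio.Lock, results: list) -> None:
    # Same shape as the answer's _retrieve, but collecting items instead of printing.
    while True:
        async with lock:
            await asyncio.sleep(random.uniform(*DELAY))  # one request per browser at a time
            item = await queue.get()
            if item is None:
                await queue.put(None)  # allow the other browsers to exit
                break
            results.append(item)


async def process_main(queue: asyncio.Queue) -> None:
    # process() from the answer, inlined: 2 * 5 * 6 = 60 items, then the sentinel.
    for x in range(2):
        for y in range(5):
            for value in range(10, 16):
                await queue.put([x, y, value])
                await asyncio.sleep(0)
    await queue.put(None)


async def main(n_browsers: int = 5) -> list:
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []
    locks = [asyncio.Lock() for _ in range(n_browsers)]
    await asyncio.gather(
        process_main(queue),
        *(_retrieve(queue, lock, results) for lock in locks),
    )
    return results


def timed(n_browsers: int) -> tuple:
    start = time.perf_counter()
    results = asyncio.run(main(n_browsers))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    for n in (5, 1):
        results, duration = timed(n)
        print(f"{n} browser(s): {len(results)} items in {duration:.2f}s")
```

At this scale, 5 browsers should finish in roughly a fifth of the single-browser time. Exact numbers vary between runs because the delays are random.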
