I am writing code to simulate fetching data from a website with multiple Selenium drivers, where each request takes 3-5 seconds. I intend for it to take the batch data and pass it through the retrieve() function asynchronously to speed up the process. I am trying to use an asyncio queue for this, with queue.put() and queue.get().
The result is incorrect: the items with current_value 11 to 15 are missing, as if the while loop did not run at all. Is there anything missing in my code, or any advice to improve it?
import asyncio
import random
import time

async def retrieve(queue, lock_browser1, lock_browser2, lock_browser3, lock_browser4, lock_browser5):
    while True:
        item = await queue.get()
        if lock_browser1.locked() is False:
            async with lock_browser1:
                await asyncio.sleep(random.uniform(3, 5))  # simulate only one browser connect to the website at a time
                print(item)
                break
        else:
            await asyncio.sleep(0)
        if lock_browser2.locked() is False:
            async with lock_browser2:
                await asyncio.sleep(random.uniform(3, 5))  # simulate only one browser connect to the website at a time
                print(item)
                break
        else:
            await asyncio.sleep(0)
        if lock_browser3.locked() is False:
            async with lock_browser3:
                await asyncio.sleep(random.uniform(3, 5))  # simulate only one browser connect to the website at a time
                print(item)
                break
        else:
            await asyncio.sleep(0)
        if lock_browser4.locked() is False:
            async with lock_browser4:
                await asyncio.sleep(random.uniform(3, 5))  # simulate only one browser connect to the website at a time
                print(item)
                break
        else:
            await asyncio.sleep(0)
        if lock_browser5.locked() is False:
            async with lock_browser5:
                await asyncio.sleep(random.uniform(3, 5))  # simulate only one browser connect to the website at a time
                print(item)
                break
        else:
            await asyncio.sleep(0)
        if item is None:
            break

async def process(queue, x, y):
    start_value = 10
    current_value = 0
    stop_value = 15
    current_value = start_value
    while current_value <= stop_value:
        await queue.put([x, y, current_value])
        current_value = current_value + 1
        await asyncio.sleep(0)
    await queue.put(None)

async def main(x, y):
    lock_browser1 = asyncio.Lock()
    lock_browser2 = asyncio.Lock()
    lock_browser3 = asyncio.Lock()
    lock_browser4 = asyncio.Lock()
    lock_browser5 = asyncio.Lock()
    queue = asyncio.Queue()
    await asyncio.gather(process(queue, x, y), retrieve(queue, lock_browser1, lock_browser2, lock_browser3, lock_browser4, lock_browser5))

if __name__ == "__main__":
    start = time.perf_counter()
    for x in range(2):
        for y in range(5):
            asyncio.run(main(x, y))
    end = time.perf_counter()
    duration = end - start
    print(f'Time elapsed:{duration:.2f}s')
INCORRECT RESULT:
[0, 0, 10]
[0, 1, 10]
[0, 2, 10]
[0, 3, 10]
[0, 4, 10]
[1, 0, 10]
[1, 1, 10]
[1, 2, 10]
[1, 3, 10]
[1, 4, 10]
INTENDED RESULT:
[0, 0, 10]
[0, 0, 11]
[0, 0, 12]
[0, 0, 13]
[0, 0, 14]
[0, 0, 15]
[0, 1, 10]
[0, 1, 11]
[0, 1, 12]
...
[1, 4, 13]
[1, 4, 14]
[1, 4, 15]
CodePudding user response:
TL;DR - remove the break from the if-statements and find another way to make sure no more than one of them runs per item; the fix is at the end.
The code really does what you tell it to do. Let's break it down into smaller pieces.
Part 1 - if __name__ ...
if __name__ == "__main__":
    start = time.perf_counter()
    for x in range(2):
        for y in range(5):
            asyncio.run(main(x, y))
    end = time.perf_counter()
    duration = end - start
    print(f'Time elapsed:{duration:.2f}s')
We have nested loops: the outer one (x) goes from 0 to 1 and the inner one (y) goes from 0 to 4. On each iteration we call asyncio.run(main(x, y)). Keep in mind that calling asyncio.run one after another does not make the calls run concurrently: it first calls main(0, 0), blocks everything else until that finishes, and only then moves to the next call (i.e. main(0, 1)).
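To make the difference concrete, here is a minimal sketch (not part of the original code; the names job, run_sequentially and run_concurrently are made up for illustration) that records the order of events when two coroutines are run with back-to-back asyncio.run calls versus a single asyncio.gather:

```python
import asyncio


async def job(name, delay, log):
    # Record when this coroutine starts and when it finishes.
    log.append(f"start {name}")
    await asyncio.sleep(delay)
    log.append(f"end {name}")


def run_sequentially():
    # Each asyncio.run call blocks until its coroutine has finished.
    log = []
    asyncio.run(job("a", 0.01, log))
    asyncio.run(job("b", 0.02, log))
    return log


def run_concurrently():
    # gather schedules both coroutines on the same event loop.
    log = []

    async def main():
        await asyncio.gather(job("a", 0.01, log), job("b", 0.02, log))

    asyncio.run(main())
    return log


if __name__ == "__main__":
    print(run_sequentially())
    print(run_concurrently())
```

In the sequential version "b" cannot start until "a" has ended; with gather, both start before either one ends.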
Part 2 - The main function:
Following that, we called main(0, 0), so we move to the main function:
async def main(x, y):
    lock_browser1 = asyncio.Lock()
    lock_browser2 = asyncio.Lock()
    lock_browser3 = asyncio.Lock()
    lock_browser4 = asyncio.Lock()
    lock_browser5 = asyncio.Lock()
    queue = asyncio.Queue()
    await asyncio.gather(process(queue, x, y),
                         retrieve(queue, lock_browser1, lock_browser2, lock_browser3, lock_browser4, lock_browser5))
I will set the locks aside for now. We create a new queue, queue, and call asyncio.gather(), which, in contrast to asyncio.run, does run these functions concurrently. Again setting retrieve aside for now, we call process, one call at a time, because main itself is being called one at a time.
Part 3 - The process function
async def process(queue, x, y):
    start_value = 10
    stop_value = 15
    current_value = start_value
    while current_value <= stop_value:
        await queue.put([x, y, current_value])
        current_value = current_value + 1
    await queue.put(None)
Here we iterate through the values 10-15 with our original x, y values, putting each [x, y, current_value] list on the queue, followed by a final None as a stop marker. That is all this function does.
Part 4 - The retrieve function
async def retrieve(queue, lock_browser1, lock_browser2, lock_browser3, lock_browser4, lock_browser5):
    while True:
        item = await queue.get()
        if lock_browser1.locked() is False:
            async with lock_browser1:
                await asyncio.sleep(random.uniform(0.2, 0.3))  # simulate only one browser connect to the website at a time
                print(item)
                break
        else:
            await asyncio.sleep(0)
I have not included the full function because it is long and the rest of it is effectively dead code: there is only one retrieve coroutine, so lock_browser1.locked() is always False when we check it. That means we always enter the first if-statement and then break out of the while True loop, which makes the other if-else blocks redundant (note that).
Summary:
We pair two functions, process and retrieve, and run them concurrently. process executes a fast while loop that increments the last number in the list, producing [0, 0, 10], [0, 0, 11] ..., while retrieve gets the first element from the queue, prints it and breaks out of the while loop; then the whole cycle starts over with a fresh queue. So removing the break from the if-else statements should fix it.
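The effect of that break can be reproduced in a few lines. The following is only an illustrative sketch with invented names (producer, consumer_with_break, consumer_until_sentinel, run) and no locks or sleeps; it collects items in a list instead of printing them:

```python
import asyncio


async def producer(queue, values):
    # Put every value on the queue, then a None sentinel to signal the end.
    for value in values:
        await queue.put(value)
    await queue.put(None)


async def consumer_with_break(queue, seen):
    # Mirrors the bug: the loop is left right after the first item.
    while True:
        item = await queue.get()
        seen.append(item)
        break


async def consumer_until_sentinel(queue, seen):
    # Keeps consuming until the None sentinel arrives.
    while True:
        item = await queue.get()
        if item is None:
            break
        seen.append(item)


def run(consumer):
    async def main():
        queue = asyncio.Queue()
        seen = []
        await asyncio.gather(producer(queue, [10, 11, 12]), consumer(queue, seen))
        return seen

    return asyncio.run(main())


if __name__ == "__main__":
    print(run(consumer_with_break))
    print(run(consumer_until_sentinel))
```

The first consumer only ever sees 10, which matches the incorrect result in the question; the second sees every value up to the sentinel.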
I know that was a lot of digging for a small change, so I will add a TL;DR, but I wanted to point out some things I found important in case you missed them in your code.
Fix
I would add a flag that is True once we have entered an if-statement and False otherwise. I have also moved the check for item being None to the top of the loop, so that we don't print None for no reason:
async def retrieve(queue, lock_browser1, lock_browser2, lock_browser3, lock_browser4, lock_browser5):
    while True:
        has_entered_if = False
        item = await queue.get()
        if item is None:
            break
        if lock_browser1.locked() is False and not has_entered_if:
            has_entered_if = True
            async with lock_browser1:
                await asyncio.sleep(random.uniform(3, 5))  # simulate only one browser connect to the website at a time
                print(item)
        else:
            await asyncio.sleep(0)
        if lock_browser2.locked() is False and not has_entered_if:
            has_entered_if = True
            async with lock_browser2:
                await asyncio.sleep(random.uniform(3, 5))  # simulate only one browser connect to the website at a time
                print(item)
        else:
            await asyncio.sleep(0)
        if lock_browser3.locked() is False and not has_entered_if:
            has_entered_if = True
            async with lock_browser3:
                await asyncio.sleep(random.uniform(3, 5))  # simulate only one browser connect to the website at a time
                print(item)
        else:
            await asyncio.sleep(0)
        if lock_browser4.locked() is False and not has_entered_if:
            has_entered_if = True
            async with lock_browser4:
                await asyncio.sleep(random.uniform(3, 5))  # simulate only one browser connect to the website at a time
                print(item)
        else:
            await asyncio.sleep(0)
        if lock_browser5.locked() is False and not has_entered_if:
            async with lock_browser5:
                await asyncio.sleep(random.uniform(3, 5))  # simulate only one browser connect to the website at a time
                print(item)
        else:
            await asyncio.sleep(0)
Output:
[0, 0, 10]
[0, 0, 11]
[0, 0, 12]
[0, 0, 13]
[0, 0, 14]
[0, 0, 15]
[0, 1, 10]
[0, 1, 11]
[0, 1, 12]
...
[1, 4, 10]
[1, 4, 11]
[1, 4, 12]
[1, 4, 13]
[1, 4, 14]
[1, 4, 15]
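The five near-identical if-blocks can also be written as a loop over a list of locks. This is only a sketch of the same "first free lock wins" idea, not the code from the answer above: the locks list, the results list (used instead of print), the delay parameter and the demo helper are all assumptions made for illustration.

```python
import asyncio


async def retrieve(queue, locks, results, delay=0.0):
    while True:
        item = await queue.get()
        if item is None:
            break  # sentinel: stop consuming
        for lock in locks:
            if not lock.locked():
                async with lock:
                    await asyncio.sleep(delay)  # stand-in for the browser request
                    results.append(item)
                break  # leaves only the for loop, not the while loop


async def demo():
    queue = asyncio.Queue()
    for value in range(10, 16):
        queue.put_nowait([0, 0, value])
    queue.put_nowait(None)
    locks = [asyncio.Lock() for _ in range(5)]
    results = []
    await retrieve(queue, locks, results)
    return results


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Here the inner break plays the role of the has_entered_if flag: it stops the search for a free lock without ending the outer while loop. As with the flag version, a single retrieve coroutine still handles one item at a time.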
CodePudding user response:
That retrieve function allows only one browser (Selenium driver) to run at a time.
A more reasonable implementation, to simulate only one request per browser at a time:
async def retrieve(queue, lock_browser1, lock_browser2, lock_browser3, lock_browser4, lock_browser5):
    async def _retrieve(lock_browser):
        while True:
            async with lock_browser:
                await asyncio.sleep(random.uniform(3, 5))  # simulate only one request per browser at a time
                item = await queue.get()
                if item is None:
                    await queue.put(None)  # allow others to exit
                    break
                print(item)

    await asyncio.gather(
        _retrieve(lock_browser1),
        _retrieve(lock_browser2),
        _retrieve(lock_browser3),
        _retrieve(lock_browser4),
        _retrieve(lock_browser5),
    )
The process function and a suitable process_main function:
async def process(queue, x, y):
    start_value = 10
    stop_value = 15
    current_value = start_value
    while current_value <= stop_value:
        await queue.put([x, y, current_value])
        current_value = current_value + 1
        await asyncio.sleep(0)

async def process_main(queue):
    for x in range(2):
        for y in range(5):
            await process(queue, x, y)
    await queue.put(None)
The main function should hold the lock_browser instances, and the if __name__ == "__main__": block should call main only once:
async def main():
    lock_browser1 = asyncio.Lock()
    lock_browser2 = asyncio.Lock()
    lock_browser3 = asyncio.Lock()
    lock_browser4 = asyncio.Lock()
    lock_browser5 = asyncio.Lock()
    queue = asyncio.Queue()
    await asyncio.gather(
        process_main(queue),
        retrieve(queue, lock_browser1, lock_browser2, lock_browser3, lock_browser4, lock_browser5),
    )

if __name__ == "__main__":
    start = time.perf_counter()
    asyncio.run(main())
    end = time.perf_counter()
    duration = end - start
    print(f'Time elapsed: {duration:.2f}s')
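This runs in about 53 seconds on my machine. That fits the arithmetic: 60 items over 5 workers is roughly 12 items each at about 4 seconds apiece, plus one last sleep per worker before it sees None.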
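To check that several workers really are busy at the same time, a scaled-down variant of this worker pool can count how many are active at once. This is a sketch under assumptions, not the code above: worker, run_pool and the state dictionary are invented names, the delay is shortened to 0.01 seconds, and no locks are used because each worker coroutine stands in for one browser.

```python
import asyncio


async def worker(queue, state, delay):
    while True:
        item = await queue.get()
        if item is None:
            await queue.put(None)  # pass the sentinel on so other workers can exit
            break
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(delay)  # stand-in for the 3-5 second page load
        state["active"] -= 1
        state["done"].append(item)


async def run_pool(items, n_workers=5, delay=0.01):
    queue = asyncio.Queue()
    for item in items:
        queue.put_nowait(item)
    queue.put_nowait(None)  # a single sentinel is enough, since workers re-queue it
    state = {"active": 0, "peak": 0, "done": []}
    await asyncio.gather(*(worker(queue, state, delay) for _ in range(n_workers)))
    return state


if __name__ == "__main__":
    result = asyncio.run(run_pool(list(range(10))))
    print(result["peak"], sorted(result["done"]))
```

With five workers and a pre-filled queue, the peak count reaches 5, and every item is processed exactly once even though only one None was queued.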