I want to implement this demo case:
- A 'tester' will run for a random period of time.
- The main function will hold at max concurrently three testers.
- A new tester will be run in a new thread after an old one has finished.
import asyncio
import threading
import random
########## config ############
halt_chance_range = 5 # Testers will halt if random.randrange(0, halt_chance_range) returns a 0
total_tester_number = 10
max_concurrent_testers = 3
########## config ############
########## global variables ############
running_tester_count = 0
history_tester_count = 0
########## global variables ############
async def random_halt(tester_index, used_time):
""" use recursive structure to demonstrate how to call another async function """
random_factor = random.randrange(0, halt_chance_range)
if random_factor == 0 and used_time != 0 :
print(f'tester.{tester_index} has stopped after {used_time} seconds.')
return used_time
print(f'tester.{tester_index} waiting .... {used_time}')
await asyncio.sleep(1)
return await random_halt(tester_index, used_time 1)
async def run_a_new_test(tester_index):
"""
Wrap this async function in a new thread,
so that the main function won't have to 'await' it.
"""
global running_tester_count
await random_halt( tester_index, 0 )
running_tester_count -= 1
async def main():
global running_tester_count
global history_tester_count
while history_tester_count < total_tester_number:
# Create new tester only when there is still space, otherwise check again later
if running_tester_count < max_concurrent_testers:
running_tester_count = 1
history_tester_count = 1
_thread = threading.Thread(target=asyncio.run, args=(run_a_new_test(history_tester_count), ))
print(f' starting tester : {history_tester_count} ')
_thread.start()
else:
await asyncio.sleep(1)
# Wait until all threads are finished
while running_tester_count > 0:
await asyncio.sleep(1)
if __name__ == '__main__':
asyncio.run(main())
print("All Done.")
This program seems to successfully achieve my goal, here is an output :
starting tester : 1
starting tester : 2
starting tester : 3
tester.1 waiting .... 0
tester.2 waiting .... 0
tester.3 waiting .... 0
tester.3 has stopped after 1 seconds.
tester.2 waiting .... 1
tester.1 waiting .... 1
starting tester : 4
tester.4 waiting .... 0
tester.1 waiting .... 2
tester.2 waiting .... 2
tester.4 has stopped after 1 seconds.
starting tester : 5
tester.1 waiting .... 3
tester.2 waiting .... 3
tester.5 waiting .... 0
tester.1 waiting .... 4
tester.2 waiting .... 4
tester.5 has stopped after 1 seconds.
tester.2 waiting .... 5
starting tester : 6
tester.1 waiting .... 5
tester.6 waiting .... 0
tester.2 waiting .... 6
tester.6 waiting .... 1
tester.1 waiting .... 6
tester.2 waiting .... 7
tester.6 has stopped after 2 seconds.
tester.1 waiting .... 7
tester.2 has stopped after 8 seconds.
starting tester : 7
tester.1 has stopped after 8 seconds.
starting tester : 8
starting tester : 9
tester.7 waiting .... 0
tester.9 waiting .... 0
tester.8 waiting .... 0
tester.9 waiting .... 1
tester.7 waiting .... 1
tester.8 has stopped after 1 seconds.
starting tester : 10
tester.9 waiting .... 2
tester.10 waiting .... 0
tester.7 has stopped after 2 seconds.
tester.9 waiting .... 3
tester.10 waiting .... 1
tester.9 waiting .... 4
tester.10 waiting .... 2
tester.9 waiting .... 5
tester.10 waiting .... 3
tester.9 waiting .... 6
tester.10 has stopped after 4 seconds.
tester.9 waiting .... 7
tester.9 has stopped after 8 seconds.
All Done.
I want to ask : is this combination of thread and async a bad practice?
Or simply put, is it OK for me to use it with some more complicated functions?
Thank you so much.
CodePudding user response:
I don't imagine the world will fall out from under your feet (although I can't guarantee it won't). But it does seem a little odd to mix the two paradigms together. It is also worth noting that threads come with considerably more overhead than fully-async cooperative multitasking.
Now, assuming you're IO-bound and all you want to achieve is single-processor concurrency, asyncio
has everything you need.
There are a couple of options here in the form of asyncio.create_task()
and asyncio.gather()
.
To illustrate using create_task()
, here's a variation of your main()
function:
async def main():
global running_tester_count
global history_tester_count
tasks = set()
while history_tester_count < total_tester_number:
# Create new tester only when there is still space, otherwise check again later
if running_tester_count < max_concurrent_testers:
running_tester_count = 1
history_tester_count = 1
task = asyncio.create_task(run_a_new_test(history_tester_count))
tasks.add(task)
task.add_done_callback(lambda t: (print(t), tasks.discard(t)))
print(f' starting tester : {history_tester_count} ')
else:
await asyncio.sleep(1)
# Wait until all tasks are finished
while tasks:
await asyncio.sleep(1)
This should achieve the same, but without introducing threads.