Home > Net >  Multithreading with async functions in Python, is this a bad practice?
Multithreading with async functions in Python, is this a bad practice?

Time:07-24

I want to implement this demo case:

  1. A 'tester' will run for a random period of time.
  2. The main function will hold at max concurrently three testers.
  3. A new tester will be run in a new thread after an old one has finished.
import asyncio
import threading
import random

########## config ############
halt_chance_range = 5  # Testers will halt if random.randrange(0, halt_chance_range) returns a 0
total_tester_number = 10
max_concurrent_testers = 3
########## config ############


########## global variables ############
running_tester_count = 0
history_tester_count = 0
########## global variables ############


async def random_halt(tester_index, used_time):
    """ use recursive structure to demonstrate how to call another async function """

    random_factor = random.randrange(0, halt_chance_range)
    if random_factor == 0 and used_time != 0 :
        print(f'tester.{tester_index} has stopped after {used_time} seconds.')
        return used_time

    print(f'tester.{tester_index} waiting .... {used_time}')
    await asyncio.sleep(1)
    return await random_halt(tester_index, used_time   1)

async def run_a_new_test(tester_index):
    """
     Wrap this async function in a new thread,
     so that the main function won't have to 'await' it.
    """
    global running_tester_count

    await random_halt( tester_index, 0 )
    running_tester_count -= 1

async def main():
    global running_tester_count
    global history_tester_count

    while history_tester_count < total_tester_number:
        # Create new tester only when there is still space, otherwise check again later
        if running_tester_count < max_concurrent_testers:
            running_tester_count  = 1
            history_tester_count  = 1
            _thread = threading.Thread(target=asyncio.run, args=(run_a_new_test(history_tester_count), ))
            print(f'        starting tester : {history_tester_count}   ')
            _thread.start()
        else:
            await asyncio.sleep(1)

    # Wait until all threads are finished
    while running_tester_count > 0:
            await asyncio.sleep(1)

if __name__ == '__main__':
    asyncio.run(main())
    print("All Done.")

This program seems to successfully achieve my goal, here is an output :

        starting tester : 1
        starting tester : 2
        starting tester : 3
tester.1 waiting .... 0
tester.2 waiting .... 0
tester.3 waiting .... 0
tester.3 has stopped after 1 seconds.
tester.2 waiting .... 1
tester.1 waiting .... 1
        starting tester : 4
tester.4 waiting .... 0
tester.1 waiting .... 2
tester.2 waiting .... 2
tester.4 has stopped after 1 seconds.
        starting tester : 5
tester.1 waiting .... 3
tester.2 waiting .... 3
tester.5 waiting .... 0
tester.1 waiting .... 4
tester.2 waiting .... 4
tester.5 has stopped after 1 seconds.
tester.2 waiting .... 5
        starting tester : 6
tester.1 waiting .... 5
tester.6 waiting .... 0
tester.2 waiting .... 6
tester.6 waiting .... 1
tester.1 waiting .... 6
tester.2 waiting .... 7
tester.6 has stopped after 2 seconds.
tester.1 waiting .... 7
tester.2 has stopped after 8 seconds.
        starting tester : 7
tester.1 has stopped after 8 seconds.
        starting tester : 8
        starting tester : 9
tester.7 waiting .... 0
tester.9 waiting .... 0
tester.8 waiting .... 0
tester.9 waiting .... 1
tester.7 waiting .... 1
tester.8 has stopped after 1 seconds.
        starting tester : 10
tester.9 waiting .... 2
tester.10 waiting .... 0
tester.7 has stopped after 2 seconds.
tester.9 waiting .... 3
tester.10 waiting .... 1
tester.9 waiting .... 4
tester.10 waiting .... 2
tester.9 waiting .... 5
tester.10 waiting .... 3
tester.9 waiting .... 6
tester.10 has stopped after 4 seconds.
tester.9 waiting .... 7
tester.9 has stopped after 8 seconds.
All Done.

I want to ask : is this combination of thread and async a bad practice?

Or simply put, is it OK for me to use it with some more complicated functions?

Thank you so much.

CodePudding user response:

I don't imagine the world will fall out from under your feet (although I can't guarantee it won't). But it does seem a little odd to mix the two paradigms together. It is also worth noting that threads come with considerably more overhead than fully-async cooperative multitasking.

Now, assuming you're IO-bound and all you want to achieve is single-processor concurrency, asyncio has everything you need.

There are a couple of options here in the form of asyncio.create_task() and asyncio.gather().

To illustrate using create_task(), here's a variation of your main() function:

async def main():
    global running_tester_count
    global history_tester_count

    tasks = set()

    while history_tester_count < total_tester_number:
        # Create new tester only when there is still space, otherwise check again later
        if running_tester_count < max_concurrent_testers:
            running_tester_count  = 1
            history_tester_count  = 1
            task = asyncio.create_task(run_a_new_test(history_tester_count))
            tasks.add(task)
            task.add_done_callback(lambda t: (print(t), tasks.discard(t)))
            print(f'        starting tester : {history_tester_count}   ')
        else:
            await asyncio.sleep(1)

    # Wait until all tasks are finished
    while tasks:
        await asyncio.sleep(1)

This should achieve the same, but without introducing threads.

  • Related