Pythonic way to use threads

Time:08-31

Recently I have been doing a lot of network- or I/O-bound operations, and using threads speeds up the code a lot. I noticed that I keep writing code like this over and over again:

threads = []
for machine, user, data in machine_list:
  mythread = threading.Thread(target=get_info, args=(machine, user, data))
  mythread.start()
  threads.append(mythread)
  
for mythread in threads:
  mythread.join()

This feels somewhat repetitive. It works, but I suspect there is a more "Pythonic" way to write this. Any suggestions?

CodePudding user response:

What you are looking for is multiprocessing.pool.ThreadPool, which has the same semantics as multiprocessing.pool.Pool, but uses threads instead of processes.

You can do what you are currently doing more concisely like this:

from multiprocessing.pool import ThreadPool

pool = ThreadPool() # optionally pass the number of threads in the pool
res = pool.starmap_async(get_info, machine_list)
res.wait()

This is not exactly equivalent to your code: ThreadPool creates a fixed number of threads (by default, the number of available CPUs) and distributes the work among them. You can, however, pass the number you want (e.g. ThreadPool(len(machine_list))) to create exactly one thread for each item in the list.

Then you can also create a function to easily do this multiple times:

def run_threads(func, arglist):
    ThreadPool(len(arglist)).starmap_async(func, arglist).wait()

Note: .starmap_async() is just one way to achieve this. Pool offers several other methods (such as .map(), .starmap() and .imap_unordered()); take a look at the documentation for multiprocessing.pool.Pool and choose the one you prefer.
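As one illustration of those alternatives, here is a minimal sketch that uses the blocking .starmap() so the return values come back as a list, in input order. The get_info body and the sample machine_list are hypothetical stand-ins for the ones in the question.

```python
from multiprocessing.pool import ThreadPool


def get_info(machine, user, data):
    # Hypothetical stand-in for the question's get_info
    return f"{user}@{machine}: {data}"


# Hypothetical sample data in the (machine, user, data) shape from the question
machine_list = [("host1", "alice", "a"), ("host2", "bob", "b")]


def run_threads(func, arglist):
    # One worker per item; ThreadPool rejects a size of 0, so use at least 1
    with ThreadPool(max(1, len(arglist))) as pool:
        # starmap blocks until every call has finished and keeps input order
        return pool.starmap(func, arglist)


results = run_threads(get_info, machine_list)
```

The with block also shuts the pool down once the work is finished, so the worker threads do not linger.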

CodePudding user response:

In Python, there is an easy and simple approach to work with many threads.

from concurrent.futures import ThreadPoolExecutor
from time import sleep

from tqdm import tqdm


def do_something(item):
    print(item)
    sleep(1)
    print(f"Finished {item}")


items = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

with ThreadPoolExecutor(max_workers=8) as executor:
    for item in items:
        executor.submit(do_something, item)

# with progressbar:
with ThreadPoolExecutor(max_workers=8) as executor:
    list(tqdm(executor.map(do_something, items), total=len(items)))

print("finished")

Note: I tried other approaches, but this is the only one that worked for multi-threading in Docker on a single vCPU (Google Cloud Run environment).
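One caveat with the plain executor.submit() loop: the returned futures are discarded, so an exception raised inside the worker goes unnoticed. A possible way around that, sketched here with a hypothetical do_something that fails on one input, is to keep the futures and read them back with as_completed():

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def do_something(item):
    # Hypothetical worker: fails on one input to show the error handling
    if item == 3:
        raise ValueError(f"bad item {item}")
    return item * 2


items = [1, 2, 3, 4]
results, errors = {}, {}

with ThreadPoolExecutor(max_workers=8) as executor:
    # Map each future back to the item it was submitted for
    futures = {executor.submit(do_something, item): item for item in items}
    for future in as_completed(futures):
        item = futures[future]
        try:
            # result() re-raises any exception raised in the worker thread
            results[item] = future.result()
        except ValueError as exc:
            errors[item] = exc
```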

CodePudding user response:

The Pythonic way would probably be to use asyncio; the problem you have is exactly what asyncio was designed for. The net result is broadly the same as using threading, but instead of threads you have tasks. When a task blocks waiting on I/O, the event loop switches to a different task. The program stays single-threaded, and so avoids the overhead caused by the GIL when switching between threads.

import asyncio

async def get_info(machine, user, data):
    # NB. async declaration
    ...

async def main():
    tasks = [
        asyncio.create_task(get_info(machine, user, data))
        for machine, user, data in machine_list
    ]
    done, _pending = await asyncio.wait(tasks)

    # asyncio is more powerful in that it allows you to directly get results of tasks.
    # This is unlike threading, where you must use some form of signalling
    # (such as a queue) to get data back from a thread.
    results = {}
    for args, task in zip(machine_list, tasks):
        # this returns immediately since asyncio.wait has already
        # waited for every task to finish
        result = await task
        results[args] = result

if __name__ == '__main__':
    asyncio.run(main())

The problem with this approach is that you'll have to start using asyncio-aware libraries and rewrite your own code to be asyncio-aware. To get started, though, you can use asyncio.to_thread() (available since Python 3.9). It runs the given function in a separate thread, so existing blocking code can be awaited without being rewritten:

import asyncio

def get_info(machine, user, data):
    # NB. no async declaration
    ...

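async def main():
    tasks = [
        # to_thread() returns a coroutine; wrap it in a task, since
        # asyncio.wait() no longer accepts bare coroutines (Python 3.11+)
        asyncio.create_task(asyncio.to_thread(get_info, machine, user, data))
        for machine, user, data in machine_list
    ]
    done, _pending = await asyncio.wait(tasks)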

if __name__ == '__main__':
    asyncio.run(main())
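If you also want the return values of the threaded calls, asyncio.gather() is one option: it accepts the coroutines directly and returns their results in input order. A minimal sketch, assuming Python 3.9+ and using a hypothetical blocking get_info with made-up sample data:

```python
import asyncio
import time


def get_info(machine, user, data):
    # Hypothetical blocking function; sleep stands in for network I/O
    time.sleep(0.05)
    return f"{user}@{machine}"


# Hypothetical sample data in the (machine, user, data) shape from the question
machine_list = [("host1", "alice", "a"), ("host2", "bob", "b")]


async def main():
    # Each to_thread() call runs get_info in a worker thread;
    # gather awaits them all and preserves the input order
    return await asyncio.gather(
        *(asyncio.to_thread(get_info, m, u, d) for m, u, d in machine_list)
    )


results = asyncio.run(main())
```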

concurrent.futures

If you're heavily invested in the threading model, and switching to asyncio with all its new concepts would be too much work, you can use concurrent.futures instead:

from concurrent.futures import ThreadPoolExecutor

def get_info(machine, user, data):
    ...

def get_info_helper(args):
    machine, user, data = args
    return get_info(machine, user, data)

with ThreadPoolExecutor() as executor:
    results = list(executor.map(get_info_helper, machine_list))
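A possible variation that avoids the helper function: executor.map() accepts several iterables and passes one element from each as positional arguments, so the argument tuples can be transposed with zip(*...). This is a sketch with a hypothetical get_info body and sample data.

```python
from concurrent.futures import ThreadPoolExecutor


def get_info(machine, user, data):
    # Hypothetical stand-in for the question's get_info
    return (machine, user, len(data))


# Hypothetical sample data in the (machine, user, data) shape from the question
machine_list = [("host1", "alice", "abc"), ("host2", "bob", "de")]

with ThreadPoolExecutor() as executor:
    # zip(*machine_list) turns the rows into three columns:
    # all machines, all users, all data values
    results = list(executor.map(get_info, *zip(*machine_list)))
```

The helper version is arguably easier to read; the transposed form mainly saves a few lines.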