Recently I have been doing a lot of network- or IO-bound operations, and using threads speeds the code up a lot. I noticed that I have been writing code like this over and over again:
import threading

threads = []
for machine, user, data in machine_list:
    mythread = threading.Thread(target=get_info, args=(machine, user, data))
    mythread.start()
    threads.append(mythread)

for mythread in threads:
    mythread.join()
This feels somewhat repetitive. It works, but I suspect there is a more "Pythonic" way to write it. Any suggestions?
CodePudding user response:
What you are looking for is multiprocessing.pool.ThreadPool, which has the same semantics as multiprocessing.pool.Pool, but uses threads instead of processes.
You can do what you are currently doing more concisely like this:
from multiprocessing.pool import ThreadPool
pool = ThreadPool() # optionally pass the number of threads in the pool
res = pool.starmap_async(get_info, machine_list)
res.wait()
This is not exactly equivalent to your code, since ThreadPool creates a fixed number of threads (by default equal to the number of available CPUs) and distributes work among them, but you can nonetheless pass the number you want (e.g. ThreadPool(len(machine_list))) to create exactly one thread for each item in the list.
Then you can also create a function to easily do this multiple times:
def run_threads(func, arglist):
    ThreadPool(len(arglist)).starmap_async(func, arglist).wait()
Note: .starmap_async() is just one way to achieve this; there are multiple methods you can use. Take a look at the documentation for Pool linked above and choose the one you prefer.
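For example, if you also want to collect the return values of get_info, the blocking starmap() fits naturally. A minimal sketch, assuming get_info and machine_list are defined as in the question:

from multiprocessing.pool import ThreadPool

# starmap() blocks until every call has finished and returns the
# results in the same order as machine_list
with ThreadPool(len(machine_list)) as pool:
    results = pool.starmap(get_info, machine_list)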
CodePudding user response:
In Python, there is a simple approach to working with many threads: concurrent.futures.ThreadPoolExecutor.
from concurrent.futures import ThreadPoolExecutor
from time import sleep
from tqdm import tqdm

def do_something(item):
    print(item)
    sleep(1)
    print(f"Finished {item}")

items = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

with ThreadPoolExecutor(max_workers=8) as executor:
    for item in items:
        executor.submit(do_something, item)

# with progress bar:
with ThreadPoolExecutor(max_workers=8) as executor:
    list(tqdm(executor.map(do_something, items), total=len(items)))

print("finished")
Note: I tried other approaches, but this is the only one that worked with Docker multi-threading on a single vCPU (Google Cloud Run environment).
CodePudding user response:
The Pythonic way would probably be to use asyncio. The problem you have is exactly what asyncio was designed for. The net result is broadly the same as using threading, but instead of threads you have tasks, and when a task blocks, the event loop switches to a different one. The program remains single-threaded, and so avoids the overhead the GIL imposes when switching between threads.
import asyncio

async def get_info(machine, user, data):
    # NB. async declaration
    ...

async def main():
    tasks = [
        asyncio.create_task(get_info(machine, user, data))
        for machine, user, data in machine_list
    ]
    done, _pending = await asyncio.wait(tasks)
    # asyncio is more powerful in that it allows you to directly get the
    # results of tasks. This is unlike threading, where you must use some
    # form of signalling (such as a queue) to get data back from a thread.
    results = {}
    for args, task in zip(machine_list, tasks):
        # this gets the result immediately, since asyncio.wait has
        # already completed every task
        results[args] = await task

if __name__ == '__main__':
    asyncio.run(main())
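A more compact alternative (not from the original answer, but standard asyncio) is asyncio.gather(), which awaits everything and returns the results in the order the awaitables were passed in:

async def main():
    # gather() returns a list of results ordered like machine_list,
    # with no manual bookkeeping needed
    results = await asyncio.gather(
        *(get_info(machine, user, data) for machine, user, data in machine_list)
    )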
The problem with this approach is that you'll have to start using asyncio-aware libraries and rewrite your own code to be asyncio-aware. Though to get started, you can use asyncio.to_thread() (Python 3.9+), which runs the given function in a separate thread:
import asyncio

def get_info(machine, user, data):
    # NB. no async declaration
    ...

async def main():
    tasks = [
        # wrap in a task: asyncio.wait() requires tasks, not bare
        # coroutines, on Python 3.11+
        asyncio.create_task(asyncio.to_thread(get_info, machine, user, data))
        for machine, user, data in machine_list
    ]
    done, _pending = await asyncio.wait(tasks)

if __name__ == '__main__':
    asyncio.run(main())
concurrent.futures
If you're heavily invested in the threading model, and switching to asyncio would be too much work or learning all the new concepts would be too much of a barrier, then you can use concurrent.futures:
from concurrent.futures import ThreadPoolExecutor

def get_info(machine, user, data):
    ...

def get_info_helper(args):
    # unpack the (machine, user, data) tuple for get_info
    machine, user, data = args
    return get_info(machine, user, data)

with ThreadPoolExecutor() as executor:
    results = list(executor.map(get_info_helper, machine_list))
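As an aside (not from the original answer), the helper can be avoided entirely, because executor.map() accepts one iterable per positional argument of the target function:

from concurrent.futures import ThreadPoolExecutor

# zip(*machine_list) transposes the list of (machine, user, data) tuples
# into three parallel iterables, one per parameter of get_info
with ThreadPoolExecutor() as executor:
    results = list(executor.map(get_info, *zip(*machine_list)))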