I have some code similar to the following:
import concurrent.futures

def worker(l: list):
    ...

really_big_list = [......]

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Submit one task per slice of the list
    futures = {executor.submit(worker, really_big_list[i:i + 100])
               for i in range(0, len(really_big_list), 10)}
    for future in concurrent.futures.as_completed(futures):
        data = future.result()
Note that the workers take slices of size 100 but i only goes up by 10 at a time, so the slices overlap. In this code, even though I am only using 5 workers, the set comprehension materializes every slice up front when the tasks are submitted, which takes roughly 10 times the memory of really_big_list. That is simply too much memory for what I am doing. Is there a way to defer computing really_big_list[i:i + 100] until the worker that needs it actually starts?
CodePudding user response:
I suggest an alternative - use multiprocessing.Pool and its .imap or .imap_unordered method. Unlike your set comprehension, which builds every slice before any work starts, imap pulls items from its input iterable as tasks are dispatched, so the batches are created on demand rather than all at once. For example:
from itertools import islice
from multiprocessing import Pool
from time import sleep

def worker(l: list):
    sleep(1)
    return sum(l)

# from https://docs.python.org/3.11/library/itertools.html
def batched(iterable, n):
    "Batch data into tuples of length n. The last batch may be shorter."
    # batched('ABCDEFG', 3) --> ABC DEF G
    if n < 1:
        raise ValueError("n must be at least one")
    it = iter(iterable)
    while batch := tuple(islice(it, n)):
        yield batch

if __name__ == "__main__":
    really_big_list = list(range(10_000))

    with Pool(5) as pool:
        # use `.imap` or `.imap_unordered` here
        for result in pool.imap(worker, batched(really_big_list, 10)):
            print(result)
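On Python 3.12+ you can use itertools.batched directly instead of copying the recipe. One caveat: batched yields non-overlapping chunks, while your original code takes a window of 100 that advances by only 10, so the slices overlap. If that overlap is intentional, you can still produce the windows lazily with a small generator and feed it to the same pool. A minimal sketch, where sliding_slices is a hypothetical helper (not part of the standard library) and worker and really_big_list are the ones defined above:

def sliding_slices(seq, size, step):
    # Lazily yield overlapping slices: a window of `size` items
    # advancing by `step`, matching really_big_list[i:i + 100]
    # with a step of 10. Slices are created as the pool consumes
    # the generator, not all materialized up front.
    for i in range(0, len(seq), step):
        yield seq[i:i + size]

with Pool(5) as pool:
    for result in pool.imap(worker, sliding_slices(really_big_list, 100, 10)):
        print(result)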