concurrent.futures.Executor.map: calculate function inputs lazily


I have some code similar to the following:

import concurrent.futures

def worker(l: list):
    ...

really_big_list = [......]
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Submit one overlapping slice per task and collect results as they complete
    futures = {executor.submit(worker, really_big_list[i:i + 100])
               for i in range(0, len(really_big_list), 10)}
    for future in concurrent.futures.as_completed(futures):
        data = future.result()

Note that the workers take slices of size 100, but i only goes up by 10 at a time, so the slices overlap. Even though I am only using 5 workers, building all the worker inputs up front costs about 10 times the memory of really_big_list, because every element lands in 10 overlapping slices. That is simply too much memory for what I am doing. Is there a way to defer calculating really_big_list[i:i + 100] until the worker that uses it starts?
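
One way to get this deferral while staying with concurrent.futures is to cap how many futures are in flight, pulling inputs from a generator only as workers free up. A minimal sketch, assuming results may be consumed in completion order; the lazy_map helper, the in_flight parameter, and the stand-in worker are illustrative names, not from this thread or an existing API:

import concurrent.futures
from itertools import islice

def worker(l: list):
    return sum(l)  # stand-in for the real work

really_big_list = list(range(10_000))  # stand-in data

def lazy_map(executor, fn, args_iter, in_flight=10):
    # Keep at most in_flight tasks submitted, so only that many input
    # slices exist at any moment; yield results as tasks complete.
    args_iter = iter(args_iter)
    futures = {executor.submit(fn, arg) for arg in islice(args_iter, in_flight)}
    while futures:
        done, futures = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_COMPLETED)
        for fut in done:
            yield fut.result()
        # Replace each finished task with a freshly submitted one.
        futures |= {executor.submit(fn, arg)
                    for arg in islice(args_iter, len(done))}

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # The generator expression builds each slice only when lazy_map pulls it.
    slices = (really_big_list[i:i + 100]
              for i in range(0, len(really_big_list), 10))
    for data in lazy_map(executor, worker, slices):
        pass  # consume data here; results arrive in completion order

With in_flight=10 and slices of size 100, only about 1,000 elements' worth of slices exist at once, rather than ten overlapping copies of the whole list.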

CodePudding user response:

I suggest an alternative: use multiprocessing.Pool with its .imap or .imap_unordered methods. Both consume the input iterable lazily instead of materializing every batch up front, e.g.:

from itertools import islice
from multiprocessing import Pool
from time import sleep


def worker(l: list):
    sleep(1)
    return sum(l)


# from https://docs.python.org/3.11/library/itertools.html
def batched(iterable, n):
    "Batch data into tuples of length n. The last batch may be shorter."
    # batched('ABCDEFG', 3) --> ABC DEF G
    if n < 1:
        raise ValueError("n must be at least one")
    it = iter(iterable)
    while batch := tuple(islice(it, n)):
        yield batch


if __name__ == "__main__":
    really_big_list = list(range(10_000))
    with Pool(5) as pool:
        # use `.imap` or `.imap_unordered` here
        for result in pool.imap(worker, batched(really_big_list, 10)):
            print(result)
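
Note that batched yields non-overlapping chunks, while the question slices overlapping windows (size 100, step 10). If the overlap is actually wanted, a generator can feed those windows to .imap just as lazily; the overlapping_windows name and its defaults below are illustrative:

def overlapping_windows(seq, size=100, step=10):
    # Mirrors the question's slicing: seq[0:100], seq[10:110], seq[20:120], ...
    # Each window is built only when .imap requests the next input.
    for i in range(0, len(seq), step):
        yield seq[i:i + size]

# Drop-in replacement for batched(...) in the loop above:
#     for result in pool.imap(worker, overlapping_windows(really_big_list)):
#         print(result)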