multiprocessing.Pool sharing large lists of lists read-only in memory across child process

Time:12-24

I'm struggling with this problem.

I have a big list of lists that I want to access from parallel code to perform CPU-intensive operations. To do that I'm trying to use multiprocessing.Pool; the problem is that my child processes also need to see this massive list of lists.

As the list of lists is not regular (e.g. [[1, 2], [1, 2, 3]]), I can't store it as an mp.Array, and, as said before, I'm not using mp.Process, so I haven't figured out a way of using mp.Manager for this task. It's important for me to keep this list of lists because I'm applying a function that queries it by index using itemgetter from the operator module.
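Since the query relies on operator.itemgetter, one detail is worth knowing: called with several indexes it returns a tuple of the selected items, but called with a single index it returns the item itself, not a 1-tuple. With a single index, the flattening comprehension in foo would then iterate over plain numbers and raise TypeError. A small sketch of both cases; the helper select is hypothetical and simply always returns a tuple:

```python
from operator import itemgetter

big_list_of_lists = [[1, 3], [3, 1, 3], [1, 2], [2, 0]]

# Several indexes: itemgetter returns a tuple of the selected sublists.
pair = itemgetter(1, 3)(big_list_of_lists)
print(pair)  # ([3, 1, 3], [2, 0])

# A single index: itemgetter returns the sublist itself, not a 1-tuple.
single = itemgetter(1)(big_list_of_lists)
print(single)  # [3, 1, 3]


def select(indexes, data):
    """Hypothetical helper: always return a tuple, whatever len(indexes) is."""
    return tuple(data[i] for i in indexes)


print(select([1], big_list_of_lists))  # ([3, 1, 3],)
```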

Here is a fictitious example of what I'm trying to achieve:

import multiprocessing as mp
from operator import itemgetter
import numpy as np

def foo(indexes):
    # here I must somehow guarantee read access to big_list_of_lists in every child process,
    # as this code would work only with one child process using global variables but would fail
    # with larger data.
    store_tuples = itemgetter(*indexes)(big_list_of_lists)
    return np.mean([item for sublista in store_tuples for item in sublista])

def main():
    # big_list_of_lists is the variable that I want to share across my child processes
    big_list_of_lists = [[1, 3], [3, 1, 3], [1, 2], [2, 0]]

    ctx = mp.get_context('spawn')
    # big_list_of_lists elements are also passed as args
    pool = ctx.Pool(processes=mp.cpu_count())
    res=list(pool.map(foo, big_list_of_lists))
    pool.close()
    pool.join()

    return res

if __name__ == '__main__':
    print(main())
# desired output is equivalent to:
# a = []
# for i in big_list_of_lists:
#     store_tuples = itemgetter(*i)(big_list_of_lists)
#     a.append(np.mean([item for sublista in store_tuples for item in sublista]))
# 'a' would be equal to [1.8, 1.5714285714285714, 2.0, 1.75]
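The commented loop above can be run serially as a reference for what the parallel version must return. A minimal sketch, assuming sum(...) / len(...) is an acceptable stand-in for np.mean so that it runs without NumPy; the helper name mean_of_selected is hypothetical:

```python
from operator import itemgetter

big_list_of_lists = [[1, 3], [3, 1, 3], [1, 2], [2, 0]]


def mean_of_selected(indexes, data):
    """Hypothetical helper: average every number in the sublists picked by indexes."""
    # A tuple of sublists here, since every index list in this data has 2+ items.
    store_tuples = itemgetter(*indexes)(data)
    flat = [item for sublist in store_tuples for item in sublist]
    return sum(flat) / len(flat)  # stands in for np.mean


# Serial reference: each sublist is also used as a list of indexes into the whole list.
a = [mean_of_selected(i, big_list_of_lists) for i in big_list_of_lists]
print(a)  # [1.8, 1.5714285714285714, 2.0, 1.75]
```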

Other details: the solution should preferably work with Python 3.6, and it must work on Windows.

Thank you very much!

Answer:

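It seems to work fine for me using a manager (mp.Manager()), with a manager.list of manager.lists. I believe this will not copy the lists to every process: the data stays in the manager's server process, and the workers only receive proxies that forward each access to it.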

The important line is:

big_list_of_lists_proxy = manager.list([manager.list(sublist) for sublist in big_list_of_lists])

Depending on your use case, you may want to use this instead:

big_list_of_lists_proxy = manager.list(big_list_of_lists)
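The difference between the two lines above shows up in what indexing returns. A minimal sketch, assuming Python 3.6 or later (where manager proxies can be nested); the function name describe_proxies and the sample data are hypothetical. With nested proxies, nested[0] is itself a ListProxy, so no elements are copied until they are read; with only an outer proxy, outer_only[0] is a plain list copied in full:

```python
import multiprocessing as mp


def describe_proxies():
    """Hypothetical demo: what does indexing each kind of manager list return?"""
    data = [[1, 3], [3, 1, 3]]
    with mp.Manager() as manager:
        # Option 1: every sublist is its own proxy; the elements stay in the manager process.
        nested = manager.list([manager.list(sub) for sub in data])
        # Option 2: only the outer list is a proxy; the sublists are stored as plain lists.
        outer_only = manager.list(data)

        first_nested = nested[0]      # a ListProxy: nothing copied yet
        first_plain = outer_only[0]   # a plain list: the whole sublist was copied over
        # Iterating a ListProxy fetches roughly one element per round trip to the manager.
        nested_items = list(first_nested)
        return (type(first_nested).__name__, type(first_plain).__name__,
                nested_items, first_plain)


if __name__ == "__main__":
    print(describe_proxies())  # ('ListProxy', 'list', [1, 3], [1, 3])
```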

Whether every sublist should be a proxy depends on how large each sublist is and whether it is read in its entirety. If a sublist is large, transferring the list object to each process that needs it is expensive (O(n) in its length), so a proxy list from a manager should be used. However, if every element is going to be needed anyway, a proxy brings no advantage, since each element access through a proxy is a separate round trip to the manager process.

import multiprocessing as mp
from operator import itemgetter
import numpy as np
from functools import partial


def foo(indexes, big_list_of_lists):
    # big_list_of_lists is a manager proxy, so every child process can read it;
    # indexing it fetches the requested items (or proxies to them) from the manager process.
    store_tuples = itemgetter(*indexes)(big_list_of_lists)
    return np.mean([item for sublista in store_tuples for item in sublista])


def main():
    # big_list_of_lists is the variable that I want to share across my child processes
    big_list_of_lists = [[1, 3], [3, 1, 3], [1, 2], [2, 0]]
    ctx = mp.get_context('spawn')
    with ctx.Manager() as manager:
        big_list_of_lists_proxy = manager.list([manager.list(sublist) for sublist in big_list_of_lists])
        # big_list_of_lists elements are also passed as args
        pool = ctx.Pool(processes=mp.cpu_count())
        res = list(pool.map(partial(foo, big_list_of_lists=big_list_of_lists_proxy), big_list_of_lists))
        pool.close()
        pool.join()

    return res


if __name__ == '__main__':
    print(main())
# desired output is equivalent to:
# a = []
# for i in big_list_of_lists:
#     store_tuples = itemgetter(*i)(big_list_of_lists)
#     a.append(np.mean([item for sublista in store_tuples for item in sublista]))
# 'a' would be equal to [1.8, 1.5714285714285714, 2.0, 1.75]
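For the case mentioned in the answer where every element is needed anyway, a common alternative (a different technique from the manager-based answer) is to send the list once to each worker through Pool's initializer/initargs parameters and keep it in a module-level global. This costs one copy per worker process rather than one per task, and every lookup afterwards is local. A sketch under those assumptions; init_worker, worker_mean and _big_list_of_lists are hypothetical names:

```python
import multiprocessing as mp

# Hypothetical per-worker slot, filled once in each worker by init_worker.
_big_list_of_lists = None


def init_worker(big_list_of_lists):
    # Runs once when each worker process starts; keeps that worker's own copy.
    global _big_list_of_lists
    _big_list_of_lists = big_list_of_lists


def worker_mean(indexes):
    # Plain local indexing, one index at a time, so a single index also works.
    selected = [_big_list_of_lists[i] for i in indexes]
    flat = [item for sublist in selected for item in sublist]
    return sum(flat) / len(flat)


def main():
    big_list_of_lists = [[1, 3], [3, 1, 3], [1, 2], [2, 0]]
    # Uses the platform's default start method (spawn on Windows).
    with mp.Pool(initializer=init_worker, initargs=(big_list_of_lists,)) as pool:
        return pool.map(worker_mean, big_list_of_lists)


if __name__ == "__main__":
    print(main())
```

The trade-off is memory: each worker holds its own full copy of the list, which is what the manager-based approach avoids. With spawn, the initargs are pickled and sent to each worker when it starts.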