I'm struggling with this problem.
I have a big list of lists that I want to access with parallel code to perform CPU-intensive operations. In order to do that, I'm trying to use multiprocessing.Pool. The problem is that I also need this massive list of lists to be visible to my child processes.
As the 'list of lists' is not regular (e.g. [[1, 2], [1, 2, 3]]), I can't store it as an mp.Array, and, as previously said, I'm not using mp.Process, so I haven't figured out a way of using mp.Manager for this task. It's important to me to keep this list of lists because I'm applying a function that queries by indexes using from operator import itemgetter.
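For example, this is the kind of index-based lookup I rely on (a minimal illustration):

    from operator import itemgetter

    ragged = [[1, 2], [1, 2, 3], [4]]
    # itemgetter(0, 2) builds a callable that fetches positions 0 and 2 at once
    print(itemgetter(0, 2)(ragged))  # ([1, 2], [4])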
Here is a fictitious example of what I'm trying to achieve:
import multiprocessing as mp
from operator import itemgetter
import numpy as np

def foo(indexes):
    # here I must somehow guarantee read access to big_list_of_lists in every child process;
    # this works with global variables for a single child process, but fails with larger data
    store_tuples = itemgetter(*indexes)(big_list_of_lists)
    return np.mean([item for sublista in store_tuples for item in sublista])

def main():
    # big_list_of_lists is the variable that I want to share across my child processes
    big_list_of_lists = [[1, 3], [3, 1, 3], [1, 2], [2, 0]]
    ctx = mp.get_context('spawn')
    # big_list_of_lists elements are also passed as args
    pool = mp.Pool(ctx.Semaphore(mp.cpu_count()).get_value())
    res = list(pool.map(foo, big_list_of_lists))
    pool.close()
    pool.join()
    return res

if __name__ == '__main__':
    print(main())

# desired output is equivalent to:
# a = []
# for i in big_list_of_lists:
#     store_tuples = itemgetter(*i)(big_list_of_lists)
#     a.append(np.mean([item for sublista in store_tuples for item in sublista]))
# 'a' would be equal to [1.8, 1.5714285714285714, 2.0, 1.75]
Other details: the solution should preferably be achieved with Python 3.6 and must work on Windows.
Thank you very much!
CodePudding user response:
It seems to work fine for me using mp.Manager, with an mp.Manager.list of mp.Manager.lists. I believe this will not copy the lists to every process.
The important line is:
big_list_of_lists_proxy = manager.list([manager.list(sublist) for sublist in big_list_of_lists])
Depending on your use case, you may want to use this instead:
big_list_of_lists_proxy = manager.list(big_list_of_lists)
Whether every sublist should be a proxy depends on whether each sublist is large, and also on whether it is read in its entirety. If a sublist is large, it is expensive to transfer the list object to each process that needs it (O(n) complexity), so a proxy list from a manager should be used; however, if every element is going to be needed anyway, there is no advantage to using a proxy.
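As a runnable sketch of that tradeoff (toy data and illustrative names, not the real workload):

    import multiprocessing as mp

    if __name__ == '__main__':
        with mp.Manager() as manager:
            data = [[1, 3], [3, 1, 3]]
            # nested proxies: indexing the outer list hands back a lightweight
            # proxy handle, and each element read from it is a manager round-trip
            nested = manager.list([manager.list(s) for s in data])
            sub = nested[0]         # cheap: a proxy, not a copy
            print(sub[0])           # one IPC call per element
            # plain sublists: indexing the outer proxy copies the whole sublist
            flat = manager.list(data)
            sub_copy = flat[0]      # O(len(sublist)) transfer, then local reads
            print(sub_copy[0])      # ordinary local list access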
import multiprocessing as mp
from operator import itemgetter
import numpy as np
from functools import partial

def foo(indexes, big_list_of_lists):
    # big_list_of_lists arrives as a manager proxy, so every child process
    # gets read access to the shared data without a full copy of it
    store_tuples = itemgetter(*indexes)(big_list_of_lists)
    return np.mean([item for sublista in store_tuples for item in sublista])

def main():
    # big_list_of_lists is the variable to share across the child processes
    big_list_of_lists = [[1, 3], [3, 1, 3], [1, 2], [2, 0]]
    ctx = mp.get_context('spawn')
    with ctx.Manager() as manager:
        big_list_of_lists_proxy = manager.list(
            [manager.list(sublist) for sublist in big_list_of_lists])
        # big_list_of_lists elements are also passed as args (the indexes)
        pool = ctx.Pool(mp.cpu_count())
        res = list(pool.map(partial(foo, big_list_of_lists=big_list_of_lists_proxy),
                            big_list_of_lists))
        pool.close()
        pool.join()
    return res

if __name__ == '__main__':
    print(main())

# desired output is equivalent to:
# a = []
# for i in big_list_of_lists:
#     store_tuples = itemgetter(*i)(big_list_of_lists)
#     a.append(np.mean([item for sublista in store_tuples for item in sublista]))
# 'a' would be equal to [1.8, 1.5714285714285714, 2.0, 1.75]
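A note on the design: the proxy is handed to foo via functools.partial because, under the 'spawn' start method (the only one available on Windows), worker processes do not inherit the parent's in-memory objects, so anything a worker needs must either be passed as a picklable argument, like the manager proxy here, or be recreated in each child.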