Some existing questions have looked at non-nested defaultdict
behavior when multiprocessing:
- "Using defaultdict with multiprocessing?"
- "Python defaultdict behavior possible with multiprocessing?"
It seems that managing something nested like `defaultdict(list)`
isn't an entirely simple process, let alone something more complex like `defaultdict(lambda: defaultdict(list))`.
import concurrent.futures
from collections import defaultdict
import multiprocessing as mp
from multiprocessing.managers import BaseManager, DictProxy, ListProxy
import numpy as np
def called_function1(hey, i, yo):
    """Append ``hey`` to the list found at ``yo[i]``.

    When ``yo`` is a manager ``DictProxy``, ``yo[i]`` returns a local copy of
    the stored list (proxied containers hand back values, not references), so
    the append does not reach the shared dict.
    """
    bucket = yo[i]
    bucket.append(hey)
class EmbeddedManager(BaseManager):
    """Empty ``BaseManager`` subclass; typeids are registered on it at runtime."""
def func1():
    """Demonstrate that a manager-hosted ``defaultdict(list)`` loses appends.

    A plain ``defaultdict`` is registered on ``EmbeddedManager`` with a
    ``DictProxy``. Workers look up keys through the proxy, which creates them
    on the server via the default factory. However, they append to local
    copies of the lists, so every printed value is empty.
    """
    emanager = EmbeddedManager()
    # register() is a classmethod: this adds a 'defaultdict' typeid to the class.
    emanager.register('defaultdict', defaultdict, DictProxy)
    emanager.start()
    try:
        ddict = emanager.defaultdict(list)
        with concurrent.futures.ProcessPoolExecutor(8) as executor:
            futures = []
            for i in range(10):
                ind = np.random.randint(2)
                futures.append(executor.submit(called_function1, i, ind, ddict))
            # Surface worker exceptions instead of silently discarding them.
            for future in futures:
                future.result()
        for k, v in ddict.items():
            print(k, v)
    finally:
        # Always stop the manager's server process, even if a worker failed.
        emanager.shutdown()
Trying to register a normal `defaultdict` fails for the values inside it:
they aren't managed, so only the keys are retained:
func1()
1 []
0 []
A different approach I tried was to create the list inside the called function, which would be a reasonable compromise:
def called_function2(hey, i, yo):
    """Create ``yo[i]`` as an empty list if absent, then append ``hey`` to it.

    When ``yo`` is a manager ``dict()`` proxy, the stored value is an ordinary
    list, so ``yo[i]`` hands back a copy and the append never reaches the
    shared dict.
    """
    key_missing = i not in yo
    if key_missing:
        yo[i] = []
    yo[i].append(hey)
def func2():
    """Demonstrate lazily creating plain lists inside workers on a managed dict.

    Workers store an ordinary ``[]`` in ``manager.dict()`` and then append to
    ``ddict[key]``. That lookup returns a copy, so only the keys survive and
    the printed lists are empty.
    """
    # Context-manage the Manager so its server process is always shut down.
    with mp.Manager() as manager:
        ddict = manager.dict()
        with concurrent.futures.ProcessPoolExecutor(8) as executor:
            futures = []
            for i in range(10):
                ind = np.random.randint(2)
                futures.append(executor.submit(called_function2, i, ind, ddict))
            # Surface worker exceptions instead of silently discarding them.
            for future in futures:
                future.result()
        for k, v in ddict.items():
            print(k, v)
but the list still isn't managed:
func2()
1 []
0 []
I can get this to work by inserting a managed list into the dictionary before the function is called:
def called_function3(hey, i, yo):
    """Append ``hey`` to the (pre-existing) list stored at ``yo[i]``.

    Intended for dicts whose values are already manager ``list()`` proxies,
    so the append is forwarded to the shared list.
    """
    target = yo[i]
    target.append(hey)
def func3():
    """Pre-create managed lists so workers can append to shared values.

    Each key gets a ``manager.list()`` proxy before work is submitted, so
    ``called_function3``'s append goes through the proxy and is kept.
    """
    with mp.Manager() as manager:
        ddict = manager.dict()
        with concurrent.futures.ProcessPoolExecutor(8) as executor:
            futures = []
            for i in range(10):
                ind = np.random.randint(2)
                # Insert a *managed* list before the worker needs it.
                if ind not in ddict:
                    ddict[ind] = manager.list()
                # Use the helper written for this approach (was called_function2).
                futures.append(executor.submit(called_function3, i, ind, ddict))
            for future in futures:
                future.result()
        # Print while the manager is alive: the values are list proxies.
        for k, v in ddict.items():
            print(k, v)
However, I'd prefer to avoid this method, because I don't necessarily know whether a dictionary key needs to exist before the function is run:
func3()
0 [0, 2, 3, 4, 6, 8]
1 [1, 5, 7, 9]
Trying to pass the manager to the function so it can create a managed list on the fly doesn't work:
def called_function4(hey, i, yo, man):
    """Append ``hey`` to ``yo[i]``, creating it via ``man.list()`` when missing.

    ``man`` is expected to be a multiprocessing manager (anything with a
    ``list()`` factory).
    """
    if i in yo:
        yo[i].append(hey)
        return
    yo[i] = man.list()
    yo[i].append(hey)
def func4():
    """Demonstrate passing the manager itself to workers so they create lists.

    Sending ``manager`` as a task argument requires pickling it. That fails
    with the ``TypeError`` about ``AuthenticationString`` shown in the
    transcript; presumably it is re-raised by ``f.result()`` below.
    """
    with mp.Manager() as manager:
        ddict = manager.dict()
        with concurrent.futures.ProcessPoolExecutor(8) as executor:
            futures = []
            for i in range(10):
                ind = np.random.randint(2)
                # Submit the 4-argument helper that accepts the manager
                # (previously the 3-argument called_function2 was submitted).
                futures.append(
                    executor.submit(called_function4, i, ind, ddict, manager)
                )
            for f in concurrent.futures.as_completed(futures):
                print(f.result())
        for k, v in ddict.items():
            print(k, v)
func4()
TypeError: Pickling an AuthenticationString object is disallowed for security reasons
Trying to create a new manager within the called function instead
def called_function5(hey, i, yo):
    """Lazily create ``yo[i]`` with a brand-new ``mp.Manager()``, then append.

    NOTE(review): this raises ``BrokenPipeError`` (see transcript). The likely
    cause is that the temporary manager created below is shut down once
    nothing references it any more. The list proxy stored in ``yo`` would
    then point at a server that no longer exists. Confirm before relying on
    this explanation.
    """
    if i not in yo:
        # The Manager() here is never stored in a variable; it is a temporary.
        yo[i] = mp.Manager().list()
    yo[i].append(hey)
def func5():
    """Demonstrate workers creating their own managers for missing keys.

    ``called_function5`` spins up a fresh ``mp.Manager()`` per missing key.
    The transcript shows this ends in ``BrokenPipeError``.
    """
    # Context-manage the outer Manager so its server process is always shut
    # down, including when a worker's error propagates from f.result().
    with mp.Manager() as manager:
        ddict = manager.dict()
        with concurrent.futures.ProcessPoolExecutor(8) as executor:
            futures = []
            for i in range(10):
                ind = np.random.randint(2)
                futures.append(executor.submit(called_function5, i, ind, ddict))
            for f in concurrent.futures.as_completed(futures):
                print(f.result())
        for k, v in ddict.items():
            print(k, v)
raises another error
func5()
BrokenPipeError: [Errno 32] Broken pipe
Is there a better way of doing this?
Answer:
The most straightforward solution is to create a manager for lists and dictionaries and send its address to the children. This way the children can connect to it and ask it to create the objects.
import concurrent.futures
import numpy as np
import threading
from multiprocessing.managers import BaseManager, DictProxy, ListProxy, AcquirerProxy
def called_function2(hey, i, yo, manager_adress, creation_lock):
    """Append ``hey`` to the managed list at ``yo[i]``, creating it on demand.

    Args:
        hey: value to append.
        i: key into ``yo``.
        yo: dict proxy hosted by an ``EmbeddedManager``.
        manager_adress: that manager's ``address``; used to connect back from
            this worker process and ask the manager for a new list.
        creation_lock: lock proxy that serialises creation of missing keys.
    """
    if i not in yo:  # skip the lock entirely when the key already exists
        with creation_lock:
            if i not in yo:  # another process may have created it meanwhile
                # Connect only when a list actually has to be created. This
                # avoids a round trip to the manager on every call for an
                # existing key.
                manager = EmbeddedManager(address=manager_adress)
                manager.connect()
                yo[i] = manager.list()
    yo[i].append(hey)
class EmbeddedManager(BaseManager):
    """Manager that serves shared dicts, lists and locks to worker processes."""


# Typeids served by the manager: container types plus a lock for
# serialising key creation.
EmbeddedManager.register("dict", callable=dict, proxytype=DictProxy)
EmbeddedManager.register("list", callable=list, proxytype=ListProxy)
EmbeddedManager.register("Lock", callable=threading.Lock, proxytype=AcquirerProxy)
def func2():
    """Fill a shared dict of lists from worker processes and print it.

    Workers receive the manager's address so they can connect back and
    request new managed lists. A shared lock prevents two workers from
    creating the same key at once.
    """
    manager = EmbeddedManager()
    manager.start()
    try:
        ddict = manager.dict()
        creation_lock = manager.Lock()
        with concurrent.futures.ProcessPoolExecutor(8) as executor:
            results = []
            for i in range(10):
                ind = np.random.randint(2)
                res = executor.submit(
                    called_function2, i, ind, ddict, manager.address, creation_lock
                )
                results.append(res)
            # Re-raise any worker exception here.
            for item in results:
                item.result()
        # Print while the manager is alive: the values are list proxies.
        for k, v in ddict.items():
            print(k, v)
    finally:
        # started via start(), so it must be shut down explicitly
        manager.shutdown()


if __name__ == "__main__":
    func2()
Note that the lock is required to avoid a race condition when creating the lists.
Edit: a less straightforward approach avoids having every worker check whether a key exists, but it involves rewriting a lot of behavior. You subclass `dict` and its proxy so that you can store the manager address in it, and then override its `__getitem__`.
The override looks up the key, or creates the managed list if the key doesn't exist. Since managers service calls using threads, you'll also need a thread lock to avoid race conditions.
import concurrent.futures
import numpy as np
from multiprocessing.managers import BaseManager, DictProxy, ListProxy
import threading
# Guards lazy list creation in mydict.__getitem__. That method runs inside the
# manager's server process, which services calls with threads, so two callers
# can miss the same key at the same time.
lock = threading.Lock()
def called_function2(hey, i, yo):
    """Append ``hey`` to ``yo[i]``.

    With a ``mydict`` proxy, ``yo[i]`` creates the managed list on first
    access, so no existence check is needed here.
    """
    target = yo[i]
    target.append(hey)
class EmbeddedManager(BaseManager):
    """Manager hosting ``mydict`` instances and the lists they create."""
class mydictProxy(DictProxy):
    """``DictProxy`` that additionally exposes ``set_manager_address``."""

    # Only names listed in _exposed_ can be invoked on the referent, so the
    # extra method is concatenated onto DictProxy's exposed methods. The
    # original omitted the ``+``, which is a SyntaxError.
    _exposed_ = ("set_manager_address",) + DictProxy._exposed_

    def set_manager_address(self, manager_address):
        """Tell the remote ``mydict`` which manager to use for new lists."""
        self._callmethod("set_manager_address", (manager_address,))

    # Original (misspelled) name kept so existing callers keep working.
    set_manager_adress = set_manager_address
class mydict(dict):
    """``dict`` subclass that fills missing keys with manager-hosted lists.

    Instances are created inside the ``EmbeddedManager`` server, where this
    class is registered as a typeid. ``set_manager_address`` must be given
    that manager's address before any missing key is looked up, so that a new
    list can be requested from it.
    """

    def __init__(self):
        super().__init__()
        # Address of the manager to connect to; None until configured.
        self._manager_adress = None

    def set_manager_address(self, manager_address):
        """Remember ``manager_address`` for creating lists on demand."""
        self._manager_adress = manager_address

    def _new_managed_list(self):
        """Connect to the configured manager and return a fresh list proxy."""
        connection = EmbeddedManager(address=self._manager_adress)
        connection.connect()
        return connection.list()

    def __getitem__(self, item):
        """Return ``self[item]``, creating a managed list if it is missing."""
        # Lock-free fast path for keys that already exist.
        if item in self:
            return super().__getitem__(item)
        # Calls are serviced by several server threads, so re-check under the
        # module-level lock before creating anything.
        with lock:
            if item in self:
                return super().__getitem__(item)
            created = self._new_managed_list()
            super().__setitem__(item, created)
            return created
# Serve mydict through the custom proxy, and plain lists through ListProxy.
EmbeddedManager.register("mydict", callable=mydict, proxytype=mydictProxy)
EmbeddedManager.register("list", callable=list, proxytype=ListProxy)
def func2():
    """Fill a shared ``mydict`` from worker processes and print its contents.

    The dict is told its own manager's address, so that its ``__getitem__``
    can create managed lists for missing keys inside the server.
    """
    manager = EmbeddedManager()
    manager.start()
    try:
        ddict = manager.mydict()
        # mydict needs the address to connect back and create lists on demand.
        ddict.set_manager_adress(manager.address)
        with concurrent.futures.ProcessPoolExecutor(8) as executor:
            results = []
            for i in range(10):
                ind = np.random.randint(2)
                res = executor.submit(called_function2, i, ind, ddict)
                results.append(res)
            # Re-raise any worker exception here.
            for item in results:
                item.result()
        # Print while the manager is alive: the values are list proxies.
        for k, v in ddict.items():
            print(k, v)
    finally:
        # started via start(), so it must be shut down explicitly
        manager.shutdown()


if __name__ == "__main__":
    func2()