Is it possible to share a dictionary variable in Python multiprocessing (pathos.multiprocessing)?
I use the following code, but it doesn't work as expected.
I expect skus to end up as {0: 0, 1: 10, 2: 20, ...}:
from pathos.multiprocessing import Pool as ProcessPool

def outer():
    skus = {}

    def process(skus, sku):
        skus[sku] = sku * 10

    with ProcessPool() as pool:
        pool.starmap(process, ((skus, sku) for sku in range(100)), chunksize=3)
    print(skus)

if __name__ == "__main__":
    outer()
Output:
{}
So I used Manager().dict() as my variable, but now I get another error.
Where is the problem, and how can I correctly share a dict between processes?
from pathos.multiprocessing import Pool as ProcessPool
from multiprocessing import Manager

def outer():
    manager = Manager()
    skus = manager.dict()

    def process(sku):
        skus[sku] = sku * 10

    with ProcessPool() as pool:
        pool.map(process, range(100), chunksize=3)
    print(skus)

if __name__ == "__main__":
    outer()
Output: (Error)
....
raise AuthenticationError('digest sent was rejected')
multiprocessing.context.AuthenticationError: digest sent was rejected
CodePudding user response:
The correct way: define your functions at the global scope and use multiprocessing instead of pathos, as it is designed to work with manager objects seamlessly.
from multiprocessing import Pool as ProcessPool
from multiprocessing import Manager

def process(sku):
    skus[sku] = sku * 10

def initializer_func(skus_dict):
    # Runs once in each worker process and stores the managed dict as a global.
    global skus
    skus = skus_dict

def outer():
    manager = Manager()
    skus = manager.dict()
    with ProcessPool(initializer=initializer_func, initargs=(skus,)) as pool:
        pool.map(process, range(100), chunksize=3)
    print(skus)

if __name__ == "__main__":
    outer()
The hacky way: set the authentication key of the child processes to the same authentication key as your manager in the initializer, then unpickle the managed dictionary.
from pathos.multiprocessing import Pool as ProcessPool
from multiprocessing.managers import SyncManager
import multiprocessing
import pickle
import io

def outer():
    manager = SyncManager(authkey=b"123")
    manager.start()
    skus = manager.dict()

    # Pickle the proxy so it can be reconstructed inside the workers.
    buffer = io.BytesIO()
    pickle.dump(skus, buffer)

    def process(sku):
        skus[sku] = sku * 10

    def initializer_func(buffer):
        global skus
        # Give the worker the manager's authkey, then rebuild the proxy.
        multiprocessing.current_process().authkey = b"123"
        skus = pickle.loads(buffer.getvalue())

    with ProcessPool(initializer=initializer_func, initargs=(buffer,)) as pool:
        pool.map(process, range(100), chunksize=3)
    print(skus)

if __name__ == "__main__":
    outer()
Keep in mind that the hacky way only gets past "that error"; more problems will follow, each needing its own workaround. After spending long enough investigating how pathos and multiprocessing interact, you will likely give up and use multiprocessing without pathos, instead of inventing a workaround for every problem you created for yourself.
pathos is a nice library, but it doesn't play well with the rest of the Python ecosystem, so expect to need workarounds.
CodePudding user response:
Using a managed dictionary is certainly a method that enables sharing a dictionary across multiple processes. But there is considerable overhead: every method call a process makes on the dictionary is actually a call on a proxy object. The actual dictionary lives in a special process that is created when multiprocessing.Manager() is invoked, so the proxy must marshal the method name and whatever arguments are passed, serialize them, and send them via a socket or named pipe to the manager's process, where the request is de-serialized and applied to the real dictionary.
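To get a sense of that per-operation cost, here is a small stand-alone micro-benchmark (not part of the original answer; exact timings will vary by machine) comparing plain-dict assignments with assignments made through a manager proxy:

from multiprocessing import Manager
import time

if __name__ == "__main__":
    n = 10_000

    plain = {}
    start = time.perf_counter()
    for i in range(n):
        plain[i] = i          # local operation, no inter-process traffic
    print(f"plain dict:   {time.perf_counter() - start:.4f} sec for {n} assignments")

    managed = Manager().dict()
    start = time.perf_counter()
    for i in range(n):
        managed[i] = i        # each assignment is a round trip to the manager process
    print(f"managed dict: {time.perf_counter() - start:.4f} sec for {n} assignments")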
So in these situations you should ask whether a sharable dictionary is really necessary at all. Since your worker function process only updates a single key of the dictionary and does not need to know any of the other keys and values, it can simply do its calculation and return the key and value to the main process, which then does the updating/creation of the dictionary:
from multiprocessing import Pool as ProcessPool

def process(sku):
    return sku, sku * 10

def outer():
    with ProcessPool() as pool:
        skus = {sku: result for sku, result in pool.map(process, range(100), chunksize=3)}
    print(skus)

if __name__ == "__main__":
    outer()
Prints:
{0: 0, 1: 10, 2: 20, 3: 30, 4: 40, 5: 50, 6: 60, 7: 70, 8: 80, 9: 90, 10: 100, 11: 110, 12: 120, 13: 130, 14: 140, 15: 150, 16: 160, 17: 170, 18: 180, 19: 190, 20: 200, 21: 210, 22: 220, 23: 230, 24: 240, 25: 250, 26: 260, 27: 270, 28: 280, 29: 290, 30: 300, 31: 310, 32: 320, 33: 330, 34: 340, 35: 350, 36: 360, 37: 370, 38: 380, 39: 390, 40: 400, 41: 410, 42: 420, 43: 430, 44: 440, 45: 450, 46: 460, 47: 470, 48: 480, 49: 490, 50: 500, 51: 510, 52: 520, 53: 530, 54: 540, 55: 550, 56: 560, 57: 570, 58: 580, 59: 590, 60: 600, 61: 610, 62: 620, 63: 630, 64: 640, 65: 650, 66: 660, 67: 670, 68: 680, 69: 690, 70: 700, 71: 710, 72: 720, 73: 730, 74: 740, 75: 750, 76: 760, 77: 770, 78: 780, 79: 790, 80: 800, 81: 810, 82: 820, 83: 830, 84: 840, 85: 850, 86: 860, 87: 870, 88: 880, 89: 890, 90: 900, 91: 910, 92: 920, 93: 930, 94: 940, 95: 950, 96: 960, 97: 970, 98: 980, 99: 990}
Benchmark
We will take the above approach but instead process 1_000_000 key/value pairs and allow the pool to pick its own optimum chunksize:
from multiprocessing import Pool as ProcessPool
from timing import time_it

def process(sku):
    return sku, sku * 10

@time_it
def outer():
    with ProcessPool() as pool:
        skus = {sku: result for sku, result in pool.map(process, range(1_000_000))}
    #print(skus)

if __name__ == "__main__":
    outer()
Prints:
func: outer args: [(), {}] took: 0.7160062 sec.
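The timing module imported above is not shown in the answer; a minimal time_it decorator that produces output in the same format might look roughly like this (hypothetical sketch, not the author's actual code):

import time
from functools import wraps

def time_it(func):
    # Print the function name, its arguments, and the elapsed wall-clock time.
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"func: {func.__name__} args: [{args}, {kwargs}] took: {elapsed:.7f} sec.")
        return result
    return wrapper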
Here is a version where the process worker function does not need to send back the key:
from multiprocessing import Pool as ProcessPool, cpu_count
from timing import time_it

def process(sku):
    return sku * 10

@time_it
def outer():
    with ProcessPool() as pool:
        skus = {i: result for i, result in enumerate(pool.map(process, range(1_000_000)))}
    #print(skus)

if __name__ == "__main__":
    outer()
Prints:
func: outer args: [(), {}] took: 0.5001733 sec.
And here we use a managed dictionary as a global variable:
from multiprocessing import Pool as ProcessPool, Manager
from timing import time_it

def init_pool_processes(d):
    global skus
    skus = d

def process(sku):
    skus[sku] = sku * 10

@time_it
def outer():
    skus = Manager().dict()
    with ProcessPool(initializer=init_pool_processes, initargs=(skus,)) as pool:
        pool.map(process, range(1_000_000))
    #print(skus)

if __name__ == "__main__":
    outer()
Prints:
func: outer args: [(), {}] took: 60.8297226 sec.
And instead of passing skus as a global variable, we will explicitly pass it as an additional argument to process:
from multiprocessing import Pool as ProcessPool, Manager
from functools import partial
from timing import time_it

def process(skus, sku):
    skus[sku] = sku * 10

@time_it
def outer():
    skus = Manager().dict()
    with ProcessPool() as pool:
        pool.map(partial(process, skus), range(1_000_000))
    #print(skus)

if __name__ == "__main__":
    outer()
Prints:
func: outer args: [(), {}] took: 60.7377072 sec.
Results:
Generally, accessing a global variable is slower than accessing a local variable. But since process accesses skus only once, and that one access is a tiny fraction of the total CPU time required when a managed dictionary is used, it doesn't matter much which method of passing the managed dictionary you choose: the running time is approximately 61 seconds either way, versus 0.5 seconds when not using a managed dictionary at all.
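As an aside (not part of the benchmarks above), if a managed dictionary really is required, the per-call proxy cost can be reduced by computing the results locally in the workers and applying them with a single bulk update() in the main process, along the lines of this sketch:

from multiprocessing import Pool as ProcessPool, Manager

def process(sku):
    # Do the work locally and return the key/value pair
    # instead of writing to the managed dict from the worker.
    return sku, sku * 10

def outer():
    skus = Manager().dict()
    with ProcessPool() as pool:
        # One bulk update replaces one proxy round trip per key.
        skus.update(dict(pool.map(process, range(1_000_000))))
    print(len(skus))

if __name__ == "__main__":
    outer()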