How to share dictionary memory between different processes?


Is it possible to share a dictionary variable between processes in Python multiprocessing (pathos.multiprocessing)?

I used the following code, but it doesn't work as expected.

I expect skus to end up as {0: 0, 1: 10, 2: 20, ...}

from pathos.multiprocessing import Pool as ProcessPool

def outer():
    skus = {}

    def process(skus, sku):
        skus[sku] = sku * 10

    with ProcessPool() as pool:
        pool.starmap(process, ((skus, sku) for sku in range(100)), chunksize=3)

    print(skus)

if __name__ == "__main__":
    outer()

Output:

{}

So I used a Manager().dict() as my variable, but now I get another error.

Where is the problem, and how can I correctly share a dict between processes?

from pathos.multiprocessing import Pool as ProcessPool
from multiprocessing import Manager

def outer():
    manager = Manager()
    skus = manager.dict()

    def process(sku):
        skus[sku] = sku * 10

    with ProcessPool() as pool:
        pool.map(process, range(100), chunksize=3)

    print(skus)

if __name__ == "__main__":
    outer()

Output: (Error)

....
raise AuthenticationError('digest sent was rejected')
multiprocessing.context.AuthenticationError: digest sent was rejected

CodePudding user response:

The correct way: define your functions at global scope and use multiprocessing instead of pathos; multiprocessing is designed to work with manager objects seamlessly.

from multiprocessing import Pool as ProcessPool
from multiprocessing import Manager


def process(sku):
    skus[sku] = sku * 10

def initializer_func(skus_dict):
    # Runs once in each worker process: store the DictProxy in a
    # global so that process() can reach it.
    global skus
    skus = skus_dict

def outer():
    manager = Manager()
    skus = manager.dict()

    with ProcessPool(initializer=initializer_func, initargs=(skus,)) as pool:
        pool.map(process, range(100), chunksize=3)

    print(skus)

if __name__ == "__main__":
    outer()

The hacky way: in the pool initializer, set the authentication key of the child processes to the same authentication key as your manager, then unpickle the managed dictionary.

from pathos.multiprocessing import Pool as ProcessPool
from multiprocessing.managers import SyncManager
import multiprocessing
import pickle
import io

def outer():
    manager = SyncManager(authkey=b"123")
    manager.start()
    skus = manager.dict()
    buffer = io.BytesIO()
    pickle.dump(skus, buffer)  # pickle the DictProxy; unpickling it requires the manager's authkey

    def process(sku):
        skus[sku] = sku * 10

    def initializer_func(buffer):
        global skus
        # Give each worker the manager's authkey so the proxy can be unpickled.
        multiprocessing.current_process().authkey = b'123'
        skus = pickle.loads(buffer.getvalue())

    with ProcessPool(initializer=initializer_func, initargs=(buffer,)) as pool:
        pool.map(process, range(100), chunksize=3)

    print(skus)

if __name__ == "__main__":
    outer()

Keep in mind that the hacky way only gets past that particular error; more problems will surface later, and you will have to come up with more solutions for them. After a month of doing nothing but investigating how pathos and multiprocessing work, you will give up and use multiprocessing without pathos, instead of coming up with a thousand workarounds for a thousand problems you created for yourself.

pathos is a nice library, but it just doesn't interoperate well with the rest of the Python ecosystem, and you will keep having to invent workarounds.

CodePudding user response:

Using a managed dictionary is certainly one way to share a dictionary across multiple processes. But such a dictionary carries considerable overhead: every method call a process makes on the dictionary is actually invoking a method of a proxy object. The actual dictionary resides in a special process that is created when multiprocessing.Manager() is invoked. The proxy object must therefore marshal the method name being invoked along with whatever arguments are passed, serializing these values and sending them via a socket or named pipe to the manager's process, where the request is de-serialized and applied to the actual dictionary.
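
A quick way to see that what you get back from Manager().dict() is a proxy, not a real dict (a minimal standard-library sketch):

from multiprocessing import Manager

if __name__ == "__main__":
    manager = Manager()
    d = manager.dict()
    # d is a DictProxy: every operation on it is forwarded over a
    # pipe/socket to the manager's process, where the real dict lives.
    print(type(d))  # <class 'multiprocessing.managers.DictProxy'>
    d["x"] = 1      # one round trip to the manager's process
    print(d["x"])   # another round trip; prints 1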

So in these situations you should ask whether a sharable dictionary is really necessary at all. Since your worker function process only updates a single key of the dictionary and does not need to know what other keys and values are in it, it could simply do its calculation and return to the main process the key and value that need to be set, letting the main process do the updating/creation of the dictionary:

from multiprocessing import Pool as ProcessPool

def process(sku):
    return sku, sku * 10

def outer():
    with ProcessPool() as pool:
        skus = {sku: result for sku, result in pool.map(process, range(100), chunksize=3)}

    print(skus)

if __name__ == "__main__":
    outer()

Prints:

{0: 0, 1: 10, 2: 20, 3: 30, 4: 40, 5: 50, 6: 60, 7: 70, 8: 80, 9: 90, 10: 100, 11: 110, 12: 120, 13: 130, 14: 140, 15: 150, 16: 160, 17: 170, 18: 180, 19: 190, 20: 200, 21: 210, 22: 220, 23: 230, 24: 240, 25: 250, 26: 260, 27: 270, 28: 280, 29: 290, 30: 300, 31: 310, 32: 320, 33: 330, 34: 340, 35: 350, 36: 360, 37: 370, 38: 380, 39: 390, 40: 400, 41: 410, 42: 420, 43: 430, 44: 440, 45: 450, 46: 460, 47: 470, 48: 480, 49: 490, 50: 500, 51: 510, 52: 520, 53: 530, 54: 540, 55: 550, 56: 560, 57: 570, 58: 580, 59: 590, 60: 600, 61: 610, 62: 620, 63: 630, 64: 640, 65: 650, 66: 660, 67: 670, 68: 680, 69: 690, 70: 700, 71: 710, 72: 720, 73: 730, 74: 740, 75: 750, 76: 760, 77: 770, 78: 780, 79: 790, 80: 800, 81: 810, 82: 820, 83: 830, 84: 840, 85: 850, 86: 860, 87: 870, 88: 880, 89: 890, 90: 900, 91: 910, 92: 920, 93: 930, 94: 940, 95: 950, 96: 960, 97: 970, 98: 980, 99: 990}

Benchmark

We will take the above approach but instead process 1_000_000 key/value pairs and allow the pool to pick its own optimum chunksize:

from multiprocessing import Pool as ProcessPool
from timing import time_it  # the answerer's own helper module (a stand-in sketch follows the output below)

def process(sku):
    return sku, sku * 10

@time_it
def outer():
    with ProcessPool() as pool:
        skus = {sku: result for sku, result in pool.map(process, range(1_000_000))}

    #print(skus)

if __name__ == "__main__":
    outer()

Prints:

func: outer args: [(), {}] took: 0.7160062 sec.
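
Note that timing is not a standard-library module; it is the answerer's own helper. A minimal stand-in time_it decorator, consistent with the output format shown above, could look like this:

import time
from functools import wraps

def time_it(func):
    # Minimal stand-in (assumed) for the answerer's timing.time_it:
    # report the wrapped function's name, arguments, and elapsed wall time.
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f'func: {func.__name__} args: [{args}, {kwargs}] took: {elapsed:.7f} sec.')
        return result
    return wrapper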

Here is a version where the process worker function does not need to send back the key:

from multiprocessing import Pool as ProcessPool
from timing import time_it

def process(sku):
    return sku * 10


@time_it
def outer():
    with ProcessPool() as pool:
        skus = {i: result for i, result in enumerate(pool.map(process, range(1_000_000)))}

    #print(skus)

if __name__ == "__main__":
    outer()

Prints:

func: outer args: [(), {}] took: 0.5001733 sec.

And here we use a managed dictionary as a global variable:

from multiprocessing import Pool as ProcessPool, Manager
from timing import time_it

def init_pool_processes(d):
    global skus
    skus = d

def process(sku):
    skus[sku] = sku * 10

@time_it
def outer():
    skus = Manager().dict()
    with ProcessPool(initializer=init_pool_processes, initargs=(skus,)) as pool:
        pool.map(process, range(1_000_000))

    #print(skus)

if __name__ == "__main__":
    outer()

Prints:

func: outer args: [(), {}] took: 60.8297226 sec.

And instead of passing skus as a global variable, we will explicitly pass it as an additional argument to process:

from multiprocessing import Pool as ProcessPool, Manager
from functools import partial
from timing import time_it

def process(skus, sku):
    skus[sku] = sku * 10

@time_it
def outer():
    skus = Manager().dict()
    with ProcessPool() as pool:
        pool.map(partial(process, skus), range(1_000_000))

    #print(skus)

if __name__ == "__main__":
    outer()

Prints:

func: outer args: [(), {}] took: 60.7377072 sec.

Results:

Generally, accessing a global variable is slower than accessing a local variable. But since process accesses skus only once, and that access accounts for such a small percentage of the total CPU work involved in using a managed dictionary, it really doesn't matter much which method of passing the managed dictionary is used: the running time is approximately 61 seconds either way, versus 0.5 seconds when not using a managed dictionary at all.
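
If a managed dictionary really is required, the analysis above points to one mitigation: cut down the number of proxy calls. The sketch below (an illustration, not part of the original answer) has each worker build an ordinary local dict for a whole chunk of keys and push it to the manager with a single update() call, paying for one round trip per chunk instead of one per key:

from multiprocessing import Pool as ProcessPool, Manager

def init_pool_processes(d):
    global skus
    skus = d

def process_chunk(chunk):
    # Compute everything in a plain local dict first...
    local = {sku: sku * 10 for sku in chunk}
    # ...then pay for a single proxy round trip for the whole chunk.
    skus.update(local)

def outer():
    skus = Manager().dict()
    chunks = [range(i, i + 10_000) for i in range(0, 1_000_000, 10_000)]
    with ProcessPool(initializer=init_pool_processes, initargs=(skus,)) as pool:
        pool.map(process_chunk, chunks)

    print(len(skus))  # 1000000

if __name__ == "__main__":
    outer()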
