How do I nest multiprocessing in multiprocessing, with common variables (python)?


I have a function which runs twice, in two parallel processes. Let's call it parentFunction(). Each process ends with a dictionary which is appended to a common list, giving a list of two dictionaries. I solved this by using a shared list created via a Manager.

Now, inside parentFunction() I would like to run two parallel processes, each contributing one variable to the dictionary. I tried to do this with a shared dictionary created via a Manager.

At the end I'm converting the list of dictionaries to a pandas DataFrame.

def I(D, a):
    D["a"] = a

def II(D, b):
    D["a"] = b

def task(L, x):
    x = 0
    a = 1
    b = 2
    manager = Manager()
    D = manager.dict()  # <-- can be shared between processes.
    pI = Process(target=I, args=(D, 0))
    pII = Process(target=II, args=(D, 0))
    pI.start()
    pII.start()
    pI.join()
    pII.join()

    L.append(D)

if __name__ == "__main__":
    with Manager() as manager:
        L = manager.list()  # <-- can be shared between processes.
        p1 = Process(target=task, args=(L, 0))  # Passing the list
        p2 = Process(target=task, args=(L, 0))  # Passing the list
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        
        print(L)

This returns an error:

TypeError: task() missing 1 required positional argument: 'L'

Traceback (most recent call last):
  File "C:\Users\user\AppData\Roaming\JetBrains\PyCharmCE2021.2\scratches\scratch_8.py", line 88, in <module>
    print(list(L))
  File "<string>", line 2, in __getitem__
  File "C:\Users\user\AppData\Local\Programs\Python\Python39\lib\multiprocessing\managers.py", line 810, in _callmethod
    kind, result = conn.recv()
  File "C:\Users\user\AppData\Local\Programs\Python\Python39\lib\multiprocessing\connection.py", line 256, in recv
    return _ForkingPickler.loads(buf.getbuffer())
  File "C:\Users\user\AppData\Local\Programs\Python\Python39\lib\multiprocessing\managers.py", line 934, in RebuildProxy
    return func(token, serializer, incref=incref, **kwds)
  File "C:\Users\user\AppData\Local\Programs\Python\Python39\lib\multiprocessing\managers.py", line 784, in __init__
    self._incref()
  File "C:\Users\user\AppData\Local\Programs\Python\Python39\lib\multiprocessing\managers.py", line 838, in _incref
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "C:\Users\user\AppData\Local\Programs\Python\Python39\lib\multiprocessing\connection.py", line 505, in Client
    c = PipeClient(address)
  File "C:\Users\user\AppData\Local\Programs\Python\Python39\lib\multiprocessing\connection.py", line 707, in PipeClient
    _winapi.WaitNamedPipe(address, 1000)
FileNotFoundError: [WinError 2] The system cannot find the file specified

CodePudding user response:

The source you posted does not seem to match your stack trace. You would only get a FileNotFoundError when the main process tries to enumerate the objects within list L with a statement such as print(list(L)), which I see in the stack trace but not in your code. It helps when you post the actual code causing the exception. But here is the cause of your problem:

When you create a new manager with the call manager = Manager(), a new process is created, and any objects created via that manager "live" in the address space of the manager's process. You are creating manager processes in two places: once in the main process and once in each child process running task. It is in the latter that the dictionary D is created. When a task process terminates, its manager process terminates too, along with any objects created by that manager. So when the main process attempts to enumerate the list L, the proxy object within it, D, no longer points to an existing object.
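
Here is a minimal sketch of that failure mode (the function name child is mine, and the exact exception is platform-dependent; on Windows it surfaces as the FileNotFoundError above):

from multiprocessing import Process, Manager

def child(L):
    local_manager = Manager()  # this manager process lives only as long as child()
    D = local_manager.dict()
    D["a"] = 1
    L.append(D)  # appends a proxy to D, not a copy of its contents
    # when child() returns, local_manager shuts down and D's referent dies

if __name__ == "__main__":
    with Manager() as manager:
        L = manager.list()
        p = Process(target=child, args=(L,))
        p.start()
        p.join()
        print(list(L))  # fails: D's manager process no longer exists

The solution is to have the main process create the dictionary, D, and pass it to the task child process: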

from multiprocessing import Process, Manager

def I(D, a):
    D["a"] = a

def II(D, b):
    D["a"] = b  # note: this reuses key "a"; presumably D["b"] = b was intended

def task(L, D, x):
    x = 0
    a = 1
    b = 2
    pI = Process(target=I, args=(D, 0))
    pII = Process(target=II, args=(D, 0))
    pI.start()
    pII.start()
    pI.join()
    pII.join()

    L.append(D)

if __name__ == "__main__":
    with Manager() as manager:
        L = manager.list()  # <-- can be shared between processes.
        D = manager.dict()  # <-- can be shared between processes.
        p = Process(target=task, args=(L, D, 0))  # Passing the list and the dict
        p.start()
        p.join()
        print(L[0])

Prints:

{'a': 0}
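
To get back to the original two-task setup, the main process can create one dictionary per task up front, and appending a plain copy of each dictionary (rather than the proxy itself) sidesteps the lifetime issue entirely. A sketch of that, assuming pandas is installed and that II was meant to write the key "b":

from multiprocessing import Process, Manager
import pandas as pd

def I(D, a):
    D["a"] = a

def II(D, b):
    D["b"] = b  # assumes the intended key is "b", not "a"

def task(L, D, x):
    pI = Process(target=I, args=(D, 1))
    pII = Process(target=II, args=(D, 2))
    pI.start()
    pII.start()
    pI.join()
    pII.join()
    L.append(dict(D))  # append a plain dict copy, not the proxy

if __name__ == "__main__":
    with Manager() as manager:
        L = manager.list()
        D1 = manager.dict()  # one dictionary per task, both owned by
        D2 = manager.dict()  # the main process's manager
        p1 = Process(target=task, args=(L, D1, 0))
        p2 = Process(target=task, args=(L, D2, 0))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        df = pd.DataFrame(list(L))  # two rows with columns "a" and "b"
        print(df)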