class variables and multiprocessing in python - what changed?

Time:07-29

Let's start from the punch line:

✗ ~/.pyenv/versions/3.7.11/bin/python demo_multiprocessing.py
<Process(Process-6, started)>  is putting  4  in the list
<Process(Process-2, started)>  is putting  0  in the list
<Process(Process-4, started)>  is putting  2  in the list
<Process(Process-5, started)>  is putting  3  in the list
<Process(Process-3, started)>  is putting  1  in the list
<Process(Process-7, started)>  is putting  5  in the list
all processes finished, here's what we have in the list:
[4, 0, 2, 3, 1, 5]
✗ ~/.pyenv/versions/3.9.11/bin/python demo_multiprocessing.py
<Process name='Process-4' parent=74535 started>  is putting  2  in the list
<Process name='Process-5' parent=74535 started>  is putting  3  in the list
<Process name='Process-6' parent=74535 started>  is putting  4  in the list
<Process name='Process-2' parent=74535 started>  is putting  0  in the list
<Process name='Process-7' parent=74535 started>  is putting  5  in the list
<Process name='Process-3' parent=74535 started>  is putting  1  in the list
all processes finished, here's what we have in the list:
[]

Here's the code:

from multiprocessing import Process, current_process, Manager
from typing import List
from random import shuffle

class StreamInterface:
    def write(self, x: int) -> None:
        raise NotImplementedError

class ListStream(StreamInterface):
    def __init__(self, manager: Manager):
        self._manager = manager
        self.shared_list = self._manager.list()

    def write(self, x : int):
        self.shared_list.append(x)

    def pop_all(self) -> List[int]:
        tmp = list(self.shared_list)
        self.shared_list[:] = []
        return tmp


class NullStream(StreamInterface):
    def write(self, x:int):
        pass


n_processors = 6

def do_work(i):
    print(current_process(), " is putting ", i, " in the list")
    ShiftPlanner.list_stream.write(i)

class ShiftPlanner:
    list_stream: StreamInterface = NullStream()


if __name__ == "__main__":
    with Manager() as manager:
        x = ListStream(manager)
        ShiftPlanner.list_stream = x
        processes = [
            Process(
                target=do_work,
                args=(i,)
            )
            for i in range(n_processors)
        ]

        shuffle(processes)

        for p in processes:
            p.start()

        for p in processes:
            p.join()

        print("all processes finished, here's what we have in the list:")
        print(x.pop_all())
        ShiftPlanner.list_stream = NullStream()

What's going on? Hint: it seems the way class variables are handled has changed. If I put a breakpoint in the write method, I see that the subprocesses think ShiftPlanner.list_stream is still the default NullStream() value. I'd appreciate more details on what changed, and on the most elegant, idiomatic way to fix this. Thanks!

Answer:

Changing class attributes from your main process this way is not reflected in the child processes when the start method is spawn. A spawned child starts a fresh interpreter and re-imports your main module (as __mp_main__, so the code inside the if __name__ == "__main__": clause never runs there); it receives only the pickled target function and its arguments, not a copy of the parent's memory. Therefore, as far as the child is concerned, the class ShiftPlanner is still exactly as it was originally defined, with list_stream set to NullStream().
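To see the difference between the two start methods directly, here is a minimal sketch that mutates a class attribute in the parent and asks a child process what value it sees, once with spawn and once with fork. Config, report and child_sees are hypothetical names used only for illustration; the multiprocessing calls (get_context, Queue, Process, get_all_start_methods) are the standard library API. It assumes the script is run as a file, since spawn needs to re-import the main module.

```python
import multiprocessing as mp


class Config:
    # Hypothetical stand-in for ShiftPlanner; this default is what a fresh
    # import of the module sees.
    mode = "default"


def report(queue) -> None:
    # Runs in the child: send back the value the child sees for the class attribute.
    queue.put(Config.mode)


def child_sees(method: str) -> str:
    ctx = mp.get_context(method)
    queue = ctx.Queue()
    # Mutate the class attribute in the parent only, like the question does
    # inside its if __name__ == "__main__" block.
    Config.mode = "set-in-parent"
    p = ctx.Process(target=report, args=(queue,))
    p.start()
    value = queue.get()  # read before join() so the child can flush the queue
    p.join()
    return value


if __name__ == "__main__":
    # spawn: the child re-imports this module, so it sees 'default'
    print("spawn:", child_sees("spawn"))
    if "fork" in mp.get_all_start_methods():  # fork is not available on Windows
        # fork: the child gets a copy of the parent's memory, so it sees 'set-in-parent'
        print("fork:", child_sees("fork"))
```

The if __name__ == "__main__": guard matters here: under spawn the child executes the module top to bottom again, and without the guard it would try to start new processes during its own bootstrapping.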

A simple fix is to pass the ListStream object to each child process and let each child set the class attribute in its own memory space. However, this requires that ListStream not store the manager itself: the list proxy returned by manager.list() can be pickled and sent to a child, but the manager object cannot.

Example code which works:

from multiprocessing import Process, current_process, Manager
from typing import List
from random import shuffle

class StreamInterface:
    def write(self, x: int) -> None:
        raise NotImplementedError

class ListStream(StreamInterface):
    def __init__(self, manager: Manager):
        self.shared_list = manager.list()

    def write(self, x : int):
        self.shared_list.append(x)

    def pop_all(self) -> List[int]:
        tmp = list(self.shared_list)
        self.shared_list[:] = []
        return tmp


class NullStream(StreamInterface):
    def write(self, x:int):
        pass


n_processors = 6

def do_work(i, x):
    # Runs in the child: set the class attribute in this process's own copy of ShiftPlanner
    ShiftPlanner.list_stream = x
    print(current_process(), " is putting ", i, " in the list")
    ShiftPlanner.list_stream.write(i)

class ShiftPlanner:
    list_stream: StreamInterface = NullStream()


if __name__ == "__main__":
    with Manager() as manager:
        x = ListStream(manager)
        ShiftPlanner.list_stream = x
        processes = [
            Process(
                target=do_work,
                args=(i, x)
            )
            for i in range(n_processors)
        ]

        shuffle(processes)

        for p in processes:
            p.start()

        for p in processes:
            p.join()

        print("all processes finished, here's what we have in the list:")
        print(x.pop_all())
        ShiftPlanner.list_stream = NullStream()
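A related pattern, swapped in here and not something this answer itself uses, is a Pool initializer: multiprocessing.Pool accepts initializer and initargs, and the initializer runs once in each worker process, which makes it a natural place to set the class attribute instead of doing it in every task. The following is a minimal sketch under that assumption; Sink, init_worker, do_work and run are hypothetical names, and the spawn context is selected explicitly to show that the approach does not rely on fork.

```python
from multiprocessing import Manager, get_context


class Sink:
    # Hypothetical stand-in for ShiftPlanner: a class-level "stream" slot.
    stream = None


def init_worker(shared_list) -> None:
    # Pool initializer: runs once in every worker process and sets the class
    # attribute in that worker's own copy of Sink.
    Sink.stream = shared_list


def do_work(i: int) -> int:
    Sink.stream.append(i)
    return i


def run(n: int = 6) -> list:
    ctx = get_context("spawn")  # spawn on purpose, to show this works without fork
    with Manager() as manager:
        shared = manager.list()  # a picklable proxy; the manager itself is never sent
        with ctx.Pool(processes=3, initializer=init_worker, initargs=(shared,)) as pool:
            pool.map(do_work, range(n))
        return sorted(list(shared))  # copy the data out before the manager shuts down


if __name__ == "__main__":
    print(run())  # [0, 1, 2, 3, 4, 5]
```

As in the answer, only the list proxy crosses the process boundary; the manager stays in the parent, and each worker rebuilds its own view of the class-level state.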

Answer:

The issue is not really related to class variables; it's that the default start method on macOS changed from fork to spawn in Python 3.8:

From the docs: "Changed in version 3.8: On macOS, the spawn start method is now the default. The fork start method should be considered unsafe as it can lead to crashes of the subprocess."
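To check which start method a particular interpreter uses by default, and which ones the platform supports, multiprocessing.get_start_method() and multiprocessing.get_all_start_methods() can be queried. The helper describe_start_methods below is a hypothetical name used only for illustration:

```python
import multiprocessing as mp


def describe_start_methods() -> dict:
    """Return the default start method and all start methods this platform supports."""
    return {
        # Note: get_start_method() also fixes the default if none has been set yet.
        "default": mp.get_start_method(),
        "available": mp.get_all_start_methods(),
    }


if __name__ == "__main__":
    print(describe_start_methods())
```

Running this under the two interpreters from the question should show the switch: per the docs, 3.7 on macOS defaults to fork, while 3.8 and later on macOS default to spawn.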

The code can be fixed like so:

from typing import List
from random import shuffle
from multiprocessing import Manager, current_process, get_context

class StreamInterface:
    def write(self, x: int) -> None:
        raise NotImplementedError

class ListStream(StreamInterface):
    def __init__(self, manager: Manager):
        self._manager = manager
        self.shared_list = self._manager.list()

    def write(self, x : int):
        self.shared_list.append(x)

    def pop_all(self) -> List[int]:
        tmp = list(self.shared_list)
        self.shared_list[:] = []
        return tmp


class NullStream(StreamInterface):
    def write(self, x:int):
        pass


n_processors = 6

def do_work(i):
    print(current_process(), " is putting ", i, " in the list")
    ShiftPlanner.list_stream.write(i)

class ShiftPlanner:
    list_stream: StreamInterface = NullStream()

if __name__ == "__main__":
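    # With fork, each child gets a copy of the parent's memory at p.start(),
    # including the ShiftPlanner.list_stream assignment made below.
    # fork is not available on Windows, and the docs call it unsafe on macOS.
    ctx = get_context('fork')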

    with Manager() as manager:
        x = ListStream(manager)
        ShiftPlanner.list_stream = x
        processes = [
            ctx.Process(
                target=do_work,
                args=(i,)
            )
            for i in range(n_processors)
        ]

        shuffle(processes)

        for p in processes:
            p.start()

        for p in processes:
            p.join()

        print("all processes finished, here's what we have in the list:")
        print(x.pop_all())
        ShiftPlanner.list_stream = NullStream()
