Home > OS >  How to get `multiprocessing.queues.Queue.qsize()` on macOS?
How to get `multiprocessing.queues.Queue.qsize()` on macOS?

Time:12-10

This is an old issue whose suggested workaround does not work.

Below is a complete example showing how the suggested approach fails. Uncomment line 31 of the original script (`# self.size.increment(-1)  # uncomment this for error`, inside `MyQueue.get`) to trigger the error.

import multiprocessing
import os
import time
from multiprocessing import get_context
from multiprocessing.queues import Queue

class SharedCounter:
    """An integer counter that can be shared between processes.

    The count lives in a ``multiprocessing.Value`` of typecode ``'i'``
    (a C signed int), and every update is made while holding that
    value's own lock.
    """

    def __init__(self, n=0):
        """Create the counter with initial value *n*."""
        self.count = multiprocessing.Value('i', n)

    def increment(self, n=1):
        """Add *n* (which may be negative) to the counter."""
        # The read-modify-write must happen under the lock; ``+=`` on a
        # shared Value is not atomic by itself.
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        """Return the current count."""
        return self.count.value

class MyQueue(Queue):
    """``multiprocessing`` queue that counts its items in a SharedCounter.

    The counter is stored in ``self.size`` and backs ``qsize()`` and
    ``empty()``.  ``size`` is assigned only in ``__init__``; nothing in
    this class adds it to the state handed to worker processes.
    """

    def __init__(self, *args, **kwargs):
        """Build the queue on the default context and start counting at 0."""
        super(MyQueue, self).__init__(*args, ctx=get_context(), **kwargs)
        self.size = SharedCounter(0)

    def put(self, *args, **kwargs):
        """Bump the counter, then enqueue via the parent ``put``."""
        counter = self.size
        counter.increment(1)
        super(MyQueue, self).put(*args, **kwargs)

    def get(self, *args, **kwargs):
        """Dequeue via the parent ``get`` and hand back the item."""
        # self.size.increment(-1)  # uncomment this for error
        item = super(MyQueue, self).get(*args, **kwargs)
        return item

    def qsize(self):
        """Return the value currently held by the counter."""
        counter = self.size
        return counter.value

    def empty(self):
        """Return True when the counter reads zero."""
        if self.qsize():
            return False
        return True

    def clear(self):
        """Keep calling ``get()`` until ``empty()`` reports True."""
        while True:
            if self.empty():
                break
            self.get()

def worker(queue):
    """Consume items from *queue* until a ``None`` sentinel arrives.

    Each non-sentinel item is printed with the current process id,
    followed by a one-second pause.
    """
    while (item := queue.get()) is not None:
        print(f'[{os.getpid()}]: got {item}')
        time.sleep(1)

if __name__ == '__main__':
    num_processes = 4
    q = MyQueue()
    # Each pool process runs ``worker(q)`` as its initializer.
    pool = multiprocessing.Pool(
        processes=num_processes, initializer=worker, initargs=(q,)
    )

    # Ten "hello"/"world" pairs, in that order.
    for _ in range(10):
        for word in ("hello", "world"):
            q.put(word)

    # One ``None`` sentinel per worker so that every loop terminates.
    for _ in range(num_processes):
        q.put(None)

    q.close()
    q.join_thread()
    pool.close()
    pool.join()

For some reason, the newly defined MyQueue forgets about the size attribute.

Process SpawnPoolWorker-1:
Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/pool.py", line 109, in worker
    initializer(*initargs)
  File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 47, in worker
    item = queue.get()
           ^^^^^^^^^^^
  File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 31, in get
    self.size.increment(-1)  # uncomment this for error
    ^^^^^^^^^
AttributeError: 'MyQueue' object has no attribute 'size'
Process SpawnPoolWorker-2:
Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/pool.py", line 109, in worker
    initializer(*initargs)
  File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 47, in worker
    item = queue.get()
           ^^^^^^^^^^^
  File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 31, in get
    self.size.increment(-1)  # uncomment this for error
    ^^^^^^^^^
AttributeError: 'MyQueue' object has no attribute 'size'
Process SpawnPoolWorker-4:
Process SpawnPoolWorker-3:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/pool.py", line 109, in worker
    initializer(*initargs)
  File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 47, in worker
    item = queue.get()
           ^^^^^^^^^^^
  File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 31, in get
    self.size.increment(-1)  # uncomment this for error
    ^^^^^^^^^
AttributeError: 'MyQueue' object has no attribute 'size'
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/Cellar/python@3.11/3.11.0/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/pool.py", line 109, in worker
    initializer(*initargs)
  File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 47, in worker
    item = queue.get()
           ^^^^^^^^^^^
  File "/Users/user/Library/Application Support/JetBrains/PyCharm2022.3/scratches/scratch.py", line 31, in get
    self.size.increment(-1)  # uncomment this for error
    ^^^^^^^^^
AttributeError: 'MyQueue' object has no attribute 'size'

CodePudding user response:

Well, you didn't override `__setstate__` and `__getstate__` to include your variable. These two methods are used by pickle to control serialization (see "Handling Stateful Objects" in the pickle documentation), so you should override them to add your variable to what's being serialized.

import multiprocessing
import os
import time
from multiprocessing import get_context
from multiprocessing.queues import Queue

class SharedCounter:
    """An integer counter that can be shared between processes.

    The count lives in a ``multiprocessing.Value`` of typecode ``'i'``
    (a C signed int), and every update is made while holding that
    value's own lock.
    """

    def __init__(self, n=0):
        """Create the counter with initial value *n*."""
        self.count = multiprocessing.Value('i', n)

    def increment(self, n=1):
        """Add *n* (which may be negative) to the counter."""
        # The read-modify-write must happen under the lock; ``+=`` on a
        # shared Value is not atomic by itself.
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        """Return the current count."""
        return self.count.value

class MyQueue(Queue):
    """``multiprocessing`` queue whose ``qsize()`` is backed by a SharedCounter.

    The item count is kept in ``self.size`` and is included in the
    pickled state, so copies of the queue handed to worker processes
    share the same counter.
    """

    def __init__(self, *args, **kwargs):
        """Build the queue on the default context and start counting at 0."""
        super().__init__(*args, ctx=get_context(), **kwargs)
        self.size = SharedCounter(0)

    def __getstate__(self):
        """Return the parent queue state paired with the shared counter."""
        return (super().__getstate__(), self.size)

    def __setstate__(self, state):
        """Restore the parent queue state, then re-attach the counter."""
        super().__setstate__(state[0])
        self.size = state[1]

    def put(self, *args, **kwargs):
        """Enqueue an item and count it.

        Accepts the same arguments as ``Queue.put`` and re-raises whatever
        the parent raises (for example ``queue.Full``).
        """
        # Count first so a fast consumer can never decrement before the
        # producer has incremented; undo the count if the put fails.
        self.size.increment(1)
        try:
            super().put(*args, **kwargs)
        except Exception:
            self.size.increment(-1)
            raise

    def get(self, *args, **kwargs):
        """Dequeue an item and uncount it.

        Accepts the same arguments as ``Queue.get``.  The counter is only
        decremented once an item has actually been received, so a blocked
        or timed-out ``get`` (``queue.Empty``) leaves the count untouched.
        """
        item = super().get(*args, **kwargs)
        self.size.increment(-1)
        return item

    def qsize(self):
        """Return the number of items currently counted in the queue."""
        return self.size.value

    def empty(self):
        """Return True when the counter reads zero."""
        return not self.qsize()

    def clear(self):
        """Drain the queue by calling ``get()`` until ``empty()`` is True.

        NOTE(review): ``get()`` blocks, so with several concurrent
        consumers this can wait on an item another consumer already took.
        """
        while not self.empty():
            self.get()

def worker(queue):
    """Print every item pulled from *queue*, stopping at the first ``None``.

    After each printed item the function sleeps for one second.
    """
    item = queue.get()
    while item is not None:
        print(f'[{os.getpid()}]: got {item}')
        time.sleep(1)
        item = queue.get()

if __name__ == '__main__':
    num_processes = 4
    q = MyQueue()
    # Each pool process runs ``worker(q)`` as its initializer.
    pool = multiprocessing.Pool(
        processes=num_processes,
        initializer=worker,
        initargs=(q,),
    )

    # Ten "hello"/"world" pairs, enqueued in alternating order.
    payload = ["hello", "world"] * 10
    for word in payload:
        q.put(word)

    # One ``None`` sentinel per worker so that every loop terminates.
    for _ in range(num_processes):
        q.put(None)

    q.close()
    q.join_thread()
    pool.close()
    pool.join()

Note that in Python 3 we don't need to write super(MyQueue, self); plain super() suffices. It also makes it easier to rename your class in the future, along with other portability and refactoring benefits, so consider swapping any super(x, y) with just super().

  • Related