Can I use a `multiprocessing.Queue` for communication within a process?

Time:05-25

I'm using queues for inter-thread communication. I'm using multiprocessing.Queue() instead of queue.Queue() because the multiprocessing version exposes an underlying file descriptor which can be waited on with select.select - which means I can block waiting for an object in the queue or a packet to arrive on a network interface from the same thread.

But when I try to get an object from the queue, I get this:

Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects

Is there a way to do this? Or am I stuck using queue.Queue() and having a separate thread select.select() on the sockets and put the results into the queue?

Edit: I think this is the minimal reproducible example:

import multiprocessing
import threading

queue = multiprocessing.Queue()

class Msg():
    def __init__(self):
        self.lock = threading.Lock()

def source():
    queue.put(Msg())

def sink():
    obj = queue.get()
    print("Got")

threading.Thread(target=sink).start()
source()

The problem is that the object I'm putting into the queue has a threading.Lock object as a field (several levels of composition deep).
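For reference, the select-based waiting described at the top of the question could look roughly like the sketch below. It assumes a POSIX system (on Windows, select.select only accepts sockets) and relies on `_reader`, a private attribute of multiprocessing.Queue that may change between Python versions. The function name `wait_for_queue_or_socket` is made up for illustration.

```python
import multiprocessing
import select
import socket


def wait_for_queue_or_socket(mp_queue, sock, timeout=5.0):
    """Block until the queue or the socket is readable; report which ones are.

    Assumption: mp_queue._reader is the read end of the queue's pipe.
    It is a private attribute, not a documented API.
    """
    reader = mp_queue._reader
    readable, _, _ = select.select([reader, sock], [], [], timeout)
    sources = []
    if reader in readable:
        sources.append("queue")
    if sock in readable:
        sources.append("socket")
    return sources
```

A caller would then use `mp_queue.get()` or `sock.recv(...)` depending on which names come back.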

CodePudding user response:

TL;DR: threading.Lock instances cannot be pickled, and pickle is what serializes any object put on a multiprocessing.Queue instance. There is also very little value in passing an object to another thread via a multiprocessing.Queue, because the receiving thread gets what is effectively a new instance of that object, unless creating a copy of the object is part of your goal. So if you do pass the object via such a queue, the lock cannot be part of the object's state and you need an alternate approach (see below).

The (much) Longer Answer

First, as your error message states, threading.Lock instances cannot be serialized with pickle. This is easy to demonstrate:

>>> import pickle
>>> import threading
>>> lock = threading.Lock()
>>> serialized_lock = pickle.dumps(lock)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: cannot pickle '_thread.lock' object

Second, when you put an object on a multiprocessing.Queue instance, the object is serialized with pickle, and so you get the above exception.
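A small sketch of what that serialization means in practice, even when producer and consumer live in the same process: the object that comes out of the queue is a copy, not the object that went in. The helper name `round_trip_through_mp_queue` is hypothetical, and a plain dict stands in for a picklable message.

```python
import multiprocessing


def round_trip_through_mp_queue(obj, timeout=5):
    """Put obj on a multiprocessing.Queue and read it back in the same process."""
    q = multiprocessing.Queue()
    try:
        q.put(obj)                     # pickled by the queue's feeder thread
        return q.get(timeout=timeout)  # unpickled into a brand-new object
    finally:
        q.close()
        q.join_thread()


original = {"x": 0}
received = round_trip_through_mp_queue(original)
received["x"] += 1                    # only the copy changes

print(received is original)           # False
print(original["x"], received["x"])   # 0 1
```

Any lock carried along with the object would therefore protect only the copy, never the original.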

But while your posting constitutes a minimal, complete example, it does not represent a realistic program that does anything useful. What are you actually trying to accomplish? Suppose you were able to serialize a lock and therefore pass an instance of Msg via a queue. Presumably the lock is there to serialize some code that updates the object's state. But since the instance retrieved is a different instance of Msg than the one that was put on the queue, the only meaningful use of this lock would be if the sink thread created additional threads that operated on this instance.

So let's conjecture that there is an attribute, x, that needs to be incremented in multiple threads. This requires a lock since the += operator is not atomic. Since the lock cannot be part of the object's state when the object is passed via a queue, you have to create the lock separately. This is just one of many possible approaches:

import multiprocessing
import threading

queue = multiprocessing.Queue()

class Msg():
    def __init__(self):
        self.x = 0

    def set_lock(self, lock):
        self.lock = lock

    def compute(self):
        with self.lock:
            self.x += 1

def source():
    queue.put(Msg())

def sink():
    msg = queue.get()
    msg.set_lock(threading.Lock())
    t = threading.Thread(target=msg.compute)
    t.start()
    msg.compute()
    t.join()
    print(msg.x)


threading.Thread(target=sink).start()
source()

Prints:

2
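Another possible approach, not part of the answer above, is to keep the lock as an attribute but leave it out of the pickled state using the standard `__getstate__` / `__setstate__` hooks. This is only a sketch: it assumes that a fresh, unlocked lock on the receiving side is acceptable, and `round_trip` is a hypothetical helper that approximates what the queue does internally.

```python
import pickle
import threading


class Msg:
    def __init__(self):
        self.x = 0
        self.lock = threading.Lock()

    def __getstate__(self):
        # Copy the instance dict and drop the unpicklable lock.
        state = self.__dict__.copy()
        del state["lock"]
        return state

    def __setstate__(self, state):
        # Restore the pickled attributes and give the copy its own new lock.
        self.__dict__.update(state)
        self.lock = threading.Lock()


def round_trip(msg):
    """Pickle and unpickle msg, roughly as a multiprocessing.Queue would."""
    return pickle.loads(pickle.dumps(msg))
```

The lock on the copy is unrelated to the lock on the original, so it only coordinates threads that share the copy.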

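If you are not using a multiprocessing.Queue for object passing, the object is never pickled, and there is no problem having the lock as part of the object's initial state. The following uses a queue.Queue subclass that can be polled with select alongside a socket: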

import queue
import socket
import os
import select
import threading

class PollableQueue(queue.Queue):
    def __init__(self):
        super().__init__()
        # Create a pair of connected sockets
        if os.name == 'posix':
            self._putsocket, self._getsocket = socket.socketpair()
        else:
            # Compatibility on non-POSIX systems
            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            server.bind(('127.0.0.1', 0))
            server.listen(1)
            self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self._putsocket.connect(server.getsockname())
            self._getsocket, _ = server.accept()
            server.close()

    def fileno(self):
        return self._getsocket.fileno()

    def put(self, item):
        super().put(item)
        self._putsocket.send(b'x')

    def get(self):
        self._getsocket.recv(1)
        return super().get()

class Msg:
    def __init__(self, q, socket):
        # An instance of this class could be passed via a queue.Queue
        # A multiprocessing.Lock could also be used but is not
        # necessary if we are doing threading:
        self.lock = threading.Lock() # to be used by some method not shown
        self.q = q
        self.socket = socket

    def consume(self):
        while True:
            can_read, _, _ = select.select([self.q, self.socket], [], [])
            for r in can_read:
                item = r.get() if isinstance(r, queue.Queue) else r.recv(3).decode()
                print('Got:', item, 'from', type(r))

# Example code that performs polling:

if __name__ == '__main__':
    import time

    q = PollableQueue()
    write_socket, read_socket = socket.socketpair()
    msg = Msg(q, read_socket)
    t = threading.Thread(target=msg.consume, daemon=True)
    t.start()

    # Feed data to the queues
    q.put('abc')
    write_socket.send(b'cde')
    write_socket.send(b'fgh')
    q.put('ijk')

    # Give consumer time to get all the items:
    time.sleep(1)

Prints:

Got: abc from <class '__main__.PollableQueue'>
Got: ijk from <class '__main__.PollableQueue'>
Got: cde from <class 'socket.socket'>
Got: fgh from <class 'socket.socket'>
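The fallback mentioned in the question, a separate thread that reads the socket and puts the results on a regular queue.Queue, can be sketched as follows. The names `forward_socket_to_queue` and `demo`, the `("source", payload)` tuple format, and the `None` end-of-stream marker are all assumptions made for this illustration.

```python
import queue
import socket
import threading


def forward_socket_to_queue(sock, q, bufsize=1024):
    """Read from sock until EOF and put each chunk on q, tagged with its source."""
    while True:
        data = sock.recv(bufsize)
        if not data:              # empty bytes: the peer closed the connection
            break
        q.put(("socket", data))
    q.put(("socket", None))       # hypothetical end-of-stream marker


def demo():
    q = queue.Queue()
    write_sock, read_sock = socket.socketpair()
    t = threading.Thread(target=forward_socket_to_queue,
                         args=(read_sock, q), daemon=True)
    t.start()

    q.put(("local", "abc"))       # ordinary in-process message
    write_sock.sendall(b"cde")    # network-style message
    write_sock.close()            # lets the forwarder see EOF and finish

    items = []
    while True:
        source, payload = q.get(timeout=5)
        if payload is None:
            break
        items.append((source, payload))
    t.join(timeout=5)
    read_sock.close()
    return items
```

With this layout the consumer blocks only on `q.get()`, and no object is ever pickled, so messages may carry locks freely.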