Home > Blockchain >  Constant camera grabbing with OpenCV & Python multiprocessing
Constant camera grabbing with OpenCV & Python multiprocessing

Time:06-02

I'm after constantly reading images from an OpenCV camera in Python and reading from the main program the latest image. This is needed because of problematic HW.

After messing around with threads and getting a very low efficiency (duh!), I'd like to switch to multiprocessing.

Here's the threading version:

class WebcamStream:
    # initialization method
    def __init__(self, stream_id=0):
        self.stream_id = stream_id  # default is 0 for main camera

        # opening video capture stream
        self.camera = cv2.VideoCapture(self.stream_id)
        self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, 3840)
        self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 2880)

        if self.camera.isOpened() is False:
            print("[Exiting]: Error accessing webcam stream.")
            exit(0)

        # reading a single frame from camera stream for initializing
        _, self.frame = self.camera.read()

        # self.stopped is initialized to False
        self.stopped = True

        # thread instantiation
        self.t = Thread(target=self.update, args=())
        self.t.daemon = True  # daemon threads run in background

    # method to start thread
    def start(self):
        self.stopped = False
        self.t.start()

    # method passed to thread to read next available frame
    def update(self):
        while True:
            if self.stopped is True:
                break
            _, self.frame = self.camera.read()
        self.camera.release()

    # method to return latest read frame
    def read(self):
        return self.frame

    # method to stop reading frames
    def stop(self):
        self.stopped = True

And -

if __name__ == "__main__":
    main_camera_stream = WebcamStream(stream_id=0)
    main_camera_stream.start()
    frame = main_camera_stream.read()

Can someone please help me translate this to multiprocess land ?

Thanks!

CodePudding user response:

I've written several solutions to similar problems, but it's been a little while so here we go:

I would use shared_memory as a buffer to read frames into, which can then be read by another process. My first inclination is to initialize the camera and read frames in the child process, because that seems like it would be a "set it and forget it" kind of thing.

from multiprocessing import Process, Lock, Queue
from multiprocessing.shared_memory import SharedMemory
import cv2
import numpy as np
from time import sleep

class Shared_Arr: #helper class to make shared_memory arrays easier <https://stackoverflow.com/a/71769491/3220135>
    def __init__(self, shape, dtype, shm=None):
        self.shape=shape
        self.dtype=dtype

        if shm is None:
            n_bytes = int(np.dtype(dtype).itemsize * np.prod(shape))
            self.shm = SharedMemory(create=True, size=n_bytes)
            self.owner = True
        else:
            self.shm = shm
            self.owner = False

        self.close = self.shm.close
        self.unlink = self.shm.unlink

        self.arr = np.ndarray(self.shape, self.dtype, buffer=self.shm.buf)

    def __reduce__(self): #make it picklable so it can be sent to a child process correctly
        return (self.__class__, (self.shape, self.dtype, self.shm))

    def __enter__(self): #context manager is mostly for cleanup so __enter__ is uninteresting
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close() #closes the memory-mapped file
        if self.owner:
            self.unlink() #tell the OS to delete the file

class Frame_Producer(Process):
    def __init__(self):
        super().__init__()
        self.q = Queue()
        self.l = Lock()
        self.daemon = True
        
    def run(self):
        vid_device = r"D:\Videos\movies\GhostintheShell.mp4" #a great movie
        
        #get the first frame to calculate size
        cap = cv2.VideoCapture(vid_device)
        success, frame = cap.read()
        if not success:
            raise Exception("error reading from video")
            
        with Shared_Arr(frame.shape, frame.dtype) as framebuffer:
            self.q.put(framebuffer) #send back to main #if we knew the frame size in advance we could simplify this process by creating it in main
            
            try:
                while True:
                    sleep(1/30) #video framerate
                    with self.l: #lock the framebuffer while we write to it
                        cap.read(framebuffer.arr)
            except KeyboardInterrupt: #this gets propogated from the parent on some systems, making "daemon" redundant
                pass
                
    def start(self):
        super().start()
        self.framebuffer = self.q.get()
    
    def __enter__(self):
        self.start()
        return self
    
    def __exit__(self, exc_type, exc_value, traceback):
        self.framebuffer.__exit__(exc_type, exc_value, traceback) #cleanup
        self.terminate()
    
    def get(self):
        with self.l:
            return np.copy(self.framebuffer.arr) #return a copy which won't be overwritten unexpectedly
    
    
if __name__ == "__main__":
    with Frame_Producer() as fp:
        try:
            while True:
                cv2.imshow("frame", fp.get())
                cv2.waitKey(1000) #main process consumes frames more slowly than producer process provides them
        except KeyboardInterrupt:
            pass
  • Related