I'm after constantly reading images from an OpenCV camera in Python and reading from the main program the latest image. This is needed because of problematic HW.
After messing around with threads and getting a very low efficiency (duh!), I'd like to switch to multiprocessing.
Here's the threading version:
class WebcamStream:
# initialization method
def __init__(self, stream_id=0):
self.stream_id = stream_id # default is 0 for main camera
# opening video capture stream
self.camera = cv2.VideoCapture(self.stream_id)
self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, 3840)
self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 2880)
if self.camera.isOpened() is False:
print("[Exiting]: Error accessing webcam stream.")
exit(0)
# reading a single frame from camera stream for initializing
_, self.frame = self.camera.read()
# self.stopped is initialized to False
self.stopped = True
# thread instantiation
self.t = Thread(target=self.update, args=())
self.t.daemon = True # daemon threads run in background
# method to start thread
def start(self):
self.stopped = False
self.t.start()
# method passed to thread to read next available frame
def update(self):
while True:
if self.stopped is True:
break
_, self.frame = self.camera.read()
self.camera.release()
# method to return latest read frame
def read(self):
return self.frame
# method to stop reading frames
def stop(self):
self.stopped = True
And -
if __name__ == "__main__":
main_camera_stream = WebcamStream(stream_id=0)
main_camera_stream.start()
frame = main_camera_stream.read()
Can someone please help me translate this to multiprocess land ?
Thanks!
CodePudding user response:
I've written several solutions to similar problems, but it's been a little while so here we go:
I would use shared_memory
as a buffer to read frames into, which can then be read by another process. My first inclination is to initialize the camera and read frames in the child process, because that seems like it would be a "set it and forget it" kind of thing.
from multiprocessing import Process, Lock, Queue
from multiprocessing.shared_memory import SharedMemory
import cv2
import numpy as np
from time import sleep
class Shared_Arr: #helper class to make shared_memory arrays easier <https://stackoverflow.com/a/71769491/3220135>
def __init__(self, shape, dtype, shm=None):
self.shape=shape
self.dtype=dtype
if shm is None:
n_bytes = int(np.dtype(dtype).itemsize * np.prod(shape))
self.shm = SharedMemory(create=True, size=n_bytes)
self.owner = True
else:
self.shm = shm
self.owner = False
self.close = self.shm.close
self.unlink = self.shm.unlink
self.arr = np.ndarray(self.shape, self.dtype, buffer=self.shm.buf)
def __reduce__(self): #make it picklable so it can be sent to a child process correctly
return (self.__class__, (self.shape, self.dtype, self.shm))
def __enter__(self): #context manager is mostly for cleanup so __enter__ is uninteresting
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close() #closes the memory-mapped file
if self.owner:
self.unlink() #tell the OS to delete the file
class Frame_Producer(Process):
def __init__(self):
super().__init__()
self.q = Queue()
self.l = Lock()
self.daemon = True
def run(self):
vid_device = r"D:\Videos\movies\GhostintheShell.mp4" #a great movie
#get the first frame to calculate size
cap = cv2.VideoCapture(vid_device)
success, frame = cap.read()
if not success:
raise Exception("error reading from video")
with Shared_Arr(frame.shape, frame.dtype) as framebuffer:
self.q.put(framebuffer) #send back to main #if we knew the frame size in advance we could simplify this process by creating it in main
try:
while True:
sleep(1/30) #video framerate
with self.l: #lock the framebuffer while we write to it
cap.read(framebuffer.arr)
except KeyboardInterrupt: #this gets propogated from the parent on some systems, making "daemon" redundant
pass
def start(self):
super().start()
self.framebuffer = self.q.get()
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.framebuffer.__exit__(exc_type, exc_value, traceback) #cleanup
self.terminate()
def get(self):
with self.l:
return np.copy(self.framebuffer.arr) #return a copy which won't be overwritten unexpectedly
if __name__ == "__main__":
with Frame_Producer() as fp:
try:
while True:
cv2.imshow("frame", fp.get())
cv2.waitKey(1000) #main process consumes frames more slowly than producer process provides them
except KeyboardInterrupt:
pass