Python multiprocessing, read from remote RTSP camera without buffer


I have a system with two processes, where:

  1. a 'reader' process, getting frames from a remote camera through RTSP;
  2. a 'consumer' process, which receives frames from 'reader' and runs some computer vision algorithms on them.

Now, the problem is that frames are read from the camera in 'reader' at 25 FPS, but they are clearly analyzed much more slowly in 'consumer'. So, I don't want 'consumer' to analyze all of them, but only the latest available one (so that the computer vision detections refer to the live stream).

Something like the pipeline sketched in the image from the original post ('reader' keeps only the latest frame for 'consumer').

I managed to make this work the way I want with a workaround.
Basically, in 'reader', I check whether the queue is empty. If it is not, the frame in it has not been analyzed yet, so I discard it and replace it with the current one:

launcher.py -> start everything

from reader import Reader
from consumer import Consumer
import multiprocessing as mp
from multiprocessing import set_start_method
import sys


def main():

    set_start_method("spawn")
    frames_queue = mp.Queue()
    stop_switch = mp.Event()

    reader = mp.Process(target=Reader, args=(frames_queue,), daemon=True)
    consumer = mp.Process(target=Consumer, args=(frames_queue, stop_switch), daemon=True)

    reader.start()
    consumer.start()

    while True:
        if stop_switch.is_set():
            reader.terminate()
            consumer.terminate()
            sys.exit(0)


if __name__ == "__main__":
    main()

reader.py -> reading frames from camera

import cv2
import queue

def Reader(frames_queue):
    cap = cv2.VideoCapture('rtsp_address')

    while True:
        ret, frame = cap.read()
        if ret:
            if not frames_queue.empty():
                try:
                    frames_queue.get_nowait()   # discard previous (unprocessed) frame
                except queue.Empty:
                    pass

            try:
                frames_queue.put(cv2.resize(frame, (1080, 720)), block=False)
            except queue.Full:
                pass

And something similar in consumer:

consumer.py

import cv2
import queue

def Consumer(frames_queue, stop_switch):

    while True:

        frame = None
        try:
            frame = frames_queue.get_nowait()      ## get current camera frame from queue
        except queue.Empty:
            pass

        if frame is not None:
            ## do something computationally intensive on frame
            cv2.imshow('output', cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

        ## stop system when pressing 'q' key
        key = cv2.waitKey(1)
        if key==ord('q'):
            stop_switch.set()
            break

But I don't really like this; it seems a little too messy. Also, I have to use all the try/except blocks to avoid race conditions, where 'reader' empties the queue before putting the new frame just as 'consumer' tries to get a frame at the same time. Is there any better way to do this?

CodePudding user response:

A few comments:

  1. According to the documentation on the multiprocessing.Queue.empty method, this is not reliable and should not be used.

  2. As I commented on a previous post of yours, looping on get_nowait() calls as you are doing in consumer.py, when you should be willing to block until a frame is available, just wastes CPU cycles.

  3. Likewise, looping in your main process testing stop_switch.is_set(), when you could just issue stop_switch.wait(), also wastes CPU cycles.

  4. In your main process when you detect that the stop_switch has been set, there is no need to explicitly terminate the daemon processes you created; they will terminate automatically when the main process terminates.

Otherwise, cleaning up the above mentioned items should result in code that I don't think is all that messy.

main.py

from reader import Reader
from consumer import Consumer
import multiprocessing as mp
from multiprocessing import set_start_method

def main():

    set_start_method("spawn")
    frames_queue = mp.Queue()
    stop_switch = mp.Event()

    reader = mp.Process(target=Reader, args=(frames_queue,), daemon=True)
    consumer = mp.Process(target=Consumer, args=(frames_queue, stop_switch), daemon=True)

    reader.start()
    consumer.start()

    stop_switch.wait()


if __name__ == "__main__":
    main()

reader.py

import cv2
import queue

def Reader(frames_queue):
    cap = cv2.VideoCapture('rtsp_address')

    while True:
        ret, frame = cap.read()
        if ret:
            try:
                # discard possible previous (unprocessed) frame
                frames_queue.get_nowait()
            except queue.Empty:
                pass

            try:
                frames_queue.put(cv2.resize(frame, (1080, 720)), block=False)
            except queue.Full:
                pass

consumer.py

import cv2

def Consumer(frames_queue, stop_switch):

    while True:
        frame = frames_queue.get()     ## get current camera frame from queue
        ## do something computationally intensive on frame
        cv2.imshow('output', cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

        ## stop system when pressing 'q' key
        key = cv2.waitKey(1)
        if key==ord('q'):
            stop_switch.set()
            break

CodePudding user response:

Here are some ideas that I can think of...

...I'm not what I'd call an MP expert, but I have used MP and MT a lot in production code for parallel-style image processing.

  1. First question, after thinking about what you've said: why are you using multiprocessing to begin with?

Let me explain this: your highest level problem could be stated as

  • "do something computationally intensive on the most recent frame"
  • caveat: the time taken for this "something" is much greater than the time to generate a frame.

In your current code you're simply discarding anything but the most recent frame in the producer, and as you state, generating the frames is much faster than processing them.

So, with that in mind, why use MP at all? Wouldn't it be a) a lot simpler and b) a lot faster to simply do (meta code):


while I_should_be_running:
    frame = get_latest_frame()
    processed = do_work_on_frame(frame)
    save_if_needed(processed)

In this case you would likely use multi-threading to set/unset I_should_be_running from outside this main loop (see the sketch below), but the key point is that the work you're doing all stays in a single process.

Honestly, in terms of the KISS principle, and having personally struggled with the (for me at least) ever-present little wrinkles and complexities of implementing MP, based on your problem as you presented it, it might be a lot saner to just run it all in a single process as above...
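
For illustration, here is a minimal single-process sketch of that idea, using a threading.Event as the stop flag. The frame-grabbing and processing functions are stand-ins for your own code, not real APIs:

import threading
import time

stop_requested = threading.Event()

def watch_for_quit():
    # stand-in watcher thread: set the flag from user input, a signal, a GUI, etc.
    input("press Enter to stop...")
    stop_requested.set()

def get_latest_frame():
    return "frame"              # stand-in for grabbing the newest camera frame

def do_work_on_frame(frame):
    time.sleep(0.5)             # stand-in for heavy CV processing
    return frame

threading.Thread(target=watch_for_quit, daemon=True).start()

while not stop_requested.is_set():
    processed = do_work_on_frame(get_latest_frame())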

  2. If you are using MP, and you only want the most recent frame to process, why use a queue at all?

The code in the producer is:

  • empty the current queue
  • get a new frame
  • add it to the queue

So, why use a queue at all? Instead, perhaps just use a single instance variable, say most_recent_frame, that the producer writes to.

You would need to set up locking around this between processes, so that the consumer can lock it while it copies the frame out for subsequent processing. There are various tools in the MP libraries to do this; a sketch follows below.

This would avoid a lot of overhead from doing unneeded work (i.e. processing time): creating unused object instances, pickling those objects into the queue (see the 'Pipes and Queues' section at https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue - this is slow!), and garbage-collecting those unused objects after they are discarded.

Also, you're using a queue with (in theory) a producer that is constantly adding things, but without a limit. While not a risk in your code (because you're removing all prior entries in the producer), this kind of approach carries a huge potential risk of memory overflow if something goes wrong.
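
To illustrate, a minimal sketch of that single-slot idea, using a Manager dict plus a lock (the names are illustrative; note that a manager proxy still pickles the frame on each access, so this shows the pattern rather than the fastest possible implementation):

import multiprocessing as mp
import cv2

def producer(shared, lock):
    cap = cv2.VideoCapture('rtsp_address')
    while True:
        ret, frame = cap.read()
        if ret:
            with lock:
                shared['most_recent_frame'] = frame   # overwrite; nothing ever queues up

def consumer(shared, lock):
    while True:
        with lock:
            frame = shared.get('most_recent_frame')   # copy out the newest frame
        if frame is not None:
            pass   # do something computationally intensive on frame

if __name__ == '__main__':
    with mp.Manager() as manager:
        shared = manager.dict()
        lock = manager.Lock()
        mp.Process(target=producer, args=(shared, lock), daemon=True).start()
        mp.Process(target=consumer, args=(shared, lock), daemon=True).start()
        mp.Event().wait()   # park the main process; real code would wait on a stop event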

  3. Why use a connection object (i.e. a Queue) between your processes based on Pipe?

Queues in MP are built on Pipe, which sends a pickled copy of every object you put through. This can be very slow for large and/or complex objects.

Instead, if you're concerned about maximum speed, you could use a ctypes shared-memory object (see https://docs.python.org/3/library/multiprocessing.html#shared-ctypes-objects).

This does need locking (which the MP code provides), but it is significantly faster than queue-based pickling/copying for large/complex objects - which your frames may well be.

Considering you're trying to generate frames at multiple tens of times per second, this pickle/copy overhead could well be significant.
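
As a hedged sketch of that ctypes shared-memory idea, using multiprocessing.Array (assuming a fixed 720x1080 BGR uint8 frame; the names here are illustrative):

import multiprocessing as mp
import numpy as np

H, W, C = 720, 1080, 3

def writer(shared_arr):
    frame = np.zeros((H, W, C), dtype=np.uint8)   # stand-in for a camera frame
    with shared_arr.get_lock():                   # mp.Array carries its own lock
        buf = np.frombuffer(shared_arr.get_obj(), dtype=np.uint8).reshape(H, W, C)
        buf[:] = frame                            # copy into shared memory, no pickling

def reader(shared_arr):
    with shared_arr.get_lock():
        buf = np.frombuffer(shared_arr.get_obj(), dtype=np.uint8).reshape(H, W, C)
        return buf.copy()                         # private copy to process at leisure

if __name__ == '__main__':
    shared_arr = mp.Array('B', H * W * C)         # 'B' = unsigned char, one byte per value
    p = mp.Process(target=writer, args=(shared_arr,))
    p.start()
    p.join()
    print(reader(shared_arr).shape)               # (720, 1080, 3)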

CodePudding user response:

I had created a solution to a multiprocessing video problem a while ago that I think can be very applicable to this exact scenario:

Use a multiprocessing.shared_memory backed array as a buffer, and continuously write video frames into it from the process that is reading frames. Then, on another process, you can just make a copy of the current frame and go do some processing on it for a while. Any frames written during processing are simply overwritten by the next, until you're ready for another frame.

Take note that the framerate of the video source is different from the framerate of the video display (limited by sleep(1/30) and cv2.waitKey(1000), respectively).

from multiprocessing import Process, Lock
from multiprocessing.shared_memory import SharedMemory
import cv2
from time import sleep
import numpy as np


def process_frames(shm, frame_shape, frame_dtype, frame_lock, exit_flag):
    #create numpy array from buffer
    frame_buffer = np.ndarray(frame_shape, buffer=shm.buf, dtype=frame_dtype)
    try:
        while True:
            if exit_flag.acquire(False): #try (without waiting) to get the lock, and if successful: exit
                break
            with frame_lock:
                frame = frame_buffer.copy() #don't want the data being updated while processing happens
            cv2.imshow('frame', frame)
            cv2.waitKey(1000) #this is needed for cv2 to update the gui, and waiting a long time simulates heavy processing
    except KeyboardInterrupt: #some systems propagate signals to child processes making exit_flag unnecessary. Others don't
        pass
    shm.close()
    print("child exiting")



def read_and_process_frames():
    vid_device = r"D:\Videos\movies\GhostintheShell.mp4" #a great movie
    
    #get the first frame to calculate size
    cap = cv2.VideoCapture(vid_device)
    success, frame = cap.read()
    if not success:
        raise Exception("error reading from video")
    
    #create the shared memory for the frame buffer
    frame_buffer_shm = SharedMemory(name="frame_buffer", create=True, size=frame.nbytes)
    frame_buffer = np.ndarray(frame.shape, buffer=frame_buffer_shm.buf, dtype=frame.dtype)
    
    frame_lock = Lock()
    exit_flag = Lock()
    exit_flag.acquire() #start in a locked state. When the child process successfully acquires it, that's the exit signal
    
    processing_process = Process(target=process_frames, args=(frame_buffer_shm, 
                                                              frame.shape, 
                                                              frame.dtype, 
                                                              frame_lock, 
                                                              exit_flag))
    processing_process.start()
    
    try: #use keyboardinterrupt to quit
        while True:
            with frame_lock:
                cap.read(frame_buffer) #read data into frame buffer
            sleep(1/30) #limit framerate-ish for video file (hitting actual framerate is more complicated than 1 line)
    except KeyboardInterrupt:
        print("exiting")
    
    exit_flag.release() #signal the child process to exit
    processing_process.join() #wait for child to exit
    
    #cleanup
    cap.release()
    frame_buffer_shm.close()
    frame_buffer_shm.unlink()


if __name__ == "__main__":
    read_and_process_frames()