How to increase processing speed of Python code through parallelization


I currently have sequential code with the following parts (all properly encapsulated and isolated in class methods and so on):

  1. Frame capture from a network stream with OpenCV VideoCapture
  2. Processing of the image with yolov7 through PyTorch (with CUDA)
  3. Classical processing of the yolov7 output
  4. Extra-heavy classical processing done every X frames (X will be 30 or 60)

Because this will need to run in real time (a small constant latency is allowed, but not a growing one), something needs to be done, since it currently runs at 15 fps (already ignoring odd frames). Time profiling shows that the most time-consuming parts are (2) and (4) (no surprise).

When I started looking up info for (1), I learned about the threading module, which seemed promising and popped up in a lot of Stack Overflow answers for speeding up code when image capture from cameras was involved. This led me to see salvation in this module (because of misconceptions about parallelization), until I learned that it still executes only one thread at a time because of the GIL. I am also aware of the existence of asyncio, multithreading and concurrent.futures.ProcessPoolExecutor. I have also read this post on threading. Multicore processing is available.

The aim is to have (1) capture frames into a queue. (2) takes a frame when one is available and processes it; as soon as it finishes, it hands the output to (3) and reads the next available frame from the queue, so it keeps processing while (3) and (4) run too. (3) takes (2)'s output, processes it very fast, waits for the next output from (2), and repeats. Finally, every X frames, (4) reads the outputs accumulated by (3) up to that point and performs some heavy calculations.

If I have understood correctly, I should use multiprocessing instead of multithreading, since this is a rather computation-intensive problem (apart from the I/O in (1)).

So the questions really are:

  • For (1), since this is I/O-bound, is threading combined with a queue a good way to go?
  • For (2), (3) and (4), what is the way to proceed? I need them to run at the same time, especially (2) and (4), since (3) runs at nearly 300 fps.
  • For (2), is there a way to process two frames at the same time? For example, processing even and odd frames in parallel. This is a critical point right now for getting to real-time processing.
  • How difficult is all this? I'm not really an expert in this topic (I'm actually a physicist), so I don't know if I'm getting in over my head here.

I just need someone who knows about all this mess to point me in the right direction, so don't hesitate to add some references. Thank you very much in advance!

CodePudding user response:

It is difficult to provide an authoritative answer to a question like this. I can share some experience from working on a similar problem (image processing on a Raspberry Pi on a drone).

In my experience, multiprocessing is the way to go. Because of the GIL's limitations, if you are pushing the limits of a single core, threading will do very little. You would use threading in an environment where you have blocking but not resource-intensive operations, such as I/O or network access. Also, you want to avoid mixing threads and processes, especially if you are not confident you know what you are doing.
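For (1) specifically, here is a minimal sketch of a capture thread feeding a bounded queue; the stream URL and queue size are placeholder assumptions, not something from the question:

import queue
import threading

import cv2


def capture_frames(stream_url, frame_queue):
    # Stage (1): blocking network I/O releases the GIL while waiting,
    # so a plain thread is enough for this part.
    cap = cv2.VideoCapture(stream_url)
    while cap.isOpened():
        ok, frame = cap.read()
        if not ok:
            break
        try:
            frame_queue.put_nowait(frame)
        except queue.Full:
            pass  # drop the frame rather than let latency grow
    cap.release()


frames = queue.Queue(maxsize=4)  # small bound keeps latency constant
threading.Thread(target=capture_frames,
                 args=("rtsp://example/stream", frames),  # placeholder URL
                 daemon=True).start()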

You essentially want a pipeline of processes: multiple stages, each handling a specific task, running in parallel. In this case, your per-frame processing time drops from the sum of the individual stage times to the maximum of the per-stage times. For example, if the stages take 30 ms, 60 ms and 5 ms, a sequential loop needs 95 ms per frame (about 10 fps), while the pipeline is limited only by the slowest stage at 60 ms per frame (about 16 fps), at the cost of a small constant latency.

Practically, you want a Process per stage, with a multiprocessing Queue between consecutive stages. You want to test each stage separately first, passing items in and out in a controlled manner, and then try to integrate them. It is not trivial, but you can do it.
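As a rough illustration of that structure (the stage functions, queue sizes and the sentinel shutdown convention are my own assumptions, not from the question):

from multiprocessing import Process, Queue

SENTINEL = None  # marks end-of-stream so each stage knows when to stop


def stage(work, in_q, out_q):
    # Generic pipeline stage: pull an item, process it, push the result.
    while True:
        item = in_q.get()
        if item is SENTINEL:
            out_q.put(SENTINEL)  # propagate shutdown to the next stage
            break
        out_q.put(work(item))


def detect(frame):
    return frame  # stand-in for stage (2), e.g. yolov7 inference


def postprocess(dets):
    return dets  # stand-in for the fast classical stage (3)


if __name__ == '__main__':
    q12, q23, q_out = Queue(maxsize=8), Queue(maxsize=8), Queue()
    workers = [Process(target=stage, args=(detect, q12, q23)),
               Process(target=stage, args=(postprocess, q23, q_out))]
    for p in workers:
        p.start()
    for i in range(100):  # stage (1) would put captured frames here
        q12.put(i)
    q12.put(SENTINEL)
    while True:
        result = q_out.get()
        if result is SENTINEL:
            break
        # stage (4) would accumulate results here and run every X frames
    for p in workers:
        p.join()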

As for having two processes for (2) running in parallel: while possible, there are some things to consider:

  • Is CUDA capable of handling that? Because you are passing some of the processing over to your GPU, the pipeline may become limited by the GPU rather than the CPU. In that case, sending twice the amount of work to it may achieve nothing.
  • You want to make sure you have a robust way of re-ordering your frames. You'd most likely end up with two queues published to by (1) and two queues for (3) to consume from. You need to make sure (3) consumes items from the right queue and emits them in order; you may want to pass along some metadata, such as a frame index, to help with this (see the sketch after this list).
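For the re-ordering itself, a minimal sketch, assuming (1) tags every frame with a sequence number (the buffering scheme below is an illustrative choice, not the only one):

import heapq


def reorder(tagged_results):
    # tagged_results yields (frame_index, result) pairs in arbitrary order,
    # e.g. interleaved from the even- and odd-frame queues.
    pending = []     # min-heap keyed on frame_index
    next_index = 0
    for index, result in tagged_results:
        heapq.heappush(pending, (index, result))
        # Emit everything that is now contiguous.
        while pending and pending[0][0] == next_index:
            yield heapq.heappop(pending)[1]
            next_index += 1


# Frames 0..5 arriving slightly out of order:
out = list(reorder([(1, 'b'), (0, 'a'), (3, 'd'), (2, 'c'), (5, 'f'), (4, 'e')]))
assert out == ['a', 'b', 'c', 'd', 'e', 'f']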

CodePudding user response:

As you mentioned, Python is limited by the GIL, which means only one thread in a Python process can execute Python bytecode at a time. For I/O, threading works well, but a single process will not be able to spread CPU-bound work across multiple CPU cores even if you have them available.

However, there is the wonderful multiprocessing module in Python that allows you to create new processes. A process takes more time to create than a thread, but it can run on a different CPU core.

There are plenty of examples of this (see the Python module page linked above), but here is one using a Queue, from DigitalOcean:

from multiprocessing import Lock, Process, Queue, current_process
import time
import queue # imported for using queue.Empty exception


def do_job(tasks_to_accomplish, tasks_that_are_done):
    while True:
        try:
            '''
                try to get task from the queue. get_nowait() function will 
                raise queue.Empty exception if the queue is empty. 
                get(False) would do the same thing.
            '''
            task = tasks_to_accomplish.get_nowait()
        except queue.Empty:
            break
        else:
            '''
                if no exception has been raised, add the task completion 
                message to task_that_are_done queue
            '''
            print(task)
            tasks_that_are_done.put(task + ' is done by ' + current_process().name)
            time.sleep(.5)
    return True


def main():
    number_of_task = 10
    number_of_processes = 4
    tasks_to_accomplish = Queue()
    tasks_that_are_done = Queue()
    processes = []

    for i in range(number_of_task):
        tasks_to_accomplish.put("Task no "   str(i))

    # creating processes
    for w in range(number_of_processes):
        p = Process(target=do_job, args=(tasks_to_accomplish, tasks_that_are_done))
        processes.append(p)
        p.start()

    # completing process
    for p in processes:
        p.join()

    # print the output
    while not tasks_that_are_done.empty():
        print(tasks_that_are_done.get())

    return True


if __name__ == '__main__':
    main()
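
For your pipeline, the tasks in this example would be the frames coming out of (1), and do_job would be replaced by the work of stage (2). Note that with several workers pulling from one queue, results can come back out of order, which is exactly where the frame-index metadata suggested in the first answer helps.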