How to separately start and stop multiprocessing processes in Python?

Time:09-23

I use a dedicated Python (3.8) library to control a motor drive via a USB port.

The Python library provided by the motor control drive manufacturers (ODrive) allows a single Python process to control one or more drives.

However, I would like to run 3 processes, each controlling 1 drive.

After researching options (I first considered virtual machines, Docker containers, and multi-threading), I came to believe that the easiest way to do so would be to use multiprocessing.

My problem is that I would then need a way to manage (i.e., start, monitor, and stop independently) multiple processes. The practical reason is that the motors are connected to different setups. Each setup must be stoppable and restartable separately, for instance if it malfunctions, without affecting the other running setups.

After reading around the internet and Stack Overflow, I now understand how to create a Pool of processes, how to associate processes with processor cores, how to start a pool of processes, and how to queue/join them (the latter not being needed in my case).

What I don't know is how to manage them independently. How can I separately start/stop different processes without affecting the execution of the others? Are there libraries to manage them (perhaps even with a GUI)?

CodePudding user response:

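You can create multiple multiprocessing.Process instances manually like this:

import multiprocessing

def my_func(a, b):
    pass

if __name__ == "__main__":
    # The guard is required on platforms that spawn (rather than fork) children.
    p = multiprocessing.Process(target=my_func, args=(100, 200))
    p.start()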

and manage them using multiprocessing primitives Queue, Event, Condition etc. Please refer to the official documentation for details: https://docs.python.org/3/library/multiprocessing.html

In the following example, multiple processes are started and stopped independently. An Event is used to tell each process when to stop. A Queue is used to pass results from the child processes to the main process.

import multiprocessing
import queue
import random
import time


def worker_process(
    process_id: int,
    results_queue: multiprocessing.Queue,
    to_stop: multiprocessing.Event,
):

    print(f"Process {process_id} is started")
    while not to_stop.is_set():
        print(f"Process {process_id} is working")
        time.sleep(0.5)
        result = random.random()
        results_queue.put((process_id, result))

    print(f"Process {process_id} exited")


if __name__ == "__main__":
    # The guard keeps child processes from re-running this loop on
    # platforms that start children by spawning a fresh interpreter.
    process_pool = []
    result_queue = multiprocessing.Queue()
    while True:
        if random.random() < 0.3:
            # starting a new process
            process_id = random.randint(0, 10_000)
            to_stop = multiprocessing.Event()
            p = multiprocessing.Process(
                target=worker_process, args=(process_id, result_queue, to_stop)
            )
            p.start()
            process_pool.append((p, to_stop))

        if random.random() < 0.2:
            # closing a random process
            if process_pool:
                process, to_stop = process_pool.pop(
                    random.randint(0, len(process_pool) - 1)
                )
                to_stop.set()
                process.join()

        try:
            p_id, result = result_queue.get_nowait()
            print(f"Completed: process_id={p_id} result={result}")
        except queue.Empty:
            pass

        time.sleep(1)
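
One caveat with the Event approach: a malfunctioning worker may never get back to checking the Event, in which case a plain join() would block forever. Below is a minimal sketch of one way around that, assuming it is acceptable to kill the worker after a grace period. The names stop_process, start_worker, polite_worker and stubborn_worker are made up for illustration; join(timeout), is_alive(), terminate() and exitcode are part of the standard multiprocessing.Process API. Whether the motor drive is left in a safe state after terminate() is not something this sketch addresses.

```python
import multiprocessing
import time


def polite_worker(to_stop):
    # Event.wait() returns True once the event is set, which ends the loop.
    while not to_stop.wait(timeout=0.1):
        pass


def stubborn_worker(to_stop):
    # Simulates a malfunctioning worker that never checks the stop event.
    while True:
        time.sleep(0.1)


def start_worker(target):
    """Start one worker and return (process, its private stop event)."""
    to_stop = multiprocessing.Event()
    process = multiprocessing.Process(target=target, args=(to_stop,), daemon=True)
    process.start()
    return process, to_stop


def stop_process(process, to_stop, timeout=2.0):
    """Ask a worker to stop; terminate it if it has not exited in time.

    Returns the worker's exit code (0 means it returned normally).
    """
    to_stop.set()
    process.join(timeout)
    if process.is_alive():
        # The worker ignored the request, so fall back to killing it.
        process.terminate()
        process.join()
    return process.exitcode


if __name__ == "__main__":
    polite = start_worker(polite_worker)
    stubborn = start_worker(stubborn_worker)
    # Each call affects only the process it is given.
    print("polite exit code:", stop_process(*polite, timeout=2.0))
    print("stubborn exit code:", stop_process(*stubborn, timeout=0.5))
```

The exit code also gives you a basic form of monitoring: a non-zero value tells the main process that the worker did not finish normally, so it can decide whether to start a replacement.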

CodePudding user response:

I'd probably do something like this:

import random
import time
from multiprocessing import Process, Queue


class MotorProcess:
    def __init__(self, name, com_related_params):
        self.name = name
        # Made up some parameters relating to communication
        self._params = com_related_params
        self._command_queue = Queue()
        self._status_queue = Queue()
        self._process = None

    def start(self):
        if self._process and self._process.is_alive():
            return
        self._process = Process(target=self.run_processing,
                                args=(self._command_queue, self._status_queue,
                                      self._params))
        self._process.start()

    @staticmethod
    def run_processing(command_queue, status_queue, params):
        while True:
            # Check for commands
            if not command_queue.empty():
                msg = command_queue.get(block=True, timeout=0.05)
                if msg == "stop motor":
                    status_queue.put("Stopping motor")
                elif msg == "exit":
                    return
                elif msg.startswith("move"):
                    status_queue.put("moving motor to blah")
                    # TODO: msg parsing and move motor
                else:
                    status_queue.put("unknown command")

            # Update status
            # TODO: query motor status
            status_queue.put(f"Motor is {random.randint(0, 100)}")
            time.sleep(0.5)

    def is_alive(self):
        if self._process and self._process.is_alive():
            return True
        return False

    def get_status(self):
        if not self.is_alive():
            return ["not running"]
        # Empty the queue
        recent = []
        while not self._status_queue.empty():
            recent.append(self._status_queue.get(False))
        return recent

    def stop_process(self):
        if not self.is_alive():
            return
        self._command_queue.put("exit")
        # Empty the status queue, otherwise pending items could
        # prevent the process from exiting.
        while not self._status_queue.empty():
            self._status_queue.get()

        self._process.join()

    def send_command(self, command):
        self._command_queue.put(command)


if __name__ == "__main__":
    processes = [MotorProcess("1", None), MotorProcess("2", None)]

    while True:
        cmd = input()
        if cmd == "start 1":
            processes[0].start()
        elif cmd == "move 1 to 100":
            processes[0].send_command("move to 100")
        elif cmd == "exit 1":
            processes[0].stop_process()
        else:
            for n, p in enumerate(processes):
                print(f"motor {n + 1}", end="\n\t")
                print("\n\t".join(p.get_status()))

Not production ready (e.g. no exception handling, no proper command parsing, etc.) but shows the idea. Shout if there are any problems :D
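
On the command parsing: here is a minimal sketch of how the hard-coded "start 1" / "exit 1" checks could be generalised to any motor number. Command and parse_command are hypothetical names, not part of the code above or of any library; the command syntax simply mirrors the strings used in the example, and anything unrecognised is treated as a request to print the status.

```python
from typing import NamedTuple, Optional


class Command(NamedTuple):
    action: str              # "start", "exit", "move" or "status"
    motor: Optional[int]     # zero-based index into the process list
    argument: Optional[str]  # text to forward to the worker, if any


def parse_command(line: str, motor_count: int) -> Command:
    """Parse '<action> <motor number> [rest]'; anything else means status."""
    status = Command("status", None, None)
    parts = line.split()
    if len(parts) < 2 or parts[0] not in ("start", "exit", "move"):
        return status
    if not parts[1].isdecimal() or not 1 <= int(parts[1]) <= motor_count:
        return status
    motor = int(parts[1]) - 1
    if parts[0] == "move":
        # "move 1 to 100" is forwarded to the worker as "move to 100".
        return Command("move", motor, " ".join(["move"] + parts[2:]))
    return Command(parts[0], motor, None)


if __name__ == "__main__":
    for text in ("start 1", "move 2 to 100", "exit 3", "anything else"):
        print(repr(text), "->", parse_command(text, motor_count=2))
```

In the main loop you would then call parse_command(input(), len(processes)) and dispatch on the action, e.g. processes[cmd.motor].start() for "start" and processes[cmd.motor].send_command(cmd.argument) for "move".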
