Home > Software engineering >  Python process wait
Python process wait

Time:05-20

I have this application that shares work across multiple devices and starts processes on them, they often need to synchronize to each other.

  1. A web app opens an SSH connection to servers.
  2. It starts n Python instances per server.
  3. Then closes the SSH connection.

After that the only way each agent will communicate is with MQTT (Publish / subscribe on topics). The MQTT library I'm using is Paho-MQTT.

My current issue is that I can't find a good way to make them wait. I feel like while true : sleep is not a good way of doing it but I have no clue on how to do it better.

My current code looks like this:

def wake_up(self, client, userdata, message) -> None:
   self.wake_up_token  = 1

def wait(self) -> None:
   while self.wake_up_token == 0:
       sleep(self.wait_delay)
   self.wake_up_token -= 1

So far my only idea would be to find the best wait delay by dichotomy tho it will be time consuming so I'm hoping on a better solution.

Note:

def wait():
   while self.wake_up_token == 0:
      pass

is a terrible solution because the process will be constantly in competition to check the condition and it makes the system thousands of times slower.

Minimal example I guess: (1 agent, 1 scheduler)

from paho.mqtt.client import Client
from time import sleep

class Agent:

    def __init__(self, broker_ip: str, client_id: str):
        self.client: Client = Client(client_id=client_id)
        self.client.username_pw_set(username="something", password="veryverysecured")
        self.client.connect(host=broker_ip)
        self.client.subscribe("wake_up_topic")
        self.client.message_callback_add("wake_up_topic", self.wake_up)
        self.client.loop_start()

        self.wake_up_token = 0
        
    def wake_up(self, client, userdata, message) -> None:
        self.wake_up_token  = 1
    
    def wait(self) -> None:
        while self.wake_up_token == 0:
            sleep(self.wait_delay)
        self.wake_up_token -= 1
    
    def run(self):
        while True:
            # do something
            self.client.publish(topic="agent/action_done", message="")
            self.wait()
            # do something

class Scheduler:

    def __init__(self, broker_ip: str):
        self.client: Client = Client(client_id="scheduler")
        self.client.username_pw_set(username="something", password="veryverysecured")
        self.client.connect(host=broker_ip)
        self.client.subscribe("agent/action_done")
        self.client.message_callback_add("agent/action_done", self.wake_up)
        self.client.loop_start()

        self.wake_up_token = 0

    def wake_up(self, client, userdata, message) -> None:
        self.wake_up_token  = 1
    
    def wait(self) -> None:
        while self.wake_up_token == 0:
            sleep(self.wait_delay)
        self.wake_up_token -= 1
    
    def run(self):
        while True:

            #agents are doing their thing

            self.wait()
            self.client.publish(topic="wake_up_topic", message="")

I don't know if it is useful, but the goal of the whole project is to build a MAS frameworks that shares the agents across multiple servers.

CodePudding user response:

I think a standard Semaphore is most useful in this case. I can't test this, since I don't have an MQTT client/server running, but it should be something like this:

import threading

class Scheduler:

    def __init__(self, broker_ip: str):
        # your init code here (unchanged), but add this line:
        self.semaphore = threading.Semaphore(0)

    def wake_up(self, client, userdata, message) -> None:
        self.semaphore.release()
    
    def wait(self) -> None:
        self.semaphore.acquire()
  • Related