I have this application that shares work across multiple devices and starts processes on them, they often need to synchronize to each other.
- A web app opens an SSH connection to servers.
- It starts n Python instances per server.
- Then closes the SSH connection.
After that the only way each agent will communicate is with MQTT (Publish / subscribe on topics). The MQTT library I'm using is Paho-MQTT.
My current issue is that I can't find a good way to make them wait. I feel like while true : sleep
is not a good way of doing it but I have no clue on how to do it better.
My current code looks like this:
def wake_up(self, client, userdata, message) -> None:
self.wake_up_token = 1
def wait(self) -> None:
while self.wake_up_token == 0:
sleep(self.wait_delay)
self.wake_up_token -= 1
So far my only idea would be to find the best wait delay by dichotomy tho it will be time consuming so I'm hoping on a better solution.
Note:
def wait():
while self.wake_up_token == 0:
pass
is a terrible solution because the process will be constantly in competition to check the condition and it makes the system thousands of times slower.
Minimal example I guess: (1 agent, 1 scheduler)
from paho.mqtt.client import Client
from time import sleep
class Agent:
def __init__(self, broker_ip: str, client_id: str):
self.client: Client = Client(client_id=client_id)
self.client.username_pw_set(username="something", password="veryverysecured")
self.client.connect(host=broker_ip)
self.client.subscribe("wake_up_topic")
self.client.message_callback_add("wake_up_topic", self.wake_up)
self.client.loop_start()
self.wake_up_token = 0
def wake_up(self, client, userdata, message) -> None:
self.wake_up_token = 1
def wait(self) -> None:
while self.wake_up_token == 0:
sleep(self.wait_delay)
self.wake_up_token -= 1
def run(self):
while True:
# do something
self.client.publish(topic="agent/action_done", message="")
self.wait()
# do something
class Scheduler:
def __init__(self, broker_ip: str):
self.client: Client = Client(client_id="scheduler")
self.client.username_pw_set(username="something", password="veryverysecured")
self.client.connect(host=broker_ip)
self.client.subscribe("agent/action_done")
self.client.message_callback_add("agent/action_done", self.wake_up)
self.client.loop_start()
self.wake_up_token = 0
def wake_up(self, client, userdata, message) -> None:
self.wake_up_token = 1
def wait(self) -> None:
while self.wake_up_token == 0:
sleep(self.wait_delay)
self.wake_up_token -= 1
def run(self):
while True:
#agents are doing their thing
self.wait()
self.client.publish(topic="wake_up_topic", message="")
I don't know if it is useful, but the goal of the whole project is to build a MAS frameworks that shares the agents across multiple servers.
CodePudding user response:
I think a standard Semaphore is most useful in this case. I can't test this, since I don't have an MQTT client/server running, but it should be something like this:
import threading
class Scheduler:
def __init__(self, broker_ip: str):
# your init code here (unchanged), but add this line:
self.semaphore = threading.Semaphore(0)
def wake_up(self, client, userdata, message) -> None:
self.semaphore.release()
def wait(self) -> None:
self.semaphore.acquire()