How do you queue messages locally until there is internet?

Time:09-29

I have an IoT device that sends messages to our server via Amazon SQS (boto3 library). Our device is connected to the internet via a 3G USB dongle.

In my Python code there is a try/except block to catch the SQS message if it cannot be sent. We have 99% uptime with our dongle, but I need to find a solution for the 1% downtime.

How can I create a local queue pythonically that sends the message when internet is available again? Would I need to set up RabbitMQ, or could I just create a thread with a 5-second loop that ends when the message is sent?

def send_message(sqs):
    try:
        msg_json = json.dumps({"function": hello, "userid": userid})
        sqs.send_message(
            QueueUrl=que,
            DelaySeconds=1,
            MessageAttributes={},
            MessageBody=(msg_json)
        )
    except Exception as e:
        print("Could not send")
        print(e)

CodePudding user response:

Do you have to send the message immediately after the connection is restored, or can it wait until the next message?

Retrying on the same thread that called send_message may halt your whole application. On the other hand, waiting for another call to send_message may delay the previous one for too long.

I am in favor of sending the message as soon as possible, and that's why I would rather send all messages asynchronously, like this:

import json
import threading
import time
from collections import deque

class Messenger:
    def __init__(self, sqs, queue_url) -> None:
        self.sqs = sqs  # boto3 SQS client
        self.queue_url = queue_url
        self.msg_queue = deque()  # deque append/popleft are thread-safe
        self.running = True
        # the worker must be passed as target= and started explicitly
        self.thread = threading.Thread(target=self.send_worker)
        self.thread.start()

    def stop(self):
        # the worker drains the queue before exiting, so this blocks
        # until every queued message has been sent
        self.running = False
        self.thread.join()

    def send_message(self, msg):
        if self.running:
            self.msg_queue.append(msg)

    def send_worker(self):
        while self.running or len(self.msg_queue) > 0:
            if len(self.msg_queue) == 0:
                # sleep a little to prevent high cpu usage
                # you can use condition variables if you want
                time.sleep(1)
                continue

            msg = self.msg_queue.popleft()  # oldest message first

            # retry forever
            while not self.send_message_impl(msg):
                time.sleep(1)

    def send_message_impl(self, msg):
        try:
            msg_json = json.dumps(msg)
            self.sqs.send_message(
                QueueUrl=self.queue_url,
                DelaySeconds=1,
                MessageAttributes={},
                MessageBody=msg_json
            )
            return True
        except Exception as e:
            print("Could not send")
            print(e)
            return False
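
The comment about condition variables can be taken one step further: the standard library's queue.Queue already blocks the consumer until an item arrives, so the worker does not need to poll. Below is a minimal sketch of that variant, not a drop-in replacement. The names QueueMessenger and send_func are made up for illustration; send_func is assumed to be any callable that takes the JSON body and raises on failure, for example a small wrapper around sqs.send_message.

```python
import json
import queue
import threading
import time


class QueueMessenger:
    """Background sender that blocks on queue.Queue instead of polling."""

    _STOP = object()  # sentinel that tells the worker to exit

    def __init__(self, send_func, retry_delay=1.0):
        # send_func is a hypothetical callable: it receives the JSON body
        # and must raise an exception if the message could not be sent.
        self.send_func = send_func
        self.retry_delay = retry_delay
        self.msg_queue = queue.Queue()
        self.thread = threading.Thread(target=self._worker, daemon=True)
        self.thread.start()

    def send_message(self, msg):
        # returns immediately; the worker thread does the actual sending
        self.msg_queue.put(json.dumps(msg))

    def stop(self):
        # the sentinel is queued behind pending messages, so they go first
        self.msg_queue.put(self._STOP)
        self.thread.join()

    def _worker(self):
        while True:
            body = self.msg_queue.get()  # blocks without burning CPU
            if body is self._STOP:
                return
            while True:  # retry this message until it goes through
                try:
                    self.send_func(body)
                    break
                except Exception as e:
                    print("Could not send, retrying:", e)
                    time.sleep(self.retry_delay)
```

With boto3 you would pass something like lambda body: sqs.send_message(QueueUrl=queue_url, MessageBody=body) as send_func. As in the version above, stop() waits until the backlog has been sent, so it can block for as long as the connection is down.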

CodePudding user response:

You can make this part of an object and store your messages in a list, like:

import json

class Messenger:
    def __init__(self):
        self.message_list = list()

    def send(self, sqs, hello, userid):
        # queue the new message first, so it is kept if sending fails
        msg_json = json.dumps({"function": hello, "userid": userid})
        self.message_list.append(msg_json)
        # pop the message at position 0 if there are too many messages waiting to be sent
        if len(self.message_list) > 1000:
            self.message_list.pop(0)
        try:
            # send oldest first; remove a message only after it was sent,
            # so a failure halfway through does not lose or duplicate anything
            while len(self.message_list) > 0:
                sqs.send_message(
                    QueueUrl=que,  # que is the queue URL, as in the question
                    DelaySeconds=1,
                    MessageAttributes={},
                    MessageBody=self.message_list[0]
                )
                self.message_list.pop(0)
        except Exception as e:
            print("Could not send")
            print(e)

if __name__ == "__main__":
    messenger = Messenger()

    while True:
        #your stuff that acquires hello,userid and sqs
        messenger.send(sqs,hello,userid)

Be aware that you can run out of memory if you stay without internet for a long time, which is why the list is capped at 1000 messages and the oldest ones are dropped first.
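
If dropping messages or losing them on a reboot is not acceptable, one option is to keep the backlog on disk instead of in a list. The following is only a sketch under assumptions: DiskSpool, the outbox table, and the outbox.db file name are invented for this example, and send_func stands for any callable that takes the JSON body and raises on failure. It uses the standard sqlite3 module and is meant to be used from a single thread.

```python
import json
import sqlite3


class DiskSpool:
    """Tiny on-disk FIFO for messages that have not been sent yet."""

    def __init__(self, path="outbox.db"):  # hypothetical file name
        self.db = sqlite3.connect(path)
        self.db.execute(
            "CREATE TABLE IF NOT EXISTS outbox "
            "(id INTEGER PRIMARY KEY AUTOINCREMENT, body TEXT NOT NULL)"
        )
        self.db.commit()

    def put(self, msg):
        # store the message before any send attempt is made
        self.db.execute("INSERT INTO outbox (body) VALUES (?)", (json.dumps(msg),))
        self.db.commit()

    def flush(self, send_func):
        """Send pending messages oldest first and stop at the first failure.

        Returns the number of messages that were sent.
        """
        sent = 0
        while True:
            row = self.db.execute(
                "SELECT id, body FROM outbox ORDER BY id LIMIT 1"
            ).fetchone()
            if row is None:
                return sent
            msg_id, body = row
            try:
                send_func(body)
            except Exception as e:
                print("Could not send, keeping message on disk:", e)
                return sent
            # delete only after a successful send
            self.db.execute("DELETE FROM outbox WHERE id = ?", (msg_id,))
            self.db.commit()
            sent += 1

    def pending(self):
        return self.db.execute("SELECT COUNT(*) FROM outbox").fetchone()[0]
```

The idea is to call put() for every new message and then flush() right after, so a backlog from an outage goes out with the next successful send. Because a row is deleted only after the send succeeds, a crash between the two steps can send a message twice, so the receiver should tolerate duplicates.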

CodePudding user response:

If the downtime is under 5 seconds, why not wrap the whole thing in a while loop like this? (You can use Python's threading module if you are worried about delaying newer messages.)

import json
import time

def send_message(sqs):
    pending = True
    waited = 0  # seconds spent sleeping between retries
    while pending:
        try:
            msg_json = json.dumps({"function": hello, "userid": userid})
            sqs.send_message(
                QueueUrl=que,
                DelaySeconds=1,
                MessageAttributes={},
                MessageBody=msg_json
            )
            pending = False  # message sent successfully, exit while loop

        except Exception as e:
            if waited < 6:  # set a time limit, i.e. the downtime
                time.sleep(2)  # specify the sleep time before trying again
                waited += 2  # accumulate, otherwise the limit is never reached
                print("Could not send")
                print(e)
            else:
                break  # give up once the time limit is used up
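
The same time-limited retry can be pulled out into a small helper so it is reusable and measures real elapsed time rather than counting sleeps. This is a rough sketch, not part of the code above: retry_with_deadline and its parameters are names chosen for the example, and send_func is assumed to be a zero-argument callable that raises when the send fails.

```python
import time


def retry_with_deadline(send_func, max_wait=6.0, first_delay=0.5, max_delay=2.0):
    """Call send_func until it succeeds or max_wait seconds have elapsed.

    Returns True on success and False if the time limit ran out.
    """
    deadline = time.monotonic() + max_wait
    delay = first_delay
    while True:
        try:
            send_func()
            return True
        except Exception as e:
            print("Could not send:", e)
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False  # time limit used up, caller decides what to do
        # never sleep past the deadline
        time.sleep(min(delay, remaining))
        # wait a bit longer after each failure, up to max_delay
        delay = min(delay * 2, max_delay)
```

A call could look like retry_with_deadline(lambda: sqs.send_message(QueueUrl=que, MessageBody=msg_json)). When it returns False, the message still has to be stored somewhere, otherwise it is lost once the outage lasts longer than the limit.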