I have an IOT device that sends messages to our server via amazon SQS (boto3 library). Our device is connected to the internet via a 3G USB dongle.
In my Python code there is a try
/except
block to catch the SQS message if it cannot be sent. We have 99% uptime with our dongle, but I need to find a solution for the 1% downtime.
How can I create a local queue pythonically that sends the message when internet is available again? Would I need to implement rabbitmq or maybe just create a thread with a 5-second loop that ends when the message sends?
def send_message(sqs):
try:
msg_json = json.dumps({"function": hello, "userid": userid})
sqs.send_message(
QueueUrl=que,
DelaySeconds=1,
MessageAttributes={},
MessageBody=(msg_json)
)
except Exception as e:
print("Could not send")
print(e)
CodePudding user response:
Do you have to send the message immediately after the connection is restored or it can wait until the next message?
Retrying on the same thread that called send_message
may halt your whole application. On the other hand, waiting for another call to send_message
may delay the previous one for too long.
I am in favor of sending the message as soon as possible, and that's why I would rather send all messages asynchronously, like this:
import time
import threading
class Messenger:
def __init__(self, queue_url) -> None:
self.msg_queue = []
self.queue_url = queue_url
self.running = True
self.thread = threading.Thread(self.send_worker)
def stop(self):
self.running = False
self.thread.join()
def send_message(self, msg):
if self.running:
self.msg_queue.append(msg)
def send_worker(self):
while self.running or len(self.queue) > 0:
msg = self.queue.pop()
# retry forever
while not self.send_message_impl(msg):
time.sleep(1)
# sleep a little to prevent high cpu usage
# you can use condition variables if you want
if self.running and len(self.queue) == 0:
time.sleep(1)
def send_message_impl(sqs):
try:
msg_json = json.dumps(msg)
sqs.send_message(
QueueUrl=self.queue_url,
DelaySeconds=1,
MessageAttributes={},
MessageBody=(msg_json)
)
return True
except Exception as e:
print("Could not send")
print(e)
return False
CodePudding user response:
you can make this a part of an object and store your messages in a list, like:
class Messenger:
def __init__(self):
self.message_list = list()
def send(self,sqs,hello,userid):
try:
if len(self.message_list)>0:
for msg_json in self.message_list:
sqs.send_message(
QueueUrl=que,
DelaySeconds=1,
MessageAttributes={},
MessageBody=(msg_json)
)
self.message_list = []
msg_json = json.dumps({"function": hello, "userid": userid})
sqs.send_message(
QueueUrl=que,
DelaySeconds=1,
MessageAttributes={},
MessageBody=(msg_json)
)
except Exception as e:
self.message_list.append(msg_json)
#pop message at position 0, if there is too much messages to be send yet
if len(self.message_list)>1000:
self.message_list.pop(0)
print("Could not send")
print(e)
if __name__ == "__main__":
messenger = Messenger()
while True:
#your stuff that acquires hello,userid and sqs
messenger.send(sqs,hello,userid)
be aware that you can run into an overflow of memory if you stay without internet for a long time.
CodePudding user response:
If the downtime is <5 seconds why not wrap the whole functionality in a while loop as such (Python's Threading can be leveraged if worried about causing a delay for newer messages)-
import time
def send_message(sqs):
Message = True
Timer = 0
while Message:
try:
msg_json = json.dumps({"function": hello, "userid": userid})
sqs.send_message(
QueueUrl=que,
DelaySeconds=1,
MessageAttributes={},
MessageBody=(msg_json)
)
Message = False # message sent successfully exit while loop
except Exception as e:
if Timer<6: # set a time limit, i.e. the downtime
time.sleep(2) # specify the sleep time before trying again
Timer =2
print("Could not send")
print(e)
else:
break