What is the best mechanism to send data through queues in a multithreaded program?

Time:05-15

I am writing a multithreaded application in which a main process sends messages to the appropriate thread through a queue. My question is about the thread side: the solution I have found listens constantly (up to a time limit, which is why I have a Clock class whose isRunning method returns True while the time has not yet expired), and whenever an attempt gets no data, I catch the exception and simply continue.

First, here is a simplified version of the main process code:

    def callUpdate(self, update: Update):  # Update holds the ID of its target thread and the data that thread must process.
        find = False
        wrapp: _Wrapper = None
        for current in list(self.threads):  # iterate over a copy, because dead threads are removed inside the loop
            if not isinstance(current, _Wrapper):  # _Wrapper is a class that holds the thread
                continue
            if not current.theThread.is_alive():
                # Here I save some data, and I remove the dead thread from the list
                self.threads.remove(current)
                continue

            if current.id == update.id:
                wrapp = current
                find = True
                break
        # Here I do some other things. If no thread was found, I create a new one and send it the first
        # message (the update itself); if it was found and is alive, I just send it the data.
        # _Wrapper creates a new queue and stores the thread so that more data can be sent later.
        if not find:
            wrapp = _Wrapper(update)
            self.threads.append(wrapp)
            wrapp.queue.put(update)
            wrapp.theThread.start()
        else:
            # The thread is already running, so I just send it the update
            wrapp.queue.put(update)

Now here is a simplified version of the thread side, which is what worries me, because it seems a bit "sloppy". Notice that I read the message queue with a 1-second timeout. The Clock class simply reports whether the given time (120 seconds in this case) has elapsed.

    def process(self):  # method of class ProcessThread(threading.Thread)
        clock = Clock(seconds=120)
        while clock.isRunning():
            update: Update = self.getUpdateFromQueue(seconds=1)
            if update is None:
                continue
            # At this point the update message is valid and I process the data.
            # Once the clock expires, the loop ends and the thread finishes.
        return

The problem is that the program sometimes slows down a lot, whether there are few threads or many (the number of threads does not seem to matter). I have also tried reducing the interval at which the queue is re-read, because with many requests it seems to cause problems. This feels hacky to me; can anyone suggest a better way to receive queued data in a multithreaded program?

Thank you

---------- EDIT ---------- Sorry, I didn't include the method that gets the data from the queue:

    # Get data from the queue, waiting at most `seconds` seconds.
    def getUpdateFromQueue(self, seconds=10):
        deadline = datetime.datetime.now() + datetime.timedelta(seconds=seconds)
        current = datetime.datetime.now()
        while current < deadline:
            try:
                data: Update = self.queue.get(timeout=0.01)
                return data
            except Empty:  # queue.Empty: nothing arrived within 10 ms
                current = datetime.datetime.now()
                continue
        return None

Answer:

Your code is polling in a loop for no reason (each thread wakes up every 10 ms even when nothing has arrived), which is naturally going to hurt performance; you should not be doing this in your own code at all. Instead, use the timeout support built into queue.Queue.get to handle your timeouts.

For example, getUpdateFromQueue doesn't need to loop and check the wall-clock time between short-timeout calls to queue.get; it can pass the full seconds limit directly to queue.get:

    def getUpdateFromQueue(self, seconds=10):
        try:
            return self.queue.get(timeout=seconds)
        except Empty:
            return None
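A self-contained sketch to check that behaviour. The `Worker` class is hypothetical, standing in for the question's thread class; only its `queue` attribute matters here:

```python
import queue
import time


class Worker:
    """Hypothetical stand-in for the question's thread class."""

    def __init__(self):
        self.queue = queue.Queue()

    def getUpdateFromQueue(self, seconds=10):
        # One blocking call: returns as soon as an item is available,
        # or raises queue.Empty once `seconds` pass with nothing queued.
        try:
            return self.queue.get(timeout=seconds)
        except queue.Empty:
            return None


w = Worker()
w.queue.put("hello")
print(w.getUpdateFromQueue(seconds=1))  # prints hello (item was already queued)

start = time.monotonic()
print(w.getUpdateFromQueue(seconds=0.5))  # prints None, after roughly 0.5 s
print(f"waited {time.monotonic() - start:.2f} s")
```

There is no inner loop and no wall-clock bookkeeping: the queue itself does the waiting.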

But you don't need this to be its own function in the first place. Instead of:

    def process(self):
        clock = Clock(seconds=120)
        while clock.isRunning():
            update: Update = self.getUpdateFromQueue(seconds=1)
            if update is None:
                continue
        return

you can just use queue.get directly with the overall maximum timeout that you're trying to enforce using your Clock class:

    def process(self):
        try:
            return self.queue.get(timeout=120)
        except Empty:
            return None

That should have the same effect (return a piece of data, waiting for a maximum of 120 seconds before returning None instead), without two nested while loops that keep waking up the CPU (both doing the same thing, just at different resolutions).
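A single long get() does not make the thread sit idle when data shows up early. In the illustrative sketch below (the `producer` function is made up for the demo), a producer thread puts an item after a short delay, and the consumer's get(timeout=120) returns as soon as it arrives rather than after the full timeout, without waking up in between:

```python
import queue
import threading
import time

q = queue.Queue()


def producer(delay):
    # Simulates the main process sending an update after `delay` seconds.
    time.sleep(delay)
    q.put("update")


threading.Thread(target=producer, args=(0.2,)).start()

start = time.monotonic()
item = q.get(timeout=120)  # waits on the queue's internal condition variable; no polling loop
elapsed = time.monotonic() - start
print(item, f"after {elapsed:.2f} s")  # roughly 0.2 s, not 120 s
```

The 120-second timeout only matters when nothing arrives at all.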

If you need to process multiple messages, you just need a single loop where you adjust the timeout on each get() to reflect the overall deadline. (I'm using time.monotonic() here because it by definition can't get thrown off by changes in the system clock.)

    from queue import Empty
    from time import monotonic


    def process(self, data):
        # do whatever you need to do with one piece of data
        pass

    def process_messages_with_timeout(self, timeout=120):
        deadline = monotonic() + timeout
        while True:
            remaining = deadline - monotonic()
            if remaining <= 0:
                break  # overall deadline reached; get() rejects a negative timeout
            try:
                self.process(self.queue.get(timeout=remaining))
            except Empty:
                break

The important thing is that you should only ever need to make one call to get() per item you actually want to get, with the actual timeout; there's no point in doing a get() with a shorter timeout than you want and then adding extra logic to retry within the real timeout. Adding extra loops within loops serves no purpose.
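Putting this together with the structure from the question, a minimal end-to-end sketch might look like this. It assumes each worker is a `threading.Thread` subclass that owns its own queue and that the main process keeps workers in a dict keyed by update ID. The `Dispatcher` class, the plain `(update_id, payload)` arguments and the `processed` list are hypothetical placeholders, not the question's actual `_Wrapper`/`Update` classes:

```python
import queue
import threading
from time import monotonic


class ProcessThread(threading.Thread):
    """Hypothetical worker: handles updates for one ID until its overall deadline passes."""

    def __init__(self, thread_id, timeout=120):
        super().__init__(daemon=True)
        self.thread_id = thread_id
        self.timeout = timeout
        self.queue = queue.Queue()
        self.processed = []  # stands in for real work so the result can be inspected

    def process(self, data):
        # Do whatever you need to do with one piece of data.
        self.processed.append(data)

    def run(self):
        deadline = monotonic() + self.timeout
        while True:
            remaining = deadline - monotonic()
            if remaining <= 0:
                break  # overall deadline reached
            try:
                # One blocking get per item, with the time actually left.
                data = self.queue.get(timeout=remaining)
            except queue.Empty:
                break  # nothing arrived before the deadline
            self.process(data)


class Dispatcher:
    """Hypothetical main-process side: routes each update to its worker by ID."""

    def __init__(self, timeout=120):
        self.timeout = timeout
        self.workers = {}

    def call_update(self, update_id, payload):
        worker = self.workers.get(update_id)
        if worker is None or not worker.is_alive():
            # No live worker for this ID: start a fresh one.
            worker = ProcessThread(update_id, timeout=self.timeout)
            self.workers[update_id] = worker
            worker.start()
        worker.queue.put(payload)


d = Dispatcher(timeout=0.5)  # short deadline so the demo finishes quickly
d.call_update("a", 1)
d.call_update("b", 2)
d.call_update("a", 3)
for w in list(d.workers.values()):
    w.join()  # each worker exits once its 0.5 s deadline passes
print({k: w.processed for k, w in d.workers.items()})  # {'a': [1, 3], 'b': [2]}
```

A dict lookup replaces the linear scan over the thread list. One caveat this sketch shares with the question's design: if a worker's deadline passes between the is_alive() check and put(), that update lands in a queue nobody reads anymore.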
