Home > database >  How can I convert a multi-thread programming model in python to async/await model?
How can I convert a multi-thread programming model in python to async/await model?

Time:10-10

In my python program, I am using a multi-thread producer/consumer model like the one described in https://realpython.com/intro-to-python-threading/. It has 2 threads, they are communicating using 'queue'. The consumer thread is waiting the message from the queue and it calls do_somthing_to_process_message() to process the message.

Here is the code:

> def producer(queue, event):
>     """Pretend we're getting a number from the network."""
>     while not event.is_set():
>         message = random.randint(1, 101)
>         logging.info("Producer got message: %s", message)
>         queue.put(message)
> 
>     logging.info("Producer received event. Exiting")
> 
> def consumer(queue, event):
>     """Pretend we're saving a number in the database."""
>     while not event.is_set() or not queue.empty():
>         message = queue.get()
>         logging.info(
>             "Consumer storing message: %s (size=%d)", message, queue.qsize()
>         )
>     do_somthing_to_process_message(message)
>     logging.info("Consumer received event. Exiting")
> 
> if __name__ == "__main__":
>     format = "%(asctime)s: %(message)s"
>     logging.basicConfig(format=format, level=logging.INFO,
>                         datefmt="%H:%M:%S")
> 
>     pipeline = queue.Queue(maxsize=10)
>     event = threading.Event()
>     with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
>         executor.submit(producer, pipeline, event)
>         executor.submit(consumer, pipeline, event)
> 
>         time.sleep(0.1)
>         logging.info("Main: about to set event")
>         event.set()

My question is how can I create a new async function consumer_receives_a_message(), so that my code can be like:

async aNewFunction(){
    message = await consumer_receives_a_message();
    do_somthing_to_process_message(message)
}

The reason for this change is I encapsulate the producer/consumer into an library, and the caller of that library can just await() when there is a message and it can process the message in whatever way it wants.

CodePudding user response:

I recommend solving this by encapsulating the producer/consumer in a class. Here's how I would modify your current code:

class LibraryClass:
    def producer(self, queue, event):
        """Pretend we're getting a number from the network."""
        while not event.is_set():
            message = random.randint(1, 101)
            logging.info("Producer got message: %s", message)
            queue.put(message)

        logging.info("Producer received event. Exiting")

    def consumer(self, queue, event):
        """Pretend we're saving a number in the database."""
        while not event.is_set() or not queue.empty():
            message = queue.get()
            logging.info(
                "Consumer storing message: %s (size=%d)", message, queue.qsize()
            )
        self.do_somthing_to_process_message(message)
        logging.info("Consumer received event. Exiting")

    def do_something_to_process_message(self, message):
        logging.info("Message Processing Not Implemented")

    def __init__(self):
        format = "%(asctime)s: %(message)s"
        logging.basicConfig(format=format, level=logging.INFO,
                            datefmt="%H:%M:%S")

        pipeline = queue.Queue(maxsize=10)
        event = threading.Event()
        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
            executor.submit(self.producer, pipeline, event)
            executor.submit(self.consumer, pipeline, event)

            time.sleep(0.1)
            logging.info("Main: about to set event")
            event.set()

Then the user of your library could do this to define their own custom message processing.

from library import LibraryClass


class ClientClass(LibraryClass):
    def __init__(self):
        super().__init__()

    def do_something_to_process_message(self, message):
        message *= 10 # my custom processing is multiplying by 10
        print(message)
  • Related