In my Python program, I am using a multi-threaded producer/consumer model like the one described in https://realpython.com/intro-to-python-threading/. It has two threads that communicate through a 'queue'. The consumer thread waits for a message from the queue and calls do_somthing_to_process_message() to process it.
Here is the code:
> import concurrent.futures
> import logging
> import queue
> import random
> import threading
> import time
>
> def producer(queue, event):
>     """Pretend we're getting a number from the network."""
>     while not event.is_set():
>         message = random.randint(1, 101)
>         logging.info("Producer got message: %s", message)
>         queue.put(message)
>
>     logging.info("Producer received event. Exiting")
>
> def consumer(queue, event):
>     """Pretend we're saving a number in the database."""
>     while not event.is_set() or not queue.empty():
>         message = queue.get()
>         logging.info(
>             "Consumer storing message: %s (size=%d)", message, queue.qsize()
>         )
>         do_somthing_to_process_message(message)
>     logging.info("Consumer received event. Exiting")
>
> if __name__ == "__main__":
>     format = "%(asctime)s: %(message)s"
>     logging.basicConfig(format=format, level=logging.INFO,
>                         datefmt="%H:%M:%S")
>
>     pipeline = queue.Queue(maxsize=10)
>     event = threading.Event()
>     with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
>         executor.submit(producer, pipeline, event)
>         executor.submit(consumer, pipeline, event)
>
>         # Set the event inside the with block; leaving the block waits for both threads.
>         time.sleep(0.1)
>         logging.info("Main: about to set event")
>         event.set()
My question is: how can I create a new async function, consumer_receives_a_message(), so that my code can look like this:
async def aNewFunction():
    message = await consumer_receives_a_message()
    do_somthing_to_process_message(message)
The reason for this change is that I want to encapsulate the producer/consumer in a library, so that the caller of that library can simply await a message and then process it in whatever way it wants.
CodePudding user response:
I recommend solving this by encapsulating the producer/consumer in a class. Here's how I would modify your current code:
import concurrent.futures
import logging
import queue
import random
import threading
import time

class LibraryClass:
    def producer(self, queue, event):
        """Pretend we're getting a number from the network."""
        while not event.is_set():
            message = random.randint(1, 101)
            logging.info("Producer got message: %s", message)
            queue.put(message)
        logging.info("Producer received event. Exiting")

    def consumer(self, queue, event):
        """Pretend we're saving a number in the database."""
        while not event.is_set() or not queue.empty():
            message = queue.get()
            logging.info(
                "Consumer storing message: %s (size=%d)", message, queue.qsize()
            )
            # Hook method: subclasses override this to process each message.
            self.do_something_to_process_message(message)
        logging.info("Consumer received event. Exiting")

    def do_something_to_process_message(self, message):
        logging.info("Message Processing Not Implemented")

    def __init__(self):
        format = "%(asctime)s: %(message)s"
        logging.basicConfig(format=format, level=logging.INFO,
                            datefmt="%H:%M:%S")
        pipeline = queue.Queue(maxsize=10)
        event = threading.Event()
        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
            executor.submit(self.producer, pipeline, event)
            executor.submit(self.consumer, pipeline, event)
            # Set the event inside the with block; leaving the block waits for both threads.
            time.sleep(0.1)
            logging.info("Main: about to set event")
            event.set()
Then a user of your library could subclass it like this to define their own custom message processing:
from library import LibraryClass

class ClientClass(LibraryClass):
    def __init__(self):
        super().__init__()

    def do_something_to_process_message(self, message):
        message *= 10  # my custom processing is multiplying by 10
        print(message)
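If you specifically want the awaitable interface from the question, here is a minimal sketch of one way it might be done, assuming the caller runs inside an asyncio event loop. Since queue.Queue.get() blocks, the sketch hands it to the loop's default thread pool with run_in_executor(). The simplified producer, the None end-of-stream sentinel, the pipeline parameter, and run_demo() are assumptions made for illustration, not part of the original code.

```python
import asyncio
import queue
import threading


def producer(pipeline, count):
    """Simplified stand-in for the threaded producer (illustrative only)."""
    for message in range(count):
        pipeline.put(message)
    pipeline.put(None)  # assumed sentinel meaning "no more messages"


async def consumer_receives_a_message(pipeline):
    """Wait for the next message without blocking the event loop."""
    loop = asyncio.get_running_loop()
    # queue.Queue.get() blocks, so run it in the loop's default thread pool.
    return await loop.run_in_executor(None, pipeline.get)


async def aNewFunction(pipeline):
    """Caller-side code: await each message, then process it as desired."""
    results = []
    while True:
        message = await consumer_receives_a_message(pipeline)
        if message is None:  # sentinel from the producer
            break
        results.append(message * 10)  # the caller's own processing
    return results


def run_demo(count=5):
    """Start the producer thread and drive the async consumer to completion."""
    pipeline = queue.Queue(maxsize=10)
    thread = threading.Thread(target=producer, args=(pipeline, count))
    thread.start()
    try:
        return asyncio.run(aNewFunction(pipeline))
    finally:
        thread.join()


if __name__ == "__main__":
    print(run_demo())
```

With this approach the library keeps its thread-safe queue, and only the thin consumer_receives_a_message() wrapper needs to know about asyncio. Each awaited get() occupies one pool thread while it waits, which is fine for a single consumer but worth keeping in mind if many callers await at once.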