Flask consumer doesn't execute callback when consuming from RabbitMQ

Time:10-08

I want to combine Flask and RabbitMQ to build a microservice that handles computation-heavy tasks. Basically, I want something like the Remote Procedure Call (RPC) tutorial from the RabbitMQ documentation, but with a REST API layered on top.

This is the code I've come up with so far:

server.py

from flask import Flask

import sys
import os
import json

import pika
import uuid
import time

''' HEADERS = {'Content-type': 'audio/*', 'Accept': 'text/plain'}'''

class RPIclient(object):
        def __init__(self):
            self.connection = pika.BlockingConnection(
                pika.ConnectionParameters(host='rabbitmq'))
            self.channel = self.connection.channel()
            self.channel.basic_qos(prefetch_count=1)
            self.channel.exchange_declare(exchange='kaldi_expe', exchange_type='topic')

            # Create all the queue and bind them to the corresponding routing key
            self.channel.queue_declare('request', durable=True)
            result = self.channel.queue_declare('answer', durable=True)

            self.channel.queue_bind(exchange='kaldi_expe', queue='request', routing_key='kaldi_expe.web.request')
            self.channel.queue_bind(exchange='kaldi_expe', queue='answer', routing_key='kaldi_expe.kaldi.answer')
            self.callback_queue = result.method.queue

            self.channel.basic_consume(queue="answer", on_message_callback=self.on_response)

        def on_response(self, ch, method, props, body):
            print("from server, correlation id : " + str(props.correlation_id), file=sys.stderr)
            self.response = body
            ch.basic_ack(delivery_tag=method.delivery_tag)

        def call(self, n):
            print("Launched Call ")
            self.response = None
            self.corr_id = str(uuid.uuid4())
            self.channel.basic_publish(
                exchange='kaldi_expe',
                routing_key='kaldi_expe.web.request',
                properties=pika.BasicProperties(
                    correlation_id=self.corr_id,
                ),
                body=str(n))
            while self.response is None:
                self.connection.process_data_events()
            return int(2)

def flask_app():
    app = Flask("__name__")

    @app.route('/', methods=['GET'])
    def server_is_up():
        return 'server is up', 200

    @app.route('/add-job/<cmd>')
    def add(cmd):
        app.config['RPIclient'].call(10)
        return "Call RPI client",404

    return app

if __name__ == '__main__':
    print("Waiting for RabbitMq")
    time.sleep(20)
    rpiClient = RPIclient()
    app = flask_app()
    app.config['RPIclient'] = rpiClient
    print("Rabbit MQ is connected, starting server", file=sys.stderr)
    app.run(debug=True, threaded=False, host='0.0.0.0')

worker.py

import pika
import time
import sys

print(' [*] Waiting for RabbitMQ ...')
time.sleep(20)

print(' [*] Connecting to server ...')
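connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='rabbitmq'))  # assumed: same host as in server.py
channel = connection.channel()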

print(' [*] Waiting for messages.')
def callback(ch, method, properties, body):
    print(" [x] Received %s" % body)
    print(" [x] Executing task ")
    print("from worker, correlation id : " + str(properties.correlation_id))
    ch.basic_publish(
                exchange='kaldi_expe',
                routing_key='kaldi_expe.kaldi.answer',
                properties=pika.BasicProperties(correlation_id = properties.correlation_id),
                body="response")
    print(" [x] Done")

    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='request', on_message_callback=callback)

channel.start_consuming()

Sadly, when the worker sends a message back to the server, the server seems to consume the message but never executes the callback: the RabbitMQ management interface shows the message as consumed but not acknowledged, and the print output never appears.

I'm pretty lost, since the message seems to be consumed but the callback is apparently never executed. Do you have any idea where this might come from?

User response:

You did attach the callback method on_response to the answer queue, but you never tell your server to start consuming from the queues.

It looks like you are missing self.channel.start_consuming() at the end of your class initialization.
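
One caveat when applying this: start_consuming() blocks until consuming is cancelled, so calling it at the end of __init__ in the main thread would keep app.run() from ever being reached. A possible way around that is to run the consumer in a background thread with its own connection, since pika connections should not be shared between threads. The following is only a minimal sketch under those assumptions. The names AnswerWaiter and start_answer_consumer are hypothetical and not part of pika or of the original code; the host and queue names are taken from server.py.

```python
import threading


class AnswerWaiter:
    """Pairs replies with pending requests by correlation id (hypothetical helper)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._events = {}     # correlation_id -> threading.Event
        self._responses = {}  # correlation_id -> reply body

    def expect(self, corr_id):
        # Register a request before publishing it, so the reply cannot be missed.
        with self._lock:
            self._events[corr_id] = threading.Event()

    def on_response(self, ch, method, props, body):
        # Same signature as on_response in server.py; runs in the consumer thread.
        with self._lock:
            event = self._events.get(props.correlation_id)
            if event is not None:
                self._responses[props.correlation_id] = body
        # Every delivery is acknowledged, as in the original code;
        # replies with an unknown correlation id are simply dropped.
        ch.basic_ack(delivery_tag=method.delivery_tag)
        if event is not None:
            event.set()

    def wait(self, corr_id, timeout=None):
        # Block the calling thread until the reply arrives or the timeout expires.
        with self._lock:
            event = self._events[corr_id]
        event.wait(timeout)
        with self._lock:
            self._events.pop(corr_id, None)
            return self._responses.pop(corr_id, None)  # None on timeout


def start_answer_consumer(waiter, host="rabbitmq", queue="answer"):
    """Consume `queue` in a daemon thread that owns its own connection."""
    import pika  # imported here so AnswerWaiter can be tried without a broker

    def run():
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
        channel = connection.channel()
        channel.queue_declare(queue, durable=True)
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(queue=queue, on_message_callback=waiter.on_response)
        channel.start_consuming()  # blocks this thread only, not Flask

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread
```

With this layout, the Flask route would generate a correlation id, call waiter.expect(corr_id), publish the request over a separate connection used only for publishing, and then return waiter.wait(corr_id, timeout=...) instead of looping on process_data_events().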
