So I have this problem. I want to use both Flask
and RabbitMQ
to do a microservice capable of doing some computation-heavy task. I basically wants something like the
Remote procedure call (RPC) tutorial from the documentation, but with a REST Api overhead.
So I've come with that code, so far:
server.py
from flask import Flask
import sys
import os
import json
import pika
import uuid
import time
''' HEADERS = {'Content-type': 'audio/*', 'Accept': 'text/plain'}'''
class RPIclient(object):
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host='rabbitmq'))
self.channel = self.connection.channel()
self.channel.basic_qos(prefetch_count=1)
self.channel.exchange_declare(exchange='kaldi_expe', exchange_type='topic')
# Create all the queue and bind them to the corresponding routing key
self.channel.queue_declare('request', durable=True)
result = self.channel.queue_declare('answer', durable=True)
self.channel.queue_bind(exchange='kaldi_expe', queue='request', routing_key='kaldi_expe.web.request')
self.channel.queue_bind(exchange='kaldi_expe', queue='answer', routing_key='kaldi_expe.kaldi.answer')
self.callback_queue = result.method.queue
self.channel.basic_consume(queue="answer", on_message_callback=self.on_response)
def on_response(self, ch, method, props, body):
print("from server, correlation id : " str(props.correlation_id), file=sys.stderr)
self.response = body
ch.basic_ack(delivery_tag=method.delivery_tag)
def call(self, n):
print("Launched Call ")
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange='kaldi_expe',
routing_key='kaldi_expe.web.request',
properties=pika.BasicProperties(
correlation_id=self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(2)
def flask_app():
app = Flask("__name__")
@app.route('/', methods=['GET'])
def server_is_up():
return 'server is up', 200
@app.route('/add-job/<cmd>')
def add(cmd):
app.config['RPIclient'].call(10)
return "Call RPI client",404
return app
if __name__ == '__main__':
print("Waiting for RabbitMq")
time.sleep(20)
rpiClient = RPIclient()
app = flask_app()
app.config['RPIclient'] = rpiClient
print("Rabbit MQ is connected, starting server", file=sys.stderr)
app.run(debug=True, threaded=False, host='0.0.0.0')
worker.py
import pika
import time
import sys
print(' [*] Waiting for RabbitMQ ...')
time.sleep(20)
print(' [*] Connecting to server ...')
channel = connection.channel()
print(' [*] Waiting for messages.')
def callback(ch, method, properties, body):
print(" [x] Received %s" % body)
print(" [x] Executing task ")
print("from worker, correlation id : " str(properties.correlation_id))
ch.basic_publish(
exchange='kaldi_expe',
routing_key='kaldi_expe.kaldi.answer',
properties=pika.BasicProperties(correlation_id = properties.correlation_id),
body="response")
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='request', on_message_callback=callback)
channel.start_consuming()
Sadly, when I'm sending back a message (from the worker to the server), it seems that the server does consume the message, but never execute the callback (it shows the message as consummed, but not ACK on the rabbit mq interface. Also, print don't show).
I'm pretty lost, since the message seems to be consummed, but the callback seems to not be executed. Do you have any idea where it might come from ?
CodePudding user response:
you did attach the callback method on_response
to the queue answer
, but you never tell your server to start consuming the queues.
Looks like you are missing self.channel.start_consuming()
at the end of your class initialization.