Cannot get celery to run task with Flask-SocketIO and eventlet monkey patching

Time:12-24

Update 2:

The solution is in how monkey patching actually gets done. See my answer below.

Update 1:

The issue is eventlet's monkey patching. Monkey patching is pretty much magic to me, so I don't fully understand exactly why. I can get the celery task to run if I don't monkey patch with eventlet. However, if I don't monkey patch, I cannot use a SocketIO instance in another process, because I cannot use redis as the SocketIO message queue without monkey patching. I am also trying gevent and am still running into issues, but will update with results.
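
Monkey patching, in the sense eventlet uses it, means replacing attributes of already-imported standard-library modules (for example socket.socket or time.sleep) with cooperative "green" versions at runtime. The stdlib-only sketch below uses a hypothetical stand-in module rather than eventlet itself. It illustrates why patching has to happen before other imports: any code that grabbed a reference to the original function beforehand keeps using the original.

```python
import types

# Hypothetical stand-in for a stdlib module such as `time` (illustration only).
fake_time = types.ModuleType("fake_time")


def blocking_sleep(seconds):
    """Original implementation: would block the whole OS thread."""
    return "blocking"


def green_sleep(seconds):
    """Replacement: would yield to other green threads instead of blocking."""
    return "cooperative"


fake_time.sleep = blocking_sleep

# Equivalent of `from time import sleep` executed BEFORE patching:
# the name is bound to the original function object.
early_sleep = fake_time.sleep

# The "monkey patch": swap the attribute on the module object at runtime.
fake_time.sleep = green_sleep

# Equivalent of `from time import sleep` executed AFTER patching.
late_sleep = fake_time.sleep

print(early_sleep(0))      # blocking
print(late_sleep(0))       # cooperative
print(fake_time.sleep(0))  # cooperative
```

This is one reason the eventlet documentation recommends calling eventlet.monkey_patch() as early as possible, before other modules are imported.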

Also note, I had to change the Celery object instantiation to Celery("backend") rather than Celery(app.import_name) or Celery(app.name) or Celery(__name__) to get the non-monkey patched task to run. Because I am not using anything from the app context in my task, I actually don't even need the make_celery.py module, and can just instantiate it directly within backend.py.

I also tried different databases within redis, thinking it might be a conflict.

I also moved to remote debugging of the celery task through telnet, as described here. Again, when NOT monkey patching, the task runs and the external socketio object exists, but it cannot communicate with the main server to emit. When I DO monkey patch, the task won't even run.

When using gevent and monkey patching, the application doesn't even start up.

The Goal:

Run a real-time web application that pushes data created in a parallel process to the client via Socket.IO. I am using Flask and Flask-SocketIO.

What I've Tried:

I originally was just emitting from an external process as described in the docs (see my original minimum working example, here). However, this proved to be buggy. Specifically, it worked flawlessly when the data streaming object was instantiated and the external process started within the if __name__ == "__main__": block, but failed when it was instantiated and started on demand from a Socket.IO event. Much research led me to this eventlet issue, which is still open and suggests eventlet and multiprocessing do not play well together. I then tried gevent, and it worked for a while but was still buggy when left running for long periods of time (e.g. 12 hours).
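
In the documented pattern for emitting from an external process, that process never talks to the browser directly: it publishes emit requests to the message queue (redis here), and the Socket.IO server process, which holds the client connections, picks them up and forwards them to the clients. The stdlib-only sketch below models that flow with a thread and a queue.Queue standing in for redis; all names are hypothetical and this is not how Flask-SocketIO is implemented internally.

```python
import queue
import threading
from random import randrange

# Hypothetical stand-in for the redis message queue shared by both processes.
fake_redis_queue = queue.Queue()
STOP = object()  # sentinel telling the server loop to exit


def external_producer(n_values):
    """Plays the role of the celery task: it can only publish events."""
    for _ in range(n_values):
        value = randrange(0, 10000, 1) / 100
        fake_redis_queue.put(("new_data", {"value": value}))
    fake_redis_queue.put(STOP)


def socketio_server(delivered):
    """Plays the role of the Socket.IO server: relays queued events."""
    while True:
        item = fake_redis_queue.get()
        if item is STOP:
            break
        event, payload = item
        # A real server would emit to its connected clients here.
        delivered.append((event, payload))


delivered = []
server = threading.Thread(target=socketio_server, args=(delivered,))
server.start()
external_producer(5)
server.join()
print(len(delivered))  # 5
```

The point of the design is that only the server process needs to own the client connections; any number of producer processes can feed it through the queue.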

This answer led me to try Celery in my app, and I have been struggling ever since. Specifically, my issue is that the task status shows pending for a while (I'm guessing for the default timeout) and then shows failure. When running the worker with the debug logging level, the error shows Received unregistered task of type '__main__.stream_data'. I've tried every way I can think of to start the worker and register the task. I am confused because my Celery instance is defined in the same scope as the task definition, and, like the countless tutorials and examples I've found online, I start the worker with celery -A backend.cel worker -l DEBUG to tell it to use the celery instance within the backend.py module (at least that's what I think that command is doing).
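
The '__main__.stream_data' name in that error, together with the Update 1 observation that Celery("backend") worked where Celery(__name__) did not, is consistent with how Celery derives default task names: a task is named "<module>.<function>", and when the module is __main__ (the file was run as a script) Celery substitutes the app's main name if one was given. The worker, started with -A backend.cel, imports the file as the module backend, so the two sides only agree when the app's main name is "backend". The sketch below is a simplified, stdlib-only model of that rule; default_task_name is a hypothetical helper, not Celery's own code (Celery's internal helper is gen_task_name).

```python
def default_task_name(module_name, func_name, app_main=None):
    """Simplified model of Celery's default task naming (hypothetical helper).

    Tasks are named "<module>.<function>", except that a module named
    "__main__" is replaced by the app's main name when one was given.
    """
    if module_name == "__main__" and app_main:
        return f"{app_main}.{func_name}"
    return f"{module_name}.{func_name}"


# Worker side: `celery -A backend.cel worker` imports the file as module "backend".
worker_name = default_task_name("backend", "stream_data", app_main="backend")

# Web side: `python backend.py` runs the same file as module "__main__".
with_main_backend = default_task_name("__main__", "stream_data", app_main="backend")
with_main_dunder = default_task_name("__main__", "stream_data", app_main="__main__")

print(worker_name)        # backend.stream_data
print(with_main_backend)  # backend.stream_data  (matches the worker)
print(with_main_dunder)   # __main__.stream_data (unregistered on the worker)
```

Another way to remove the dependency on how the file is loaded is to give the task an explicit name, e.g. @cel.task(name="backend.stream_data"); name is a standard option of Celery's task decorator.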

My Current Project State:

.
├── backend.py
├── static
│   └── js
│       ├── main.js
│       ├── socket.io.min.js
│       └── socket.io.min.js.map
└── templates
    └── index.html

backend.py

import eventlet
eventlet.monkey_patch()
# ^^^ COMMENT/UNCOMMENT to get the task to RUN/NOT RUN

from random import randrange
import time

from redis import Redis

from flask import Flask, render_template, request
from flask_socketio import SocketIO
from celery import Celery
from celery.contrib import rdb


def message_queue(db):
    # thought it might be conflicting redis databases so I allowed choice
    # this was not the issue.
    return f"redis://localhost:6379/{db}"

app = Flask(__name__)
socketio = SocketIO(app, message_queue=message_queue(0))

cel = Celery("backend", broker=message_queue(0), backend=message_queue(0))

@app.route('/')
def index():
    return render_template("index.html")

@socketio.on("start_data_stream")
def start_data_stream():
    socketio.emit("new_data", {"value" :  666}) # <<< sanity check, socket server is working here
    stream_data.delay(request.sid)

@cel.task()
def stream_data(sid):

    data_socketio = SocketIO(message_queue=message_queue(0))
    i = 1

    while i <= 100:
        value = randrange(0, 10000, 1) / 100
        data_socketio.emit("new_data", {"value" :  value})
        i += 1
        time.sleep(0.01)
    
    # rdb.set_trace() # <<<< comment/uncomment as needed for debugging, see: https://docs.celeryq.dev/en/latest/userguide/debugging.html

    return i, value


if __name__ == "__main__":

    r = Redis()
    r.flushall()

    if r.ping():
        pass
    else:
        raise Exception("You need redis: https://redis.io/docs/getting-started/installation/. Check that redis-server.service is running!")

    ip = "192.168.1.8" # insert LAN address here
    port = 8080

    socketio.run(app, host=ip, port=port, use_reloader=False, debug=True)
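
The loop inside stream_data can be exercised without redis or celery by passing the emit function in. The sketch below uses a hypothetical helper, stream_values, that keeps the task's loop and value range; with i += 1 it sends exactly 100 events, each value lying in [0, 100), and returns i == 101, just as the task would.

```python
import time
from random import randrange


def stream_values(emit, n_values=100, delay=0.01):
    """Hypothetical helper: emit n_values random values in [0, 100) via emit."""
    i = 1
    value = None
    while i <= n_values:
        value = randrange(0, 10000, 1) / 100  # 0.00 .. 99.99
        emit("new_data", {"value": value})
        i += 1  # without the increment the loop never terminates
        time.sleep(delay)
    return i, value


sent = []
last_i, last_value = stream_values(
    lambda event, data: sent.append((event, data)), delay=0
)
print(len(sent), last_i)  # 100 101
```

Inside the task, this would be called as stream_values(data_socketio.emit), since SocketIO.emit accepts the event name and data positionally.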

index.html

<!DOCTYPE html>
<html>
<head>

  <title>Minimal Example</title>
  <script src="{{ url_for('static', filename='js/socket.io.min.js') }}"></script>

</head>
<body>

    <button id="start" onclick="button_handler()">Start Stream</button>
    <span id="data"></span>

    <script type="text/javascript" src="{{ url_for('static', filename='js/main.js') }}"></script>

</body>
</html>

main.js

var socket =  io(location.origin);
var span = document.getElementById("data");

function button_handler() {
    socket.emit("start_data_stream");
}

socket.on("new_data", function(data) {
    span.innerHTML = data.value;
});

dependencies

Package          Version
---------------- -------
amqp             5.1.1
async-timeout    4.0.2
bidict           0.22.0
billiard         3.6.4.0
celery           5.2.7
click            8.1.3
click-didyoumean 0.3.0
click-plugins    1.1.1
click-repl       0.2.0
Deprecated       1.2.13
dnspython        2.2.1
eventlet         0.33.2
Flask            2.2.2
Flask-SocketIO   5.3.2
gevent           22.10.2
gevent-websocket 0.10.1
greenlet         2.0.1
itsdangerous     2.1.2
Jinja2           3.1.2
kombu            5.2.4
MarkupSafe       2.1.1
packaging        22.0
pip              22.3.1
prompt-toolkit   3.0.36
python-engineio  4.3.4
python-socketio  5.7.2
pytz             2022.6
redis            4.4.0
setuptools       58.1.0
six              1.16.0
vine             5.0.0
wcwidth          0.2.5
Werkzeug         2.2.2
wrapt            1.14.1
zope.event       4.6
zope.interface   5.5.2

Questions:

Is this still an issue with eventlet? This answer leads me to believe that Celery is the workaround for the eventlet issue, and suggests Celery doesn't even need eventlet to work. However, eventlet seems entrenched in my source, as nothing works on the redis side if I do not monkey patch. Also, the docs suggest that Flask-SocketIO automatically looks for eventlet, so just instantiating the external SocketIO server in the celery task would bring in eventlet, correct? Is there something else I am doing wrong? Are there better ways to debug the worker and task?

Any help would be greatly appreciated, thanks!

Answer:

The Flask-SocketIO documentation led me to believe that I needed to patch the entire standard library with:

import eventlet
eventlet.monkey_patch()

However, after reading about eventlet monkey patching here and here, I discovered this is not the case. For redis and flask_socketio I only need to patch the socket library as follows:

import eventlet
eventlet.monkey_patch(all=False, socket=True)
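
eventlet.monkey_patch() accepts keyword flags for individual modules (os, select, socket, thread, time, and a few others). Based on my reading of eventlet's patcher source (an assumption; verify against your installed version), all=False switches the default off so that only the modules flagged True are patched. The stdlib-only sketch below models that flag resolution; resolve_patch_flags is hypothetical and the module list is abbreviated.

```python
# Abbreviated list of patchable modules (assumption, see eventlet/patcher.py).
PATCHABLE = ("os", "select", "socket", "thread", "time")


def resolve_patch_flags(**on):
    """Hypothetical model of how monkey_patch(**on) decides what to patch."""
    default_on = on.pop("all", None)
    unknown = set(on) - set(PATCHABLE)
    if unknown:
        raise TypeError(f"unexpected module flags: {sorted(unknown)}")
    if default_on is None:
        # No explicit `all`: patch everything unless some module was switched on.
        default_on = True not in on.values()
    return {name: on.get(name, default_on) for name in PATCHABLE}


print(resolve_patch_flags())
# {'os': True, 'select': True, 'socket': True, 'thread': True, 'time': True}
print(resolve_patch_flags(all=False, socket=True))
# {'os': False, 'select': False, 'socket': True, 'thread': False, 'time': False}
```

Under this model, monkey_patch(socket=True) on its own would resolve to the same flags as monkey_patch(all=False, socket=True).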

In addition, celery itself needs to be imported as a patched module with:

celery = eventlet.import_patched("celery")
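
eventlet.import_patched() takes a different approach from monkey_patch(): rather than modifying the global modules, it imports the named module with green versions of modules such as socket, select and time substituted while that import runs, leaving the global ones untouched. The stdlib-only toy below imitates that idea by temporarily swapping an entry in sys.modules while executing some module source; import_with_substitutes, fake_time and MODULE_SOURCE are hypothetical, and this is not eventlet's implementation.

```python
import sys
import time
import types

# Source of a hypothetical module that grabs time.sleep at import time.
MODULE_SOURCE = "import time\nSLEEP = time.sleep\n"

# Hypothetical "green" replacement for the stdlib time module.
fake_time = types.ModuleType("time")
fake_time.sleep = lambda seconds: "green sleep"


def import_with_substitutes(name, source, substitutes):
    """Execute source as a new module while substitutes shadow sys.modules."""
    saved = {key: sys.modules.get(key) for key in substitutes}
    sys.modules.update(substitutes)
    try:
        module = types.ModuleType(name)
        exec(source, module.__dict__)  # `import time` finds the substitute
    finally:
        # Restore the global module table so other code is unaffected.
        for key, original in saved.items():
            if original is None:
                sys.modules.pop(key, None)
            else:
                sys.modules[key] = original
    return module


patched = import_with_substitutes("patched_demo", MODULE_SOURCE, {"time": fake_time})

print(patched.SLEEP(0))             # green sleep
print(sys.modules["time"] is time)  # True
```

Only the module imported this way sees the substitute; everything else in the process keeps the real time module.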

This gives the following full code for backend.py:

import eventlet
eventlet.monkey_patch(all=False, socket=True)

from random import randrange
import time

import redis
celery = eventlet.import_patched("celery")

from flask import Flask, render_template, request
from flask_socketio import SocketIO


message_queue = "redis://localhost:6379/0"

app = Flask(__name__)
socketio = SocketIO(app, message_queue=message_queue)

cel = celery.Celery("backend", broker=message_queue, backend=message_queue)

@app.route('/')
def index():
    return render_template("index.html")

@socketio.on("start_data_stream")
def start_data_stream():
    stream_data.delay(request.sid, message_queue)

@cel.task()
def stream_data(sid, message_queue):

    data_socketio = SocketIO(message_queue=message_queue)
    i = 1

    while i <= 100:
        value = randrange(0, 10000, 1) / 100
        data_socketio.emit("new_data", {"value" :  value})
        i += 1
        time.sleep(0.01)

    return i, value


if __name__ == "__main__":

    r = redis.Redis()
    try:
        r.ping()  # raises ConnectionError if the redis server is unreachable
    except redis.exceptions.ConnectionError:
        raise Exception("You need redis: https://redis.io/docs/getting-started/installation/. Check that redis-server.service is running!")
    r.flushall() # clear the old, abandoned messages from the queue

    ip = "192.168.1.8" # insert LAN address here
    port = 8080

    socketio.run(app, host=ip, port=port, use_reloader=False, debug=True)

Package:

Here's the whole working example as a package: https://github.com/jacoblapenna/Eventlet_Flask-SocketIO_Celery.git

NOTE:

This solution is likely also a workaround for the eventlet and multiprocessing issue (discussed here and here), and emitting from an external process wouldn't even need celery. However, I also plan to distribute other control tasks to other devices on the same LAN, and celery should be perfect for this.
