Reuse of Celery configuration values for Heroku and local Flask


I'm running a Flask app that executes several Celery tasks (with Redis as the broker and result backend) and sometimes caches API calls with Flask-Caching. It will run on Heroku, although at the moment I'm running it locally. I'm trying to figure out whether there's a way to reuse my various config variables for Redis access, mainly in case Heroku changes the credentials, moves Redis to another server, and so on. Currently I'm repeating the same Redis URL across several settings.

From my .env file:

CACHE_REDIS_URL = "redis://127.0.0.1:6379/1"
REDBEAT_REDIS_URL = "redis://127.0.0.1:6379/1"
CELERY_BROKER_URL = "redis://127.0.0.1:6379/1"
RESULT_BACKEND = "redis://127.0.0.1:6379/1"

From my config.py file:

import os
from pathlib import Path

basedir = os.path.abspath(os.path.dirname(__file__))

class Config(object):
    # non redis values are above and below these items
    CELERY_BROKER_URL = os.environ.get("CELERY_BROKER_URL", "redis://127.0.0.1:6379/0")
    RESULT_BACKEND = os.environ.get("RESULT_BACKEND", "redis://127.0.0.1:6379/0")
    CELERY_RESULT_BACKEND = RESULT_BACKEND  # duplicated under the deprecated setting name
    CACHE_REDIS_URL = os.environ.get("CACHE_REDIS_URL", "redis://127.0.0.1:6379/0")
    REDBEAT_REDIS_URL = os.environ.get("REDBEAT_REDIS_URL", "redis://127.0.0.1:6379/0")
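
(For reference, as I understand Celery 4+ naming, the two Celery settings above have new-style lowercase equivalents; a sketch:)

# new-style (lowercase) Celery setting names, sketched for reference
broker_url = "redis://127.0.0.1:6379/0"      # old-style: BROKER_URL
result_backend = "redis://127.0.0.1:6379/0"  # old-style: CELERY_RESULT_BACKEND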

In extensions.py:

from celery import Celery

from src.cache import cache

celery = Celery()

def register_extensions(app, worker=False):

    cache.init_app(app)

    # load celery config
    celery.config_from_object(app.config)

    if not worker:
        # register the extensions the celery worker doesn't need
        pass
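
(A side note on the config_from_object call above: as I read the Celery docs, it also accepts a namespace argument, which maps CELERY_-prefixed keys such as CELERY_BROKER_URL onto Celery's own broker_url; a sketch:)

# hedged sketch: with a namespace, only app.config keys prefixed with
# CELERY_ are picked up, e.g. CELERY_BROKER_URL -> broker_url
celery.config_from_object(app.config, namespace="CELERY")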

In my __init__.py:

import os
from flask import Flask, jsonify, request, current_app

from src.extensions import register_extensions

from config import Config

def create_worker_app(config_class=Config):
    """Minimal App without routes for celery worker."""
    app = Flask(__name__)
    app.config.from_object(config_class)

    register_extensions(app, worker=True)

    return app

From my worker.py file:

from celery import Celery
from celery.schedules import schedule

from redbeat import RedBeatSchedulerEntry as Entry

from . import create_worker_app

# load several tasks from other files here

def create_celery(app):
    celery = Celery(
        app.import_name,
        backend=app.config["RESULT_BACKEND"],
        broker=app.config["CELERY_BROKER_URL"],
        redbeat_redis_url=app.config["REDBEAT_REDIS_URL"],
    )
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery


flask_app = create_worker_app()
celery = create_celery(flask_app)

# call the tasks, passing app=celery as a parameter
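
(For illustration, a task built on the ContextTask base above can use Flask extensions freely; fetch_listing is a made-up name:)

# hypothetical task -- fetch_listing is a made-up name; because
# celery.Task was replaced with ContextTask, the body runs inside
# app.app_context() and can use the Flask-Caching cache
from src.cache import cache

@celery.task
def fetch_listing(url):
    return cache.get(url)  # works because an app context is active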

This all works fine locally (I've tried to strip out the code that isn't relevant to the Celery configuration). I haven't finished deploying to Heroku yet, because I remembered that installing Heroku Data for Redis creates a REDIS_URL config var, and I'd like to use that.

I've been trying to change my config.py values to fall back on REDIS_URL instead of their own separate variables, but every time I try to run my Celery tasks the connection fails unless I keep the distinct env values shown in my config.py above.

What I'd like to have in config.py would be this:

import os
from pathlib import Path

basedir = os.path.abspath(os.path.dirname(__file__))

class Config(object):

    REDIS_URL = os.environ.get("REDIS_URL", "redis://127.0.0.1:6379/0")
    CELERY_BROKER_URL = os.environ.get("CELERY_BROKER_URL", REDIS_URL)
    RESULT_BACKEND = os.environ.get("RESULT_BACKEND", REDIS_URL)
    CELERY_RESULT_BACKEND = RESULT_BACKEND
    CACHE_REDIS_URL = os.environ.get("CACHE_REDIS_URL", REDIS_URL)
    REDBEAT_REDIS_URL = os.environ.get("REDBEAT_REDIS_URL", REDIS_URL)

When I try this, remove all of the values from .env except REDIS_URL, and then run one of my Celery tasks, the task never executes. The Celery worker appears to start correctly, and the Flask-Caching requests work correctly (those run directly within the application rather than through the worker). The task never shows up as received in the worker's debug logs, and eventually the server request times out.

Is there anything I can do to reuse REDIS_URL with Celery in this way? If not, is there anything Heroku expects me to do to keep the credentials/server path/etc. current for the Redis it serves, when I'm using the same Redis instance for several purposes like this?

CodePudding user response:

By running my Celery worker with the -E flag, as in celery -A src.worker:celery worker -S redbeat.RedBeatScheduler --loglevel=INFO -E, I was able to figure out that the error was happening because Flask's instance of Celery, running under gunicorn, could not see the Celery config values that the worker was using.
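
(For anyone debugging the same thing, a quick sanity check, sketched from memory: print the broker URL the web process actually ends up with, and compare it against the transport line in the worker's startup banner:)

# in a `flask shell` session on the gunicorn/web side
from src.extensions import celery
print(celery.conf.broker_url)  # should match the "transport:" line the worker logs at startup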

What I've done to try to resolve this appears to have worked.

In extensions.py, instead of configuring Celery, I've done this, removing all other mentions of Celery:

from celery import Celery
celery = Celery('scraper') # a temporary name
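
(Task modules can now import this bare instance before it is ever configured; a hypothetical src/tasks.py, where scrape is a made-up name:)

# hypothetical src/tasks.py -- "scrape" is a made-up task name;
# the broker/backend settings are applied later by configure_celery
from src.extensions import celery

@celery.task
def scrape(url):
    ...  # task body elided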

Then, at the same level, I created a celery.py:

from src import extensions

def configure_celery(app):
    TaskBase = extensions.celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    extensions.celery.conf.update(
        broker_url=app.config['CELERY_BROKER_URL'],
        result_backend=app.config['RESULT_BACKEND'],
        redbeat_redis_url=app.config["REDBEAT_REDIS_URL"],
    )

    extensions.celery.Task = ContextTask
    return extensions.celery

In worker.py, I'm doing:

from celery.schedules import schedule

from src import create_worker_app
from src.celery import configure_celery

flask_app = create_worker_app()
celery = configure_celery(flask_app)
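
(On Heroku this splits naturally into separate dynos; a sketch of a Procfile, assuming these module paths -- src.app is shown just below -- and RedBeat's documented separate-beat invocation:)

web: gunicorn src.app:app
worker: celery -A src.worker:celery worker --loglevel=INFO
beat: celery -A src.worker:celery beat -S redbeat.RedBeatScheduler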

I'm doing a similar thing in app.py:

from src import create_app
from src.celery import configure_celery

app = create_app()
configure_celery(app)

As far as I can tell, this doesn't change how the worker behaves at all, but it lets me trigger the tasks from blueprint endpoints in the browser, along the lines of the sketch below.
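
(A hypothetical endpoint along those lines; run_scrape, and the scrape task it dispatches, are made-up names:)

# hypothetical src/routes.py -- names are made up; .delay() works from
# the web process now because configure_celery(app) gave it the same
# broker settings as the worker
from flask import Blueprint, jsonify

from src.tasks import scrape

bp = Blueprint("tasks", __name__)

@bp.route("/scrape")
def run_scrape():
    result = scrape.delay("https://example.com")
    return jsonify({"task_id": result.id})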

I found this technique in an article and its accompanying GitHub repo.
