How to preserve Flask app context across Celery and SQLAlchemy

Time:12-24

I'm trying to learn Flask by building a proof-of-concept app that takes a JSON payload and uses SQLAlchemy to write it to a DB. I'm using Celery to manage the write tasks.

The app is structured as follows:

|-app.py
|-project
  |-__init__.py
  |-celery_utils.py
  |-config.py
  |-users
    |-__init__.py
    |-models.py
    |-tasks.py

app.py builds the flask app and celery instance.

app.py

from project import create_app, ext_celery


app = create_app()
celery = ext_celery.celery


@app.route("/")
def alive():
    return "alive"

/project/__init__.py is the application factory for the flask app. It instantiates the extensions, links everything together, and registers the blueprints.

/project/__init__.py

import os

from flask import Flask
from flask_celeryext import FlaskCeleryExt
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy

from project.celery_utils import make_celery
from project.config import config


# instantiate extensions
db = SQLAlchemy()
migrate = Migrate()
ext_celery = FlaskCeleryExt(create_celery_app=make_celery)


def create_app(config_name=None):

    if config_name is None:
        config_name = os.environ.get("FLASK_CONFIG", "development")

    # instantiate the app
    app = Flask(__name__)

    # set config
    app.config.from_object(config[config_name])

    # set up extensions
    db.init_app(app)
    migrate.init_app(app, db)
    ext_celery.init_app(app)

    # register blueprints
    from project.users import users_blueprint
    app.register_blueprint(users_blueprint)

    # shell context for flask cli
    @app.shell_context_processor
    def ctx():
        return {"app": app, "db": db}

    return app

/project/celery_utils.py manages the creation of the Celery instance.

/project/celery_utils.py

from celery import current_app as current_celery_app


def make_celery(app):
    celery = current_celery_app
    celery.config_from_object(app.config, namespace="CELERY")

    return celery

In the users dir, I'm trying to manage the creation of a basic user with celery task management.

/project/users/__init__.py is where I create the blueprint and routes.

/project/users/__init__.py

from celery.result import AsyncResult  # used by the /responses view below
from flask import Blueprint, request, jsonify
from .tasks import divide, post_to_db


users_blueprint = Blueprint("users", __name__, url_prefix="/users", template_folder="templates")

from . import models, tasks


@users_blueprint.route('/users', methods=['POST'])
def users():
    request_data = request.get_json()
    task = post_to_db.delay(request_data)
    response = {"id": task.task_id,
                "status": task.status,
                }

    return jsonify(response)


@users_blueprint.route('/responses', methods=['GET'])
def responses():
    request_data = request.get_json()
    result = AsyncResult(id=request_data['id'])
    response = result.get()
    return jsonify(response)

/project/users/models.py is a simple User model. It works fine inside the Flask app context when I create a user from the Flask CLI shell.

/project/users/models.py

from project import db


class User(db.Model):
    """model for the user object"""

    __tablename__ = "users"

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    username = db.Column(db.String(128), unique=True, nullable=False)
    email = db.Column(db.String(128), unique=True, nullable=False)


    def __init__(self, username, email, *args, **kwargs):
        self.username = username
        self.email = email

Finally, /project/users/tasks.py is where I handle the celery tasks for this dir.

/project/users/tasks.py

from celery import shared_task
from .models import User
from project import db


@shared_task()
def post_to_db(payload):
    print("made it here")
    user = User(**payload)
    db.session.add(user)
    db.session.commit()
    db.session.close()
    return True

The modules work, but as soon as I wire it all up and hit the endpoint with a JSON payload, I get the error message:

RuntimeError:  No application found. Either work inside a view function or push an application context. ...

I have tried to preserve the app context in tasks.py by:

...
from project import db, ext_celery

@ext_celery.shared_task()
def post_to_db(payload):
...

and

...
from project import db, ext_celery

@ext_celery.task()
def post_to_db(payload):
...

Both of these fail with: TypeError: exceptions must derive from BaseException

I've tried pushing the app context

...
from project import db
from app import app

@shared_task()
def post_to_db(payload):
    with app.app_context():
        ...

This also errors with: TypeError: exceptions must derive from BaseException

I've tried importing celery from the app itself

...
from project import db
from app import celery

@celery.task()
def post_to_db(payload):
   ...

This also errors with: TypeError: exceptions must derive from BaseException

Any suggestions gratefully received. There's a final piece of the puzzle I'm missing, and it's very frustrating.

CodePudding user response:

With thanks to snakecharmerb
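
As far as I understand it, the root cause is that the Celery worker calls the task function outside of any Flask application context, so anything that depends on the current app (such as db.session) has nothing to bind to. Here is a minimal sketch of that behaviour using only Flask. The names demo_app, DB_NAME and read_setting are made up for illustration and are not part of my project; current_app stands in for db.session.

```python
from flask import Flask, current_app

# Hypothetical stand-in for the app returned by create_app()
demo_app = Flask(__name__)
demo_app.config["DB_NAME"] = "demo"


def read_setting():
    # current_app only resolves while an application context is active
    return current_app.config["DB_NAME"]


try:
    read_setting()
    worked_outside_context = True
except RuntimeError:
    # Flask raises RuntimeError when no application context has been pushed
    worked_outside_context = False

# Pushing a context explicitly makes the same call succeed
with demo_app.app_context():
    value_inside_context = read_setting()

print(worked_outside_context, value_inside_context)
```

A view function gets this context for free during a request. A Celery task does not, which is why the context has to be pushed somewhere around the task body.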

I had to add a ContextTask class to the make_celery() function in /project/celery_utils.py, so that every task runs inside the Flask application context:

from celery import current_app as current_celery_app


def make_celery(app):
    celery = current_celery_app
    celery.config_from_object(app.config, namespace="CELERY")

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask

    return celery

And then a small tweak in /project/users/tasks.py (the task is unchanged apart from dropping the debug print):

from celery import shared_task
from .models import User
from project import db


@shared_task()
def post_to_db(payload):
    user = User(**payload)
    db.session.add(user)
    db.session.commit()
    db.session.close()
    return True

Now I can see the user in the database, and my message queue is progressing as expected.
