I'm trying to learn Flask by building a proof-of-concept app that takes a JSON payload and uses SQLAlchemy to write it to a DB. I'm using Celery to manage the write tasks.
The app is structured as follows:
|-app.py
|-project
  |-__init__.py
  |-celery_utils.py
  |-config.py
  |-users
    |-__init__.py
    |-models.py
    |-tasks.py
app.py builds the Flask app and the Celery instance.
app.py
from project import create_app, ext_celery

app = create_app()
celery = ext_celery.celery


@app.route("/")
def alive():
    return "alive"
/project/__init__.py is the application factory for the Flask app. It instantiates the extensions, links everything together, and registers the blueprints.
/project/__init__.py
import os

from flask import Flask
from flask_celeryext import FlaskCeleryExt
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy

from project.celery_utils import make_celery
from project.config import config

# instantiate extensions
db = SQLAlchemy()
migrate = Migrate()
ext_celery = FlaskCeleryExt(create_celery_app=make_celery)


def create_app(config_name=None):
    if config_name is None:
        config_name = os.environ.get("FLASK_CONFIG", "development")

    # instantiate the app
    app = Flask(__name__)

    # set config
    app.config.from_object(config[config_name])

    # set up extensions
    db.init_app(app)
    migrate.init_app(app, db)
    ext_celery.init_app(app)

    # register blueprints
    from project.users import users_blueprint
    app.register_blueprint(users_blueprint)

    # shell context for flask cli
    @app.shell_context_processor
    def ctx():
        return {"app": app, "db": db}

    return app
/project/celery_utils.py manages the creation of the Celery instance.
/project/celery_utils.py
from celery import current_app as current_celery_app


def make_celery(app):
    celery = current_celery_app
    celery.config_from_object(app.config, namespace="CELERY")
    return celery
In the users dir, I'm trying to manage the creation of a basic user with Celery task management.
/project/users/__init__.py is where I create the blueprint and routes.
/project/users/__init__.py
from celery.result import AsyncResult  # needed by the /responses route below
from flask import Blueprint, request, jsonify

from .tasks import divide, post_to_db

users_blueprint = Blueprint("users", __name__, url_prefix="/users", template_folder="templates")

from . import models, tasks


@users_blueprint.route('/users', methods=['POST'])
def users():
    request_data = request.get_json()
    # queue the write and return the task id straight away
    task = post_to_db.delay(request_data)
    response = {
        "id": task.task_id,
        "status": task.status,
    }
    return jsonify(response)


@users_blueprint.route('/responses', methods=['GET'])
def responses():
    request_data = request.get_json()
    # look up the task by id and block until its result is available
    result = AsyncResult(id=request_data['id'])
    response = result.get()
    return jsonify(response)
/project/users/models.py is a simple User model. It does stay within the Flask app context when a user is created from the Flask CLI shell.
/project/users/models.py
from project import db


class User(db.Model):
    """model for the user object"""

    __tablename__ = "users"

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    username = db.Column(db.String(128), unique=True, nullable=False)
    email = db.Column(db.String(128), unique=True, nullable=False)

    def __init__(self, username, email, *args, **kwargs):
        self.username = username
        self.email = email
Finally, /project/users/tasks.py is where I handle the Celery tasks for this dir.
/project/users/tasks.py
from celery import shared_task

from .models import User
from project import db


@shared_task()
def post_to_db(payload):
    print("made it here")
    user = User(**payload)
    db.session.add(user)
    db.session.commit()
    db.session.close()
    return True
The modules work, but as soon as I wire it all up and hit the endpoint with a JSON payload, I get the error message:
RuntimeError: No application found. Either work inside a view function or push an application context. ...
I have tried to preserve the app context in tasks.py by:
...
from project import db, ext_celery

@ext_celery.shared_task()
def post_to_db(payload):
    ...

...
from project import db, ext_celery

@ext_celery.task()
def post_to_db(payload):
    ...
These error with: TypeError: exceptions must derive from BaseException
I've tried pushing the app context:
...
from project import db
from app import app

@shared_task()
def post_to_db(payload):
    with app.app_context():
        ...
This also errors with: TypeError: exceptions must derive from BaseException
I've tried importing celery from the app itself:
...
from project import db
from app import celery

@celery.task()
def post_to_db(payload):
    ...
This also errors with: TypeError: exceptions must derive from BaseException
Any suggestions gratefully received. There's a final piece of the puzzle I'm missing, and it's very frustrating.
CodePudding user response:
With thanks to snakecharmerb: I had to add a ContextTask class to the make_celery() function in /project/celery_utils.py, so that every task body runs inside the Flask application context.
from celery import current_app as current_celery_app


def make_celery(app):
    celery = current_celery_app
    celery.config_from_object(app.config, namespace="CELERY")

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery
And then a few tweaks in /project/users/tasks.py:
from celery import shared_task

from .models import User
from project import db


@shared_task()
def post_to_db(payload):
    user = User(**payload)
    db.session.add(user)
    db.session.commit()
    db.session.close()
    return True
Now I can see the user in the database, and my message queue is progressing as expected.
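For anyone hitting the same kind of RuntimeError: as far as I can tell it comes from Flask rather than Celery, because the worker runs the task outside any request, so nothing has pushed an application context. A minimal sketch that reproduces this with Flask alone; the app name and the helper app_name_or_error are made up for illustration:

```python
from flask import Flask, current_app

app = Flask("context_sketch")  # hypothetical app name


def app_name_or_error():
    """Return the current app's name, or the error type if no context is active."""
    try:
        return current_app.name
    except RuntimeError as exc:
        return type(exc).__name__


# No context has been pushed yet, so the lookup fails.
outside = app_name_or_error()

# Inside the context manager, the same lookup succeeds.
with app.app_context():
    inside = app_name_or_error()
```

The ContextTask class wraps each task call in the same with app.app_context() block, which is why the database write then succeeds in the worker.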