I am attempting to run Celery in its own container, separate from my Flask app. Right now I am just setting up a simple email app. The container CMD is
"["celery", "worker", "--loglevel=info"]"
The message gets sent to the Redis broker and Celery picks it up, but Celery gives me this error:
"Received unregistered task of type 'flask_project.views.send_async_email'. The message has been ignored and discarded."
I am setting `include` in the Celery config in my Flask app. I have restarted and rebuilt my containers, and I still get the same issue.
from flask import Blueprint, current_app
from flask_mail import Mail
from os import getenv
from celery import Celery
from .support_func import decorator_require_api
# Blueprint that groups the routes defined in this module.
views = Blueprint('views', __name__)
# Celery application, named after the blueprint and pointed at the Redis
# broker service (database 0). `include` names modules for Celery to import.
# NOTE(review): no "views.tasks" module is shown in this post and the task
# below is defined in this module instead - confirm the include target.
celery = Celery(views.name,
broker='redis://redis:6379/0',
include=["views.tasks"])
@celery.task
def send_async_email(email_data):
    """Send one email described by ``email_data`` through Flask-Mail.

    ``email_data`` is the plain dict handed to ``send_async_email.delay()``.
    Keys read: ``sender``, ``recipients``, ``message`` and, if present,
    ``subject`` (an empty subject is used when it is missing).
    """
    # Local import keeps this block independent of the module's import list.
    from flask_mail import Message

    # Mail.send() expects a Message instance, not a dict, so build one here.
    msg = Message(
        email_data.get('subject', ''),
        sender=email_data['sender'],
        recipients=email_data['recipients'],
    )
    msg.body = email_data['message']

    mail = Mail()
    mail.send(msg)
@views.route('/')
def home():
    """Queue a test email on the Celery task and return a confirmation."""
    with current_app.app_context():
        # Plain dict payload handed to the task via .delay().
        payload = dict(
            sender=getenv('MAIL_USERNAME'),
            recipients=['[email protected]'],
            message="This is a test email",
        )
        send_async_email.delay(payload)
        return "Message sent!"
Compose:
---
# Compose stack with four services (flask, nginx, celery, redis), all attached
# to the user-defined "api" bridge network.
version: "3.9"

services:
  # Flask application; the host ./app directory is mounted at /app.
  flask:
    build:
      context: ./Docker/flask
    container_name: flask
    volumes:
      - ./app/:/app
    restart: unless-stopped
    stdin_open: true
    #entrypoint: /bin/bash
    networks:
      - api

  # nginx; host port 5000 is published to container port 443.
  nginx:
    image: nginx:latest
    container_name: nginx
    depends_on:
      - flask
    #entrypoint: /bin/bash
    volumes:
      - ./nginx_config:/etc/nginx/conf.d
      - ./app/:/app
    ports:
      - "5000:443"
    networks:
      - api

  # Celery worker image built from ./Docker/celery; started after redis.
  celery:
    build:
      context: ./Docker/celery
    container_name: celery
    depends_on:
      - redis
    restart: unless-stopped
    stdin_open: true
    networks:
      - api

  # Redis, reachable by the other services under the hostname "redis".
  redis:
    image: redis:latest
    container_name: redis
    depends_on:
      - flask
    #entrypoint: /bin/bash
    networks:
      - api

networks:
  api:
    driver: bridge
-----------------
DockerFile:
# Image whose default command starts a Celery worker.
FROM python:3.9.7-slim-buster
# Later COPY/RUN/CMD instructions operate relative to /app.
WORKDIR /app
# Install the build toolchain. The package names after the '#' are commented
# out, so only build-essential is installed.
RUN apt-get update && apt-get install -y \
build-essential # python-dev libssl-dev openssl
# Copy the build context into /app and install its Python requirements.
COPY ./ .
RUN pip3 install -r requirements.txt
# Broker URL made available through the process environment.
ENV CELERY_BROKER_URL=redis://redis:6379/0
# NOTE(review): the worker is started without --app/-A, so it is not told
# which Celery application (and therefore which tasks) to load.
CMD ["celery", "worker", "--loglevel=info"]
CodePudding user response:
You need to pass the Celery app to the worker with the --app (or -A) flag (see my answer/example here).
I would recommend to refactor a bit and extract this snippet:
celery = Celery(views.name,
broker='redis://redis:6379/0',
include=["views.tasks"])
into an external file, such as celery_app.py,
and then import it in your Flask app and use it for the worker:
["celery", "--app", "your_module.celery_app:celery", "worker", "--loglevel=info"]
You should see the registered tasks in the worker's startup logs (where you see the big C (Celery) logo).
CodePudding user response:
I finally figured it out. I used https://blog.miguelgrinberg.com/post/celery-and-the-flask-application-factory-pattern as a reference. Now I can register new blueprints without touching the celery config. It is a work in progress, but now the containers are all up and running.
.
├── Docker
│ ├── celery
│ │ ├── Dockerfile
│ │ └── requirements.txt
│ └── flask
│ ├── Dockerfile
│ └── requirements.txt
├── app
│ ├── flask_project
│ │ ├── __init__.py
│ │ ├── celery_app.py
│ │ └── views.py
├── docker-compose.yml
Compose:
--------------------------------------------------------------------------------
---
# Compose stack with four services (flask, nginx, celery, redis), all attached
# to the user-defined "api" bridge network.
version: "3.9"

services:
  # Flask application; the host ./app directory is mounted at /app.
  flask:
    build:
      context: ./Docker/flask
    container_name: flask
    volumes:
      - ./app/:/app
    restart: unless-stopped
    stdin_open: true
    networks:
      - api

  # nginx; host port 5000 is published to container port 443.
  nginx:
    image: nginx:latest
    container_name: nginx
    depends_on:
      - flask
    #entrypoint: /bin/bash
    volumes:
      - ./nginx_config:/etc/nginx/conf.d
      - ./app/:/app
    ports:
      - "5000:443"
    networks:
      - api

  # Celery worker; mounts the same ./app directory as the flask service, so
  # both containers see the same application code.
  celery:
    build:
      context: ./Docker/celery
    container_name: celery
    depends_on:
      - redis
    volumes:
      - ./app/:/app
    restart: unless-stopped
    stdin_open: true
    networks:
      - api

  # Redis, reachable by the other services under the hostname "redis".
  redis:
    image: redis:latest
    container_name: redis
    depends_on:
      - flask
    #entrypoint: /bin/bash
    networks:
      - api

networks:
  api:
    driver: bridge
celery_app.py:
--------------------------------------------------------------------------------
from . import celery, create_app
# Build the Flask application with the factory imported from this package.
app = create_app()
# Push an application context that stays active for this process; presumably
# so worker-side task code can use current_app - confirm against the
# referenced article.
app.app_context().push()
__init__.py:
--------------------------------------------------------------------------------
from os import getenv

from celery import Celery
from flask import Flask

# Module-level Celery instance shared by the package; the broker URL is read
# from the CELERY_BROKER_URL environment variable.
celery = Celery(__name__, broker=getenv('CELERY_BROKER_URL'))


def create_app():
    """Create the Flask application, sync Celery's config and register blueprints.

    Returns:
        The configured ``Flask`` application instance.
    """
    app = Flask(__name__)

    # Celery stuff: copy the Flask configuration into the Celery instance.
    celery.conf.update(app.config)

    # Register Blueprints.
    # Imported at call time rather than at module top: views.py itself does
    # `from . import celery`, so this package must define `celery` first.
    from .views import views
    app.register_blueprint(views, url_prefix='/')

    return app
views.py:
--------------------------------------------------------------------------------
from flask import Blueprint, current_app
from flask_mail import Message, Mail
from os import getenv
from . import celery
views = Blueprint('views', __name__)
@celery.task
def send_async_email(email_data):
    """Build a Flask-Mail ``Message`` from ``email_data`` and send it.

    Keys read from ``email_data``: ``subject``, ``sender``, ``recipients``
    and ``message`` (used as the message body).
    """
    outgoing = Message(
        email_data['subject'],
        sender=email_data['sender'],
        recipients=email_data['recipients'],
    )
    outgoing.body = email_data['message']
    Mail().send(outgoing)
@views.route('/')
def home():
    """Queue a test email on the Celery task and return a confirmation.

    Only the plain ``email_data`` dict is handed to the task; the task itself
    builds the ``Message``, so none is constructed here.
    """
    with current_app.app_context():
        email_data = {
            'sender': getenv('MAIL_USERNAME'),
            'recipients': ['[email protected]'],
            'subject': 'testing123',
            'message': "testing123",
        }
        send_async_email.delay(email_data)
        return "Message sent!"