celery on a separate container

Time:09-01

I am attempting to run Celery in its own container, separate from my Flask app. Right now I am just setting up a simple email app. The container CMD is:

["celery", "worker", "--loglevel=info"]

The message gets sent to the Redis broker and Celery picks it up, but Celery gives me this error:

"Received unregistered task of type 'flask_project.views.send_async_email'. The message has been ignored and discarded."

I am setting include in the Celery config in my Flask app. I have rebuilt and restarted my containers, but the issue persists.

from flask import Blueprint, current_app
from flask_mail import Mail
from os import getenv
from celery import Celery

from .support_func import decorator_require_api

views = Blueprint('views', __name__)

celery = Celery(views.name,
                broker='redis://redis:6379/0',
                include=["views.tasks"])

@celery.task
def send_async_email(email_data):
    mail = Mail()
    mail.send(email_data)

@views.route('/')
def home():
    with current_app.app_context():

        email_data = {'sender': getenv('MAIL_USERNAME'), 'recipients': ['[email protected]'],
                      'message': "This is a test email"}

        send_async_email.delay(email_data)
    return "Message sent!"

Compose:

---
version: "3.9"
services:
  flask:
    build:
      context: ./Docker/flask
    container_name: flask
    volumes:
      - ./app/:/app
    restart: unless-stopped
    stdin_open: true
    #entrypoint: /bin/bash
    networks:
      - api
      
  nginx:
    image: nginx:latest
    container_name: nginx
    depends_on:
      - flask
    #entrypoint: /bin/bash
    volumes:
      - ./nginx_config:/etc/nginx/conf.d
      - ./app/:/app
    ports:
      - "5000:443"
    networks:
      - api

  celery:
    build:
      context: ./Docker/celery
    container_name: celery
    depends_on:
      - redis
    restart: unless-stopped
    stdin_open: true
    networks:
      - api

  redis:
    image: redis:latest
    container_name: redis
    depends_on:
      - flask
    #entrypoint: /bin/bash
    networks:
      - api

networks:
  api:
    driver: bridge

Dockerfile:

FROM python:3.9.7-slim-buster

WORKDIR /app

RUN apt-get update && apt-get install -y \
    build-essential # python-dev libssl-dev openssl

COPY ./ .

RUN pip3 install -r requirements.txt

ENV CELERY_BROKER_URL=redis://redis:6379/0

CMD ["celery", "worker", "--loglevel=info"]

CodePudding user response:

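You need to pass the Celery app to the worker with the --app (or -A) flag. Without it, the worker does not load your app, so it never imports the module where the task is defined, and the task stays unregistered.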
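To illustrate why the worker discards the message, here is a toy model of a name-keyed task registry. It is only a sketch: ToyApp is a hypothetical class, not Celery's real implementation. It mirrors just one thing, namely that a task is looked up by a name derived from its module and function name, and that a process which never registered that name cannot run it.

```python
# Toy model of a name-keyed task registry (NOT Celery's real implementation).
class ToyApp:
    def __init__(self):
        self.tasks = {}  # task name -> callable

    def task(self, func):
        # Register under "<module>.<function>", similar to Celery's default naming.
        name = f"{func.__module__}.{func.__name__}"
        self.tasks[name] = func
        return func

    def handle(self, name, *args):
        # A process can only run tasks that were registered in that process.
        if name not in self.tasks:
            return f"Received unregistered task of type {name!r}."
        return self.tasks[name](*args)


producer = ToyApp()  # stands in for the Flask process, which defines the task


@producer.task
def send_async_email(email_data):
    return f"sent to {email_data['recipients']}"


worker = ToyApp()  # stands in for a worker that never imported the task module

task_name = f"{send_async_email.__module__}.send_async_email"
payload = {"recipients": ["someone@example.com"]}

print(worker.handle(task_name, payload))    # the worker does not know the name
print(producer.handle(task_name, payload))  # the defining process does
```

Pointing the worker at the same app object that the Flask code uses is what closes this gap, because both sides then share one registry.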

I would recommend refactoring a bit and extracting this snippet:

celery = Celery(views.name,
                broker='redis://redis:6379/0',
                include=["views.tasks"])

into a separate file, such as celery_app.py, then import it in your Flask app and point the worker at it:

["celery", "--app", "your_module.celery_app:celery", "worker", "--loglevel=info"]

You should see the registered tasks in the worker's startup logs (just below the big Celery "C" logo).

CodePudding user response:

I finally figured it out. I used https://blog.miguelgrinberg.com/post/celery-and-the-flask-application-factory-pattern as a reference. Now I can register new blueprints without touching the celery config. It is a work in progress, but now the containers are all up and running.

.
├── Docker
│   ├── celery
│   │   ├── Dockerfile
│   │   └── requirements.txt
│   └── flask
│       ├── Dockerfile
│       └── requirements.txt
├── app
│   ├── flask_project
│   │   ├── __init__.py
│   │   ├── celery_app.py
│   │   └── views.py
└── docker-compose.yml



Compose:
--------------------------------------------------------------------------------
---
version: "3.9"
services:
  flask:
    build:
      context: ./Docker/flask
    container_name: flask
    volumes:
      - ./app/:/app
    restart: unless-stopped
    stdin_open: true
    networks:
      - api

  nginx:
    image: nginx:latest
    container_name: nginx
    depends_on:
      - flask
    #entrypoint: /bin/bash
    volumes:
      - ./nginx_config:/etc/nginx/conf.d
      - ./app/:/app
    ports:
      - "5000:443"
    networks:
      - api

  celery:
    build:
      context: ./Docker/celery
    container_name: celery
    depends_on:
      - redis
    volumes:
      - ./app/:/app
    restart: unless-stopped
    stdin_open: true
    networks:
      - api

  redis:
    image: redis:latest
    container_name: redis
    depends_on:
      - flask
    #entrypoint: /bin/bash
    networks:
      - api

networks:
  api:
    driver: bridge

celery_app.py:
--------------------------------------------------------------------------------
from . import celery, create_app

app = create_app()
app.app_context().push()
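
The worker command is not shown above, so here is a small sketch of what it would presumably look like with this layout. The dotted path flask_project.celery_app:celery is an assumption based on the directory tree and on /app being the working directory, and worker_cmd is a hypothetical helper used only to print the Dockerfile line.

```python
import json


def worker_cmd(app_path, loglevel="info"):
    """Build the argv for a Celery worker, with --app placed before the subcommand."""
    return ["celery", "--app", app_path, "worker", f"--loglevel={loglevel}"]


# Assumed module path: the celery object re-exported by flask_project/celery_app.py
cmd = worker_cmd("flask_project.celery_app:celery")

# Print it in Dockerfile exec form
print("CMD " + json.dumps(cmd))
```

Because celery_app.py calls create_app(), importing it also imports views.py, which is what registers send_async_email with the worker.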


__init__.py:
--------------------------------------------------------------------------------
from os import getenv

from celery import Celery
from flask import Flask

celery = Celery(__name__, broker=getenv('CELERY_BROKER_URL'))


def create_app():
    app = Flask(__name__)

    # Copy the Flask config into the shared Celery app
    celery.conf.update(app.config)


    # Register Blueprints
    from .views import views

    app.register_blueprint(views, url_prefix='/')

    return app

views.py:
--------------------------------------------------------------------------------

from flask import Blueprint, current_app
from flask_mail import Message, Mail
from os import getenv
from . import celery

views = Blueprint('views', __name__)


@celery.task
def send_async_email(email_data):
    msg = Message(email_data['subject'],
                  sender=email_data['sender'],
                  recipients=email_data['recipients'],
                  )
    msg.body = email_data['message']
    mail = Mail()
    mail.send(msg)

@views.route('/')
def home():

    with current_app.app_context():
        email_data = {'sender': getenv('MAIL_USERNAME'),
                      'recipients': ['[email protected]'],
                      'subject': 'testing123',
                      'message': "testing123"
                      }

        # Only plain data is queued; the Message itself is built inside the task.
        send_async_email.delay(email_data)
    return "Message sent!"