Home > Software engineering >  Celery failing to connect to redis in Flask
Celery failing to connect to redis in Flask

Time:06-24

I have a flask app with celery configured inside.

This is the configuration flow:

app/common/globals.py:

from flask import current_app as app
from celery import Celery

# Shared Celery instance; created here without any broker or backend argument.
celery = Celery()

__init__.py

from app.common.globals import celery
# NOTE(review): `app` and `Celery` are not defined in the lines shown for this
# module — confirm they are imported above. This assignment also rebinds the
# `celery` name that was imported on the previous line to a new object.
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_BACKEND_URL'])
# Merge the contents of app.config into the new instance's configuration.
celery.conf.update(app.config)


def create_app(config_class=Config):
    """Build the Flask application and create a Celery instance from its config.

    Args:
        config_class: Configuration object passed to
            ``app.config.from_object``.

    Returns:
        The configured ``Flask`` application. The Celery instance created in
        this function is not returned.
    """
    app: Flask = Flask(__name__)
    app.config.from_object(config_class)
    logging.basicConfig(level=logging.INFO)
    debug = app.config['DEBUG']
    if debug:
        app.logger.setLevel(logging.INFO)

    with app.app_context():
        # Function-scope import: binds `celery` as a local name only.
        from app.common.globals import celery

    # NOTE(review): this assignment rebinds the local name to a brand-new
    # Celery object. The instance defined in app.common.globals is not
    # modified here, and the new object goes out of scope on return.
    celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_BACKEND_URL'])
    celery.conf.update(app.config)

    
    return app

config.py

# Broker and result backend both point at Redis database 0 on 127.0.0.1:6379.
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_BACKEND_URL = 'redis://127.0.0.1:6379/0'

Then the app is created and run from wsgi.py:


from app import create_app

# Module-level application object, built once by the factory at import time.
app = create_app()
debug = app.config['DEBUG']


# Start the built-in development server only when this file is run directly.
if __name__ == "__main__":
    app.run(host='127.0.0.1', port=5000, debug=debug, threaded=True)

I have a Redis server up and running, and I can connect to it easily:

redis-cli -u redis://localhost:6379/0                   
localhost:6379> 

The problem is that Celery apparently cannot connect to the Redis server: it emits the warning below, falls back to the default broker amqp://guest:**@127.0.0.1:5672//, and fails.

WARNING:kombu.connection:No hostname was supplied. Reverting to default 'localhost'
  File "/User/omar/venv/lib/python3.8/site-packages/kombu/connection.py", line 450, in _reraise_as_library_errors
    raise ConnectionError(str(exc)) from exc
kombu.exceptions.OperationalError: [Errno 61] Connection refused

When I print the celery instance, it shows it can read the broker url.

# Debugging snippet: build a Celery instance and print its effective settings.
# NOTE(review): the app object itself is passed as the first positional
# argument here, whereas the other snippets pass app.name — confirm intended.
celery = Celery(app, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_BACKEND_URL'])
celery.conf.update(app.config)
print(celery.conf)

<RESULT>

Settings(Settings({'broker_url': 'redis://127.0.0.1:6379/0', 
'result_backend': 'redis://localhost:6379', 'deprecated_settings': 
set()}, {}, {'accept_content': ['json'], 'result_accept_content': 
None, 'enable_utc': True, 'imports': (), 'include': (), 'timezone': 
None, 'beat_max_loop_interval': 0, 'beat_schedule': {}, 
'beat_scheduler': 'celery.beat:PersistentScheduler', 
'beat_schedule_filename': 'celerybeat-schedule', 'beat_sync_every': 
0, 'broker_url': 'redis://127.0.0.1:6379/0', 'broker_read_url': None,
 'broker_write_url': None, 'broker_transport': None, 
'broker_transport_options': {}, 'broker_connection_timeout': 4, 
'broker_connection_retry': True, 'broker_connection_max_retries': 
100, 'broker_failover_strategy': None, 'broker_heartbeat': 120, 
'broker_heartbeat_checkrate': 3.0, 'broker_login_method': None, 
'broker_pool_limit': 10, 'broker_use_ssl': False, 'broker_host': 
None, 'broker_port': None, 'broker_user': None, 'broker_password': 
None, 'broker_vhost': None, 'cache_backend': None, 
'cache_backend_options': {}, 'cassandra_entry_ttl': None, 
'cassandra_keyspace': None, 'cassandra_port': None, 
'cassandra_read_consistency': None, 'cassandra_servers': None, 'cassandra_table': None, 'cassandra_write_consistency': None, 'cassandra_auth_provider': None, 'cassandra_auth_kwargs': None, 'cassandra_options': {}, 's3_access_key_id': None, 's3_secret_access_key': None, 's3_bucket': None, 's3_base_path': None, 's3_endpoint_url': None, 's3_region': None, 'azureblockblob_container_name': 'celery', 'azureblockblob_retry_initial_backoff_sec': 2, 'azureblockblob_retry_increment_base': 2, 'azureblockblob_retry_max_attempts': 3, 'azureblockblob_base_path': '', 'azureblockblob_connection_timeout': 20, 'azureblockblob_read_timeout': 120, 'control_queue_ttl': 300.0, 'control_queue_expires': 10.0, 'control_exchange': 'celery', 'couchbase_backend_settings': None, 'arangodb_backend_settings': None, 'mongodb_backend_settings': None, 'cosmosdbsql_database_name': 'celerydb', 'cosmosdbsql_collection_name': 'celerycol', 'cosmosdbsql_consistency_level': 'Session', 'cosmosdbsql_max_retry_attempts': 9, 'cosmosdbsql_max_retry_wait_time': 30, 'event_queue_expires': 60.0, 'event_queue_ttl': 5.0, 'event_queue_prefix': 'celeryev', 'event_serializer': 'json', 'event_exchange': 'celeryev', 'redis_backend_use_ssl': None, 'redis_db': None, 'redis_host': None, 'redis_max_connections': None, 'redis_username': None, 'redis_password': None, 'redis_port': None, 'redis_socket_timeout': 120.0, 'redis_socket_connect_timeout': None, 'redis_retry_on_timeout': False, 'redis_socket_keepalive': False, 'result_backend': 'redis://localhost:6379', 'result_cache_max': -1, 'result_compression': None, 'result_exchange': 'celeryresults', 'result_exchange_type': 'direct', 'result_expires': datetime.timedelta(days=1), 'result_persistent': None, 'result_extended': False, 'result_serializer': 'json', 'result_backend_transport_options': {}, 'result_chord_retry_interval': 1.0, 'result_chord_join_timeout': 3.0, 'result_backend_max_sleep_between_retries_ms': 10000, 'result_backend_max_retries': inf, 
'result_backend_base_sleep_between_retries_ms': 10, 'result_backend_always_retry': False, 'elasticsearch_retry_on_timeout': None, 'elasticsearch_max_retries': None, 'elasticsearch_timeout': None, 'elasticsearch_save_meta_as_text': True, 'security_certificate': None, 'security_cert_store': None, 'security_key': None, 'security_digest': 'sha256', 'database_url': None, 'database_engine_options': None, 'database_short_lived_sessions': False, 'database_table_schemas': None, 'database_table_names': None, 'task_acks_late': False, 'task_acks_on_failure_or_timeout': True, 'task_always_eager': False, 'task_annotations': None, 'task_compression': None, 'task_create_missing_queues': True, 'task_inherit_parent_priority': False, 'task_default_delivery_mode': 2, 'task_default_queue': 'celery', 'task_default_exchange': None, 'task_default_exchange_type': 'direct', 'task_default_routing_key': None, 'task_default_rate_limit': None, 'task_default_priority': None, 'task_eager_propagates': False, 'task_ignore_result': False, 'task_store_eager_result': False, 'task_protocol': 2, 'task_publish_retry': True, 'task_publish_retry_policy': {'max_retries': 3, 'interval_start': 0, 'interval_max': 1, 'interval_step': 0.2}, 'task_queues': None, 'task_queue_max_priority': None, 'task_reject_on_worker_lost': None, 'task_remote_tracebacks': False, 'task_routes': None, 'task_send_sent_event': False, 'task_serializer': 'json', 'task_soft_time_limit': None, 'task_time_limit': None, 'task_store_errors_even_if_ignored': False, 'task_track_started': False, 'worker_agent': None, 'worker_autoscaler': 'celery.worker.autoscale:Autoscaler', 'worker_cancel_long_running_tasks_on_connection_loss': False, 'worker_concurrency': None, 'worker_consumer': 'celery.worker.consumer:Consumer', 'worker_direct': False, 'worker_disable_rate_limits': False, 'worker_deduplicate_successful_tasks': False, 'worker_enable_remote_control': True, 'worker_hijack_root_logger': True, 'worker_log_color': None, 'worker_log_format': 
'[%(asctime)s: %(levelname)s/%(processName)s] %(message)s', 'worker_lost_wait': 10.0, 'worker_max_memory_per_child': None, 'worker_max_tasks_per_child': None, 'worker_pool': 'prefork', 'worker_pool_putlocks': True, 'worker_pool_restarts': False, 'worker_proc_alive_timeout': 4.0, 'worker_prefetch_multiplier': 4, 'worker_redirect_stdouts': True, 'worker_redirect_stdouts_level': 'WARNING', 'worker_send_task_events': False, 'worker_state_db': None, 'worker_task_log_format': '[%(asctime)s: %(levelname)s/%(processName)s] %(task_name)s[%(task_id)s]: %(message)s', 'worker_timer': None, 'worker_timer_precision': 1.0, 'deprecated_settings': None}))


Any ideas what I am doing wrong?

CodePudding user response:

In your __init__.py file, you create the celery object twice: once at module level and again inside create_app(). Instead, you can follow the Flask-extension pattern and bind the Flask app to Celery inside your application factory, like this. In the custom.celery.py file:

from celery import Celery
import flask

class FlaskCelery(Celery):
    """Celery application whose tasks run inside a Flask application context.

    The Flask app may be supplied at construction time through the ``app``
    keyword argument, or attached later with :meth:`init_app`, which suits
    the application-factory pattern.
    """

    def __init__(self, *args, **kwargs):
        """Create the Celery app and optionally bind a Flask app.

        Args:
            *args: Positional arguments forwarded to ``Celery.__init__``.
            **kwargs: Keyword arguments forwarded to ``Celery.__init__``,
                except ``app``, which is consumed here and handed to
                :meth:`init_app`.
        """
        # ``app`` belongs to this subclass, so remove it before delegating:
        # the Flask object must not be forwarded to the base initializer
        # (presumably it would otherwise be treated as a Celery option —
        # verify against the installed Celery version).
        bind_app = "app" in kwargs
        flask_app = kwargs.pop("app", None)

        super().__init__(*args, **kwargs)
        self.patch_task()

        if bind_app:
            self.init_app(flask_app)

    def patch_task(self):
        """Replace ``self.Task`` with a subclass that ensures an app context."""
        TaskBase = self.Task
        _celery = self

        class ContextTask(TaskBase):
            """Task base class that runs its body inside a Flask app context."""

            abstract = True

            def __call__(self, *args, **kwargs):
                """Run the task, entering an app context if none is active."""
                if flask.has_app_context():
                    return TaskBase.__call__(self, *args, **kwargs)
                # No active context: enter one from the app bound by init_app().
                with _celery.app.app_context():
                    return TaskBase.__call__(self, *args, **kwargs)

        self.Task = ContextTask

    def init_app(self, app):
        """Bind the Flask application whose context tasks will run in.

        Args:
            app: The Flask application instance.
        """
        self.app = app

In extensions.py file:

# Shared Celery extension instance; broker and result backend are given
# directly as Redis URLs. The Flask app is attached later via init_app().
celery = FlaskCelery(broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/0')

Then in the __init__.py file:

from extensions import celery


def create_app(config_class=None):
    """Application factory: build the Flask app and bind it to Celery.

    Args:
        config_class: Configuration object passed to
            ``app.config.from_object``. When omitted, ``Config`` is used.

    Returns:
        The configured ``Flask`` application.
    """
    if config_class is None:
        # Resolved at call time rather than in the signature; assumes
        # ``Config`` is imported in this module — TODO confirm.
        config_class = Config

    app: Flask = Flask(__name__)
    app.config.from_object(config_class)
    logging.basicConfig(level=logging.INFO)
    if app.config['DEBUG']:
        app.logger.setLevel(logging.INFO)

    # Attach the Flask app to the shared Celery extension instance.
    celery.init_app(app)
    return app
  • Related