I have a flask app with celery configured inside.
This is the configuration flow:
app.common.globals.py
:
from flask import current_app as app
from celery import Celery
celery = Celery()
__init__.py
from app.common.globals import celery
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_BACKEND_URL'])
celery.conf.update(app.config)
def create_app(config_class=Config):
app: Flask = Flask(__name__)
app.config.from_object(config_class)
logging.basicConfig(level=logging.INFO)
debug = app.config['DEBUG']
if debug:
app.logger.setLevel(logging.INFO)
with app.app_context():
from app.common.globals import celery
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_BACKEND_URL'])
celery.conf.update(app.config)
return app
config.py
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_BACKEND_URL = 'redis://127.0.0.1:6379/0'
Then where the app is running in wsgi.py
from app import create_app
app = create_app()
debug = app.config['DEBUG']
if __name__ == "__main__":
app.run(host='127.0.0.1', port=5000, debug=debug, threaded=True)
I have a redis server up and running where I can connect to it easily:
redis-cli -u redis://localhost:6379/0
localhost:6379>
The problem is apparently celery is not able to connect to redis server so it throws the warning below and tries to connect to the default amqp://guest:**@127.0.0.1:5672//:
and fails.
WARNING:kombu.connection:No hostname was supplied. Reverting to default 'localhost'
File "/User/omar/venv/lib/python3.8/site-packages/kombu/connection.py", line 450, in _reraise_as_library_errors
raise ConnectionError(str(exc)) from exc
kombu.exceptions.OperationalError: [Errno 61] Connection refused
When I print the celery
instance, it shows it can read the broker url.
celery = Celery(app, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_BACKEND_URL'])
celery.conf.update(app.config)
print(celery.conf)
<RESULT>
Settings(Settings({'broker_url': 'redis://127.0.0.1:6379/0',
'result_backend': 'redis://localhost:6379', 'deprecated_settings':
set()}, {}, {'accept_content': ['json'], 'result_accept_content':
None, 'enable_utc': True, 'imports': (), 'include': (), 'timezone':
None, 'beat_max_loop_interval': 0, 'beat_schedule': {},
'beat_scheduler': 'celery.beat:PersistentScheduler',
'beat_schedule_filename': 'celerybeat-schedule', 'beat_sync_every':
0, 'broker_url': 'redis://127.0.0.1:6379/0', 'broker_read_url': None,
'broker_write_url': None, 'broker_transport': None,
'broker_transport_options': {}, 'broker_connection_timeout': 4,
'broker_connection_retry': True, 'broker_connection_max_retries':
100, 'broker_failover_strategy': None, 'broker_heartbeat': 120,
'broker_heartbeat_checkrate': 3.0, 'broker_login_method': None,
'broker_pool_limit': 10, 'broker_use_ssl': False, 'broker_host':
None, 'broker_port': None, 'broker_user': None, 'broker_password':
None, 'broker_vhost': None, 'cache_backend': None,
'cache_backend_options': {}, 'cassandra_entry_ttl': None,
'cassandra_keyspace': None, 'cassandra_port': None,
'cassandra_read_consistency': None, 'cassandra_servers': None, 'cassandra_table': None, 'cassandra_write_consistency': None, 'cassandra_auth_provider': None, 'cassandra_auth_kwargs': None, 'cassandra_options': {}, 's3_access_key_id': None, 's3_secret_access_key': None, 's3_bucket': None, 's3_base_path': None, 's3_endpoint_url': None, 's3_region': None, 'azureblockblob_container_name': 'celery', 'azureblockblob_retry_initial_backoff_sec': 2, 'azureblockblob_retry_increment_base': 2, 'azureblockblob_retry_max_attempts': 3, 'azureblockblob_base_path': '', 'azureblockblob_connection_timeout': 20, 'azureblockblob_read_timeout': 120, 'control_queue_ttl': 300.0, 'control_queue_expires': 10.0, 'control_exchange': 'celery', 'couchbase_backend_settings': None, 'arangodb_backend_settings': None, 'mongodb_backend_settings': None, 'cosmosdbsql_database_name': 'celerydb', 'cosmosdbsql_collection_name': 'celerycol', 'cosmosdbsql_consistency_level': 'Session', 'cosmosdbsql_max_retry_attempts': 9, 'cosmosdbsql_max_retry_wait_time': 30, 'event_queue_expires': 60.0, 'event_queue_ttl': 5.0, 'event_queue_prefix': 'celeryev', 'event_serializer': 'json', 'event_exchange': 'celeryev', 'redis_backend_use_ssl': None, 'redis_db': None, 'redis_host': None, 'redis_max_connections': None, 'redis_username': None, 'redis_password': None, 'redis_port': None, 'redis_socket_timeout': 120.0, 'redis_socket_connect_timeout': None, 'redis_retry_on_timeout': False, 'redis_socket_keepalive': False, 'result_backend': 'redis://localhost:6379', 'result_cache_max': -1, 'result_compression': None, 'result_exchange': 'celeryresults', 'result_exchange_type': 'direct', 'result_expires': datetime.timedelta(days=1), 'result_persistent': None, 'result_extended': False, 'result_serializer': 'json', 'result_backend_transport_options': {}, 'result_chord_retry_interval': 1.0, 'result_chord_join_timeout': 3.0, 'result_backend_max_sleep_between_retries_ms': 10000, 'result_backend_max_retries': inf, 'result_backend_base_sleep_between_retries_ms': 10, 'result_backend_always_retry': False, 'elasticsearch_retry_on_timeout': None, 'elasticsearch_max_retries': None, 'elasticsearch_timeout': None, 'elasticsearch_save_meta_as_text': True, 'security_certificate': None, 'security_cert_store': None, 'security_key': None, 'security_digest': 'sha256', 'database_url': None, 'database_engine_options': None, 'database_short_lived_sessions': False, 'database_table_schemas': None, 'database_table_names': None, 'task_acks_late': False, 'task_acks_on_failure_or_timeout': True, 'task_always_eager': False, 'task_annotations': None, 'task_compression': None, 'task_create_missing_queues': True, 'task_inherit_parent_priority': False, 'task_default_delivery_mode': 2, 'task_default_queue': 'celery', 'task_default_exchange': None, 'task_default_exchange_type': 'direct', 'task_default_routing_key': None, 'task_default_rate_limit': None, 'task_default_priority': None, 'task_eager_propagates': False, 'task_ignore_result': False, 'task_store_eager_result': False, 'task_protocol': 2, 'task_publish_retry': True, 'task_publish_retry_policy': {'max_retries': 3, 'interval_start': 0, 'interval_max': 1, 'interval_step': 0.2}, 'task_queues': None, 'task_queue_max_priority': None, 'task_reject_on_worker_lost': None, 'task_remote_tracebacks': False, 'task_routes': None, 'task_send_sent_event': False, 'task_serializer': 'json', 'task_soft_time_limit': None, 'task_time_limit': None, 'task_store_errors_even_if_ignored': False, 'task_track_started': False, 'worker_agent': None, 'worker_autoscaler': 'celery.worker.autoscale:Autoscaler', 'worker_cancel_long_running_tasks_on_connection_loss': False, 'worker_concurrency': None, 'worker_consumer': 'celery.worker.consumer:Consumer', 'worker_direct': False, 'worker_disable_rate_limits': False, 'worker_deduplicate_successful_tasks': False, 'worker_enable_remote_control': True, 'worker_hijack_root_logger': True, 'worker_log_color': None, 'worker_log_format': '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s', 'worker_lost_wait': 10.0, 'worker_max_memory_per_child': None, 'worker_max_tasks_per_child': None, 'worker_pool': 'prefork', 'worker_pool_putlocks': True, 'worker_pool_restarts': False, 'worker_proc_alive_timeout': 4.0, 'worker_prefetch_multiplier': 4, 'worker_redirect_stdouts': True, 'worker_redirect_stdouts_level': 'WARNING', 'worker_send_task_events': False, 'worker_state_db': None, 'worker_task_log_format': '[%(asctime)s: %(levelname)s/%(processName)s] %(task_name)s[%(task_id)s]: %(message)s', 'worker_timer': None, 'worker_timer_precision': 1.0, 'deprecated_settings': None}))
Any ideas what I am doing wrong?
CodePudding user response:
In your init.py file, you've created celery object at module level and also in the create_app(). You could use the pattern of flask extensions to add the flask app to celery in your factory pattern like this: In custom.celery.py file
from celery import Celery import flask
class FlaskCelery(Celery):
"""Extend celery task to use flask app."""
def __init__(self, *args, **kwargs):
"""Def initialization fnx."""
super(FlaskCelery, self).__init__(*args, **kwargs)
self.patch_task()
if "app" in kwargs:
self.init_app(kwargs["app"])
def patch_task(self):
"""Extend celery task."""
TaskBase = self.Task
_celery = self
class ContextTask(TaskBase):
"""Extend taskbase."""
abstract = True
def __call__(self, *args, **kwargs):
"""Class caller definition."""
if flask.has_app_context():
return TaskBase.__call__(self, *args, **kwargs)
else:
with _celery.app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
self.Task = ContextTask
def init_app(self, app):
"""Initialize celery with app."""
self.app = app
In extensions.py file:
celery = FlaskCelery(broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/0')
then in init.py file:
from extensions import celery
def create_app():
app: Flask = Flask(__name__)
app.config.from_object(config_class)
logging.basicConfig(level=logging.INFO)
debug = app.config['DEBUG']
if debug:
app.logger.setLevel(logging.INFO)
celery.init_app(app)
return app