I have an application structured as follows.
- api
  - api
    settings.py
    celery.py
  - core
    tasks.py
    - scripts
      cgm.py
When I run the following command, I can see my task get loaded into the database, but it never actually runs, and I'm trying to understand why.
celery -A api beat -l debug -S django_celery_beat.schedulers.DatabaseScheduler
Here is my code.
settings.py (relevant parts)
INSTALLED_APPS = (
    ...,
    'django_celery_beat',
)
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
celery.py
import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'api.settings')

app = Celery('api')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
tasks.py
from django_celery_beat.models import PeriodicTask, IntervalSchedule

schedule, created = IntervalSchedule.objects.get_or_create(
    every=10,
    period=IntervalSchedule.SECONDS,
)
PeriodicTask.objects.get_or_create(
    interval=schedule,
    name='Import Dexcom Data',
    task='core.scripts.cgm.load_dexcom',
)
cgm.py
from monitor.models import GlucoseMonitor

def load_dexcom():
    from core.models import User
    user = User.objects.get(username='xxx')
    from pydexcom import Dexcom
    dexcom = Dexcom("xxx", "xxx", ous=True)  # add ous=True if outside of US
    bg = dexcom.get_current_glucose_reading()
    data = GlucoseMonitor.objects.create(
        user=user,
        source=1,
        blood_glucose=bg.mmol_l,
        trend=bg.trend,
        created=bg.time,
    )
    data.save()
I can run load_dexcom() manually and it works. My guess is that I'm not specifying the dotted path to the task correctly, so it isn't being found, but no errors are shown. When I run the celery command I can see it load the record, but it doesn't seem to do anything else.
Edit:
It looks like I was missing the following worker command, which I have now run as well.
celery -A api worker -l DEBUG
However, the output clearly shows that it can't find the script.
The full contents of the message body was:
'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
Traceback (most recent call last):
File "/home/robin/miniconda3/envs/api/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 581, in on_task_received
strategy = strategies[type_]
KeyError: 'core.scripts.cgm.load_dexcom'
I've tried the following variations, and all of them give the same KeyError:
load_dexcom
scripts.cgm.load_dexcom
api.core.scripts.cgm.load_dexcom
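The KeyError in the traceback is what a plain dictionary lookup raises when the key is missing, which is consistent with the worker having no task registered under that name. Below is a toy model of that lookup, not Celery's actual implementation; `registry`, `register` and `dispatch` are hypothetical names used only for illustration, and `load_dexcom` is a stand-in for the real function.

```python
# Toy model of a worker's task registry (hypothetical, not Celery's internals).
registry = {}


def register(name, func):
    """Store a callable under a task name, as a task decorator would."""
    registry[name] = func


def dispatch(name):
    """Look up a task by name and run it; unknown names raise KeyError."""
    return registry[name]()


def load_dexcom():
    # Stand-in for the real function; it is importable but not registered yet.
    return "loaded"


# First attempt: the function exists, but nothing was registered under the name.
try:
    first_attempt = dispatch("core.scripts.cgm.load_dexcom")
except KeyError as exc:
    first_attempt = f"KeyError: {exc}"

# Second attempt: after registration the same name resolves.
register("core.scripts.cgm.load_dexcom", load_dexcom)
second_attempt = dispatch("core.scripts.cgm.load_dexcom")
```

In this model, changing the dotted path does not help while nothing is registered, which would match every variation above failing the same way.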
Answer:
I'm not certain this will help, since I'm combining pieces from a few things I have working here, but modifying tasks.py to something like the following may help. I think you need to decorate the function you are running so that Celery registers it as a task, but I'm not using Django for this part, so I'm not 100% sure.
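from django_celery_beat.models import PeriodicTask, IntervalSchedule

from api.celery import app  # assumes the Celery app lives in api/celery.py, as in the question
from core.scripts.cgm import load_dexcom

@app.task
def run_load_dexcom():
    # Wrapping the plain function in a decorated one registers it with Celery
    load_dexcom()

schedule, created = IntervalSchedule.objects.get_or_create(
    every=10,
    period=IntervalSchedule.SECONDS,
)
# update_or_create so a row already saved under this name gets the new task name
PeriodicTask.objects.update_or_create(
    name='Import Dexcom Data',
    defaults={
        'interval': schedule,
        # pass the registered task name (a string), not the result of calling the task
        'task': run_load_dexcom.name,
    },
)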