Trigger a Celery job via Django signals

Time:08-15

I would like to use Django signals to trigger a celery task like so:

def delete_content(sender, instance, **kwargs):
    task_id = uuid()
    task = delete_libera_contents.apply_async(kwargs={"instance": instance}, task_id=task_id)
    task.wait(timeout=300, interval=2)

But I'm always running into kombu.exceptions.EncodeError: Object of type MusicTracks is not JSON serializable

Now I'm not sure how to treat the MusicTracks instance, as it's a model class instance. How can I properly pass such instances to my task?

In my tasks.py I have the following:

@app.task(name="Delete Libera Contents", queue='high_priority_tasks')
def delete_libera_contents(instance, **kwargs):

    libera_backend = instance.file.libera_backend
    ...

CodePudding user response:

First off, sorry for making the question a bit confusing, especially for the people who have already written an answer. In my case, the delete_content signal can be triggered by three different models, so it actually looks like this:

@receiver(pre_delete, sender=MusicTracks)
@receiver(pre_delete, sender=Movies)
@receiver(pre_delete, sender=TvShowEpisodes)
def delete_content(sender, instance, **kwargs):
    delete_libera_contents.delay(instance_pk=instance.pk)

So every time one of these models triggers a delete action, this signal will also trigger a celery task to actually delete the stuff in the background (all stored on S3).

As @oruchkin pointed out, I cannot and should not pass instances around directly, so I pass instance.pk to the Celery task instead. Because the task does not know which model triggered the delete action, it has to look the instance up itself:

@app.task(name="Delete Libera Contents", queue='high_priority_tasks')
def delete_libera_contents(instance_pk, **kwargs):

    if Movies.objects.filter(pk=instance_pk).exists():
        instance = Movies.objects.get(pk=instance_pk)
    elif MusicTracks.objects.filter(pk=instance_pk).exists():
        instance = MusicTracks.objects.get(pk=instance_pk)
    elif TvShowEpisodes.objects.filter(pk=instance_pk).exists():
        instance = TvShowEpisodes.objects.get(pk=instance_pk)
    else:
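        # logger.exception() returns None and cannot be raised, so log first and raise a real exception.
        logger.error("Task: 'Delete Libera Contents', reports: No instance found (code: JFN4LK) - Warning")
        raise LookupError(f"No instance found for pk {instance_pk}")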

    libera_backend = instance.file.libera_backend

You might ask why I don't simply pass the sender from the signal to the Celery task. I tried this as well, but, as already pointed out, model classes and instances cannot be passed, and it fails with:

kombu.exceptions.EncodeError: Object of type ModelBase is not JSON serializable

So it really seems I have to look the instance up with the if-elif-else chain inside the Celery task.
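One way to avoid the chain might be to send a string that names the model together with the pk, since both are JSON serializable. The sketch below uses only the standard library to show the idea: the classes, the "media." labels, MODEL_REGISTRY, and the helper names are all hypothetical stand-ins. In a real Django project the label could come from sender._meta.label in the signal handler, and the task could resolve it with django.apps.apps.get_model(label) instead of a hand-written registry.

```python
import json


# Stand-ins for the Django models (hypothetical; real ones subclass models.Model).
class Movies:
    pass


class MusicTracks:
    pass


# Hypothetical registry: label string -> model class.
# In Django, apps.get_model("app_label.ModelName") plays this role.
MODEL_REGISTRY = {
    "media.Movies": Movies,
    "media.MusicTracks": MusicTracks,
}


def is_json_serializable(value):
    """Return True if json.dumps accepts the value, False otherwise."""
    try:
        json.dumps(value)
    except TypeError:
        return False
    return True


def build_task_kwargs(model_label, pk):
    """Build the kwargs a signal handler would pass to the task."""
    kwargs = {"model_label": model_label, "instance_pk": pk}
    json.dumps(kwargs)  # raises TypeError if anything is not serializable
    return kwargs


def resolve_model(model_label):
    """Worker side: turn the label string back into a model class."""
    try:
        return MODEL_REGISTRY[model_label]
    except KeyError:
        raise LookupError(f"Unknown model label: {model_label!r}") from None


kwargs = build_task_kwargs("media.MusicTracks", 7)
model = resolve_model(kwargs["model_label"])
```

With this shape the task needs a single lookup on the resolved model, and the same pk value in two different tables can no longer pick the wrong model.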

CodePudding user response:

Never send an instance to a Celery task. Send only plain values, for example the instance's primary key, then look the instance up by that pk inside the task and run your logic there.

Your code should look like this:

views.py

def delete_content(sender, instance, **kwargs):
    task_id = uuid()
    # sender is the model class; the primary key lives on the instance.
    task = delete_libera_contents.apply_async(kwargs={"instance_pk": instance.pk}, task_id=task_id)
    task.wait(timeout=300, interval=2)

tasks.py

@app.task(name="Delete Libera Contents", queue='high_priority_tasks')
def delete_libera_contents(instance_pk, **kwargs):

    # Instance stands for your model class, e.g. MusicTracks.
    instance = Instance.objects.get(pk=instance_pk)
    libera_backend = instance.file.libera_backend
    ...

You can find this rule in the Celery documentation (I can't find the link). One of the reasons is the following situation:

  1. You send your instance to a Celery task, and the task is delayed for some reason, say by 5 minutes.
  2. Meanwhile your project changes this instance before the task has run.
  3. When the task finally runs, it uses the old version of the instance, and the instance ends up corrupted.

(This is the reason as I understand it, not a quote from the documentation.)
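The three steps can be replayed with a minimal sketch that uses a plain dict in place of the database. Everything here is hypothetical illustration: DATABASE, the two task functions, and the s3:// paths are made up, and no Celery worker is involved. It only shows the difference between a task that works on a copy made at queue time and a task that re-reads the row by pk when it runs.

```python
import copy

# Hypothetical in-memory "database": pk -> row. Stands in for the real table.
DATABASE = {1: {"title": "Track A", "file": "s3://bucket/a.mp3"}}


def task_with_snapshot(snapshot):
    """Works on the copy that was made when the task was queued."""
    return snapshot["file"]


def task_with_pk(pk):
    """Re-reads the row at the moment the task actually runs."""
    return DATABASE[pk]["file"]


# 1. The task is queued: one variant carries a snapshot, the other only the pk.
queued_snapshot = copy.deepcopy(DATABASE[1])
queued_pk = 1

# 2. The project changes the row before the worker picks the task up.
DATABASE[1]["file"] = "s3://bucket/a-v2.mp3"

# 3. The worker finally runs both variants.
stale_result = task_with_snapshot(queued_snapshot)
fresh_result = task_with_pk(queued_pk)
```

The snapshot variant still sees the old file path, while the pk variant sees the current one.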
