How to run tasks in parallel with Celery and Django?

Time:08-01

I want to run tasks in parallel with Celery in a Django project.

Let's say the following task:

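import time
from celery import shared_task

@shared_task(bind=True)
def loop_task(self):
    # Simulate ten seconds of work, printing one number per second.
    for i in range(10):
        time.sleep(1)
        print(i)
    return "done"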

Each time the view is loaded, this task must be executed:

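from django.http import HttpResponse
def view(request):
    loop_task.delay()  # enqueue the task without waiting for it to finish
    return HttpResponse("started")  # placeholder response; a view must return one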

My problem is that I want several instances of this task to run in parallel rather than one after another. Each time a user opens the view, the task should start immediately instead of waiting in a queue for a previous task to finish.

Here is the Celery command I use:

celery -A toolbox.celery worker --pool=solo -l info -n my_worker1


-------------- celery@my_worker1 v5.2.7 (dawn-chorus)
--- ***** -----
-- ******* ---- Windows-10-10.0.22000-SP0 2022-08-01 10:22:52
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         toolbox:0x1fefe7286a0
- ** ---------- .> transport:   redis://127.0.0.1:6379//
- ** ---------- .> results:
- *** --- * --- .> concurrency: 8 (solo)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

I have already tried the solutions from the Stack Overflow question "Executing two tasks at the same time with Celery", but none of them seem to do what I need.

For a single page load, I should get the following output:

0,1,2,...,9

If two users load the same page at the same time, both tasks should run together, so each number should appear twice:

0,0,1,1,2,2,...,9,9

CodePudding user response:

Maybe you have changed the Django timezone setting.

The database scheduler won’t reset when timezone related settings change, so you must do this manually:

$ python manage.py shell
>>> from django_celery_beat.models import PeriodicTask
>>> PeriodicTask.objects.update(last_run_at=None)

CodePudding user response:

I think this is simple to solve, but you will need to test it.

Basically, you need to run the task asynchronously. For example, to run a task that sends a mass SMS to multiple users, you would call it like this:

send_mass_sms.apply_async(
    [
        phone_numbers,
        instance.body,
        instance.id,
    ],
    eta=instance.when,
)

Your code needs to be fixed this way:

def view(request):
    loop_task.apply_async()
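
Keep in mind that delay() is itself a shortcut for apply_async(), so the call style alone does not decide whether two tasks overlap. That depends on how many tasks the worker pool can execute at once, and the worker in the question is started with --pool=solo, which runs one task at a time. The sketch below is not Celery code. It is a minimal stand-in, assuming the standard library's ThreadPoolExecutor can play the role of the worker pool, to show how pool size changes the output order. The names run_two_requests, pool_size, steps and delay are made up for illustration, and the task records its steps in a list instead of printing.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def loop_task(task_id, log, lock, steps=5, delay=0.02):
    """Stand-in for the Celery task: record each step instead of printing it."""
    for i in range(steps):
        time.sleep(delay)
        with lock:
            log.append((task_id, i))
    return "done"


def run_two_requests(pool_size):
    """Submit two tasks at once, as if two users loaded the view together."""
    log, lock = [], threading.Lock()
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        futures = [pool.submit(loop_task, name, log, lock) for name in ("A", "B")]
        results = [f.result() for f in futures]
    return log, results


if __name__ == "__main__":
    # One worker: task B cannot start until task A has finished.
    serial_log, _ = run_two_requests(pool_size=1)
    # Two workers: both tasks make progress during the same time window.
    parallel_log, _ = run_two_requests(pool_size=2)
    print("one worker: ", [i for _, i in serial_log])
    print("two workers:", [i for _, i in parallel_log])
```

With one worker the second task only starts after the first has finished, which is the queueing behaviour described in the question. With two workers the steps of both tasks are mixed together. For the real worker, the analogous change would be to start it with a pool that can run several tasks at once, for example --pool=threads together with --concurrency. Whether that pool behaves well on your Windows setup is something you would need to test.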

If you need to update data on the website, you can store the data in models and poll for it with repeated AJAX calls, or implement the logic with WebSockets, but that is a topic for another question :)
