I'm new to Celery and want to implement a basic example of a chain containing four tasks. The first task passes its return value to the next task in the chain, and so on. While running Celery worker, I got the following error:
ERROR/ForkPoolWorker-4] Task run-chain[...] raised unexpected: TypeError("unsupported operand type(s) for |: 'chain' and 'AsyncResult'")
Here is my simple chain with the tasks:
from celery import chain, signature
from celery import shared_task
@shared_task(name = 'add')
def add(x, y):
return x y
@shared_task(name = 'substract')
def substract(x, y):
return x - y
@shared_task(name = 'multiply')
def multiply(x, y):
return x * y
@shared_task(name = 'divide')
def divide(x, y):
return x / y
@shared_task(name = 'run-chain')
def run_chain(x, y):
canvas = chain(
add.s(x, y).apply_async(),
substract.s(x, y).apply_async(),
multiply.s(x, y).apply_async(),
divide.s(x, y).apply_async()
)
return canvas
and this is my celery.py
file:
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myfirsttask.settings')
app = Celery()
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
app.conf.beat_schedule = {
'run-every-five-seconds': {
'task': 'run-chain',
'schedule': 5.0,
'args': (2, 3)
}
}
Your help is much appreciated!
CodePudding user response:
The chain
call isn't supposed to be called on the result of the task, it's to be called on the task. So do this, and then run apply_async()
on the whole chain:
@shared_task(name = 'run-chain')
def run_chain(x, y):
canvas = chain(
add.s(x, y),
substract.s(x, y),
multiply.s(x, y),
divide.s(x, y)
).apply_async()
return canvas
This shouldn't work, because the result of the previous task is passed as the first argument to the next task. Maybe you want to do something like this:
@shared_task(name = 'run-chain')
def run_chain(x, y):
canvas = chain(
add.s(x, y),
substract.s(y),
multiply.s(y),
divide.s(y)
).apply_async()
return canvas
Or something of the sort.