Home > other >  TypeError: unsupported operand type(s) for |: 'chain' and 'AsyncResult'
TypeError: unsupported operand type(s) for |: 'chain' and 'AsyncResult'

Time:09-28

I'm new to Celery and want to implement a basic example of a chain containing four tasks. The first task passes its return value to the next task in the chain, and so on. While running Celery worker, I got the following error:

ERROR/ForkPoolWorker-4] Task run-chain[...] raised unexpected: TypeError("unsupported operand type(s) for |: 'chain' and 'AsyncResult'")

Here is my simple chain with the tasks:

from celery import chain, signature
from celery import shared_task



@shared_task(name = 'add')
def add(x, y):
    return x   y


@shared_task(name = 'substract')
def substract(x, y):
    return x - y


@shared_task(name = 'multiply')
def multiply(x, y):
    return x * y


@shared_task(name = 'divide')
def divide(x, y):
    return x / y


@shared_task(name = 'run-chain')
def run_chain(x, y):
    canvas = chain(
        add.s(x, y).apply_async(),
        substract.s(x, y).apply_async(),
        multiply.s(x, y).apply_async(),
        divide.s(x, y).apply_async()
    )
    return canvas

and this is my celery.py file:

import os
from celery import Celery



os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myfirsttask.settings')

app = Celery()
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

app.conf.beat_schedule = {
    'run-every-five-seconds': {
        'task': 'run-chain',  
        'schedule': 5.0,
        'args': (2, 3) 
    }
}  

Your help is much appreciated!

CodePudding user response:

The chain call isn't supposed to be called on the result of the task, it's to be called on the task. So do this, and then run apply_async() on the whole chain:

@shared_task(name = 'run-chain')
def run_chain(x, y):
    canvas = chain(
        add.s(x, y),
        substract.s(x, y),
        multiply.s(x, y),
        divide.s(x, y)
    ).apply_async()
    return canvas

This shouldn't work, because the result of the previous task is passed as the first argument to the next task. Maybe you want to do something like this:

@shared_task(name = 'run-chain')
def run_chain(x, y):
    canvas = chain(
        add.s(x, y),
        substract.s(y),
        multiply.s(y),
        divide.s(y)
    ).apply_async()
    return canvas

Or something of the sort.

  • Related