Home > Enterprise >  celery task result in postgres database is in byte format?
celery task result in postgres database is in byte format?

Time:11-15

I am new to Celery. I tried using redis and postgres db respectively as my result backend. While for redis backend, the result is retrieved successfully, for postgres backend, the result retrieved appears to be in byte format. I am wondering if I missed anything in my code/setup, or it is expected?

I started redis and postgres on my laptop in docker containers as follows:

docker run -d --name redis_broker -p 6379:6379 redis:latest
docker run -d --name pg_backend -p 5432:5432 -e POSTGRES_PASSWORD=123 postgres:latest

Then I wrote the following very simple python code:

tasks.py

from celery import Celery

app = Celery('tasks',
             #backend='redis://localhost:6379/0',
             backend='db postgresql://postgres:123@localhost:5432/celery',
             broker='redis://localhost:6379/0')

@app.task(name='tasks.add')
def add(x:int, y:int) -> int:
    z = x y
    return z

caller.py

from tasks import add

result = add.delay(3,2)
print(result.get())

Next, I start my celery worker in my terminal as follows:

celery -A tasks workder --loglevel=INFO

and in another terminal window, I run my caller.py as follows:

python caller.py

After the caller.py is done, 5 is printed out in the terminal, which is correct.

Then I tried to retrieve the result in python from my result backend. While for redis backend, I got the result correctly:

>>> import redis
>>> backend = redis.Redis(host='localhost', port=6379)
>>> backend.get('celery-task-meta-1523014d-0ea7-42c2-83b9-272bcdb72891')
b'{"status": "SUCCESS", "result": 5, "traceback": null, "children": [], "date_done": "2021-11-13T07:23:55.012370", "task_id": "1523014d-0ea7-42c2-83b9-272bcdb72891"}'

for postgres backend, I got the following:

>>> import sqlalchemy
>>> from sqlalchemy import create_engine
>>> engine = create_engine('postgresql psycopg2://postgres:123@localhost:5432/celery')
>>> conn = engine.connect()
>>> qry = 'SELECT * FROM celery_taskmeta;'
>>> cursor = conn.execute(qry)
>>> cursor.fetchall()
(1, 'eb21609d-0f84-47c7-afb4-9bccbe41eda4', 'SUCCESS', <memory at 0x11051ef40>, datetime.datetime(2021, 11, 13, 7, 33, 55, 559665), None, None, None, None, None, None, None)]

I also tried querying in postgres

celery=> select id, task_id, result, status from celery_taskmeta;
 id |               task_id                |              result              | status
---- -------------------------------------- ---------------------------------- ---------
  1 | eb21609d-0f84-47c7-afb4-9bccbe41eda4 | \x80054b052e                     | SUCCESS
(1 rows)

celery=>

In this case, if some other components of my application needs to retrieve the task results from my (postgres) result backend, how can I read this "memory object"?

I am wondering if I have missed anything in my setup or code?

Thanks in advance for any suggestions!

CodePudding user response:

Well, I figured it out myself by checking out the source code of Celery (https://github.com/celery/celery/blob/master/celery/backends/database/models.py) In the Celery source code, for database backend using SQLAlchemy, result is serialized, and its type is: PickleType. Therefore, the answer to my question is very simple! Just simply deserialize it by calling pickle.loads() as follows:

>>> import sqlalchemy, pickle
>>> from sqlalchemy import create_engine
>>> engine = create_engine('postgresql psycopg2://postgres:123@localhost:5432/celery')
>>> conn = engine.connect()
>>> qry = 'SELECT * FROM celery_taskmeta;'
>>> cursor = conn.execute(qry)
>>> res = cursor.fetchall()
>>> res
[(1, 'eb21609d-0f84-47c7-afb4-9bccbe41eda4', 'SUCCESS', <memory at 0x11051ef40>, datetime.datetime(2021, 11, 13, 7, 33, 55, 559665), None, None, None, None, None, None, None)]
>>> pickle.loads(res[0][3])
5

Edit: the sample code above is just used for troubleshooting. In production environment, ORM (object-relational mapping) should be used instead. In that case, serialization/deserialization are handled by SQLAlchemy automatically, and explicitly invoking pickle method is not needed.

  • Related