Home > Blockchain >  How to deserialize Xcom strings in Airflow?
How to deserialize Xcom strings in Airflow?


Consider a DAG containing two tasks: DAG: Task A >> Task B (BashOperators or DockerOperators). They need to communicate through XComs.

  • Task A outputs the informations through a one-line json in stdout, which can then be retrieve in the logs of Task A, and so in its return_value XCom key if xcom_push=True. For instance : {"key1":1,"key2":3}

  • Task B only needs the key2 information from Task A, so we need to deserialize the return_value XCom of Task A to extract only this value and pass it directly to Task B, using the jinja template {{xcom_pull('task_a')['key2']}}. Using it as this results in jinja2.exceptions.UndefinedError: 'str object' has no attribute 'key2' because return_value is just a string.

For example we can deserialize Airflow Variables in jinja templates (ex: {{ var.json.my_var.path }}). Globally I would like to do the same thing with XComs.

Edit: a workaround is to convert the json string into a python dictionary before sending it to Xcom (see below).

CodePudding user response:

You can add a post function to the BashOperator that deserialize the result and push all keys separately

def _post(context, result):
    ti = context["ti"]
    output = json.loads(result)
    for key, value in output.items():
        ti.xcom_push(key, value)

    bash_command='bash command',

CodePudding user response:

Xcom values are stored in their native type, you don’t need to deserialize since it isn’t stored as JSON. They are just pickled/unpickled and available as variables in the template.


CodePudding user response:

A workaround is to create a custom Operator (inherited from BashOperator or DockerOperator) and augment the execute method:

  1. execute the original execute method
  2. intercepts the last log line of the task
  3. tries to json.loads() it in a Python dictionnary
  4. finally return the output (which is now a dictionnary, not a string)

The previous jinja template {{ xcom_pull('task_a')['key2'] }} is now working in task B, since the XCom value is now a Python dictionnary.

class BashOperatorExtended(BashOperator):
    def execute(self, context):
        output = BashOperator.execute(self, context)
            output = json.loads(output)
        return output

class DockerOperatorExtended(DockerOperator):
    def execute(self, context):
        output = DockerOperator.execute(self, context)
            output = json.loads(output)
        return output

But creating a new operator just for that purpose is not really satisfying..

  • Related