Consider a DAG containing two tasks, Task A >> Task B (BashOperators or DockerOperators), which need to communicate through XComs.
Task A outputs its information as a one-line JSON string on stdout. That line can be found in the logs of Task A, and therefore in its return_value XCom key if xcom_push=True (do_xcom_push=True in Airflow 2). For instance: {"key1":1,"key2":3}
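To make the string-versus-dict point concrete, here is a small stdlib-only sketch that roughly mimics what happens to Task A's output: a command prints some log noise, then the one-line JSON, and only the last stdout line is kept. The helper last_stdout_line is hypothetical and only approximates BashOperator's behaviour; it is not Airflow code.

```python
import subprocess
import sys


def last_stdout_line(cmd: list[str]) -> str:
    """Run cmd and return the last line of its stdout (hypothetical helper).

    A rough stand-in for how BashOperator picks the value it pushes as
    return_value; it is not Airflow's implementation.
    """
    completed = subprocess.run(cmd, capture_output=True, text=True, check=True)
    lines = completed.stdout.splitlines()
    return lines[-1].rstrip() if lines else ""


# Task A's "command": print a log line first, then the one-line JSON result.
task_a_cmd = [
    sys.executable,
    "-c",
    "print('starting'); import json; print(json.dumps({'key1': 1, 'key2': 3}))",
]

return_value = last_stdout_line(task_a_cmd)
# The captured value is a plain str, not a dict.
print(type(return_value).__name__, return_value)
```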
Task B only needs the key2 value from Task A, so the return_value XCom of Task A has to be deserialized to extract only this value and pass it directly to Task B, using the Jinja template {{ ti.xcom_pull('task_a')['key2'] }}. Used as is, this results in jinja2.exceptions.UndefinedError: 'str object' has no attribute 'key2', because return_value is just a string.
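The failure can be reproduced without Airflow. This minimal sketch assumes the pulled return_value is exactly the JSON string above: indexing it with "key2" fails, while calling json.loads() first yields a dict.

```python
import json

# What Task A pushed as return_value: a plain str, not a dict.
return_value = '{"key1":1,"key2":3}'

# Indexing a str with a str key raises TypeError in Python. Jinja catches
# this, falls back to attribute lookup, and ends up with the UndefinedError
# "'str object' has no attribute 'key2'".
try:
    return_value["key2"]
except TypeError as exc:
    print("TypeError:", exc)

# Deserializing first gives a dict, so key2 becomes reachable.
data = json.loads(return_value)
print(data["key2"])
```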
For example, Airflow Variables can be deserialized in Jinja templates (e.g. {{ var.json.my_var.path }}). In short, I would like to do the same thing with XComs.
Edit: a workaround is to convert the JSON string into a Python dictionary before pushing it to XCom (see below).
Answer:
You can add a post_execute function to the BashOperator that deserializes the result and pushes each key as a separate XCom:
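```python
import json

from airflow.operators.bash import BashOperator


def _post(context, result):
    # result is what execute() returned: the last line of stdout
    ti = context["ti"]
    output = json.loads(result)
    # Push each top-level key as its own XCom (e.g. key="key2")
    for key, value in output.items():
        ti.xcom_push(key=key, value=value)


task_a = BashOperator(
    task_id="task_a",
    bash_command="bash command",
    post_execute=_post,
)
```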
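Here is a stdlib-only sketch of what the hook does, using a hypothetical FakeTaskInstance that merely records xcom_push calls in place of Airflow's real TaskInstance:

```python
import json


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance: records xcom_push calls."""

    def __init__(self):
        self.xcoms = {}

    def xcom_push(self, key, value):
        self.xcoms[key] = value


def _post(context, result):
    # Same logic as the hook: one XCom per top-level JSON key.
    ti = context["ti"]
    output = json.loads(result)
    for key, value in output.items():
        ti.xcom_push(key=key, value=value)


ti = FakeTaskInstance()
_post({"ti": ti}, '{"key1":1,"key2":3}')
print(ti.xcoms)
```

With the keys pushed separately, and assuming Task A's task_id is task_a, Task B can pull a single value with {{ ti.xcom_pull(task_ids='task_a', key='key2') }}.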
Answer:
XCom values come back in their native Python type, so you don't need to deserialize them yourself: Airflow serializes and deserializes them for you (JSON by default in Airflow 2, pickle if enable_xcom_pickling is enabled), and they are available as Python objects in the template:
{{ ti.xcom_pull('task_a')['key2'] }}
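A rough model of the default, non-pickling XCom backend in Airflow 2 (JSON-encode on push, decode on pull) shows what "native type" means in practice: a dict comes back as a dict, but a string, which is what BashOperator returns, comes back as a string. The helper xcom_round_trip is hypothetical and ignores Airflow's custom encoders.

```python
import json


def xcom_round_trip(value):
    # Rough model of the default XCom backend: JSON-encode when the value
    # is pushed, JSON-decode when it is pulled.
    return json.loads(json.dumps(value))


as_dict = xcom_round_trip({"key1": 1, "key2": 3})
as_str = xcom_round_trip('{"key1":1,"key2":3}')

print(type(as_dict).__name__, as_dict["key2"])
print(type(as_str).__name__)
```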
Answer:
A workaround is to create a custom operator (inheriting from BashOperator or DockerOperator) and extend its execute method:
- call the original execute method, which returns the last line of the task's output
- try to json.loads() it into a Python dictionary
- finally, return the output (which is now a dictionary, not a string)

The previous Jinja template {{ ti.xcom_pull('task_a')['key2'] }} now works in Task B, since the XCom value is now a Python dictionary.
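```python
import json

from airflow.operators.bash import BashOperator
from airflow.providers.docker.operators.docker import DockerOperator


class BashOperatorExtended(BashOperator):
    def execute(self, context):
        # The parent returns the last line of stdout as a string.
        output = super().execute(context)
        try:
            # Turn a one-line JSON string into a dict so templates can index it.
            output = json.loads(output)
        except (TypeError, ValueError):
            # No output, or not valid JSON: keep the raw value.
            pass
        return output


class DockerOperatorExtended(DockerOperator):
    def execute(self, context):
        output = super().execute(context)
        try:
            output = json.loads(output)
        except (TypeError, ValueError):
            pass
        return output
```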
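If the duplication between the two subclasses is the unsatisfying part, the parsing could be factored into one helper plus a mixin. This is a sketch under assumptions: maybe_json and JsonOutputMixin are hypothetical names, and FakeBashOperator stands in for BashOperator so the example runs without Airflow.

```python
import json


def maybe_json(output):
    """Return json.loads(output) if it parses, else output unchanged (hypothetical helper)."""
    try:
        return json.loads(output)
    except (TypeError, ValueError):
        # TypeError: output is None or not str/bytes; ValueError: not valid JSON
        return output


class JsonOutputMixin:
    """Hypothetical mixin: list it before the operator class in the bases."""

    def execute(self, context):
        # super() resolves to the next class in the MRO, i.e. the operator.
        return maybe_json(super().execute(context))


class FakeBashOperator:
    # Stand-in for BashOperator: execute() returns the last stdout line as a str.
    def execute(self, context):
        return '{"key1":1,"key2":3}'


class FakeBashOperatorExtended(JsonOutputMixin, FakeBashOperator):
    pass


print(FakeBashOperatorExtended().execute({})["key2"])
print(maybe_json("plain log line"))
print(maybe_json(None))
```

With Airflow installed, the same mixin would presumably be combined as class BashOperatorExtended(JsonOutputMixin, BashOperator): pass, and likewise for DockerOperator; the mixin must come first in the bases so that its execute() wraps the operator's.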
But creating a new operator just for that purpose is not very satisfying.