I'm currently using Airflow (Version : 1.10.10),
and I am interested in creating a DAG, which will run hourly,
that will bring the usage information of a Docker container ( disk usage),
(The information available through the docker CLI command ( df -h) ).
i understand that: "If xcom_push is True, the last line written to stdout will also be pushed to an XCom when the bash command completes"
but my goal is to get a specific value from the bash command, not the last line written.
for example , i would like to get this line ( see screeeshot)
"tmpfs 6.2G 0 6.2G 0% /sys/fs/cgroup"
into my Xcom value, so i could edit and extact a specific value from it,
How can i push the Xcom value to a PythonOperator, so i can edit it?
i add my sample DAG script below,
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime,timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
default_args = {
'retry': 5,
'retry_delay': timedelta(minutes=5)
}
with DAG(dag_id='bash_dag', schedule_interval="@once", start_date=datetime(2020, 1, 1), catchup=False) as dag:
# Task 1
bash_task = BashOperator(task_id='bash_task', bash_command="df -h", xcom_push=True)
bash_task
<iframe name="sif1" sandbox="allow-forms allow-modals allow-scripts" frameborder="0"></iframe>
Is it applicable? Thanks a lot,
CodePudding user response:
Yhis should do the job:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime,timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
default_args = {
'retry': 5,
'retry_delay': timedelta(minutes=5)
}
with DAG(dag_id='bash_dag', schedule_interval="@once", start_date=datetime(2020, 1, 1), catchup=False) as dag:
# Task 1
bash_task = BashOperator(task_id='bash_task', bash_command="docker stats --no-stream --format '{{ json .}}' <container-id>", xcom_push=True)
bash_task
<iframe name="sif2" sandbox="allow-forms allow-modals allow-scripts" frameborder="0"></iframe>
CodePudding user response:
You can retreive the value pushed to XCom store through the output
attribute of the operator.
In the snippet below, bash.output
is an XComArg and will be pulled and passed as the first argument of the callable function when executing the task instance.
from airflow.models.xcom_arg import XComArg
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.models import DAG
with DAG(dag_id='bash_dag') as dag:
bash_task = BashOperator(
task_id='bash_task', bash_command="df -h", xcom_push=True)
def format_fun(stat_terminal_output):
pass
format_task = PythonOperator(
python_callable=format_fun,
task_id="format_task",
op_args=[bash_task.output],
)
bash_task >> format_task