Is it possible to use xcom between 2 DockerOperators?

Time:12-27

I have been trying to find an answer to my problem for 3 days. Let me explain:

./dags/
├── custom_operator.py
├── docker-xcom
│   ├── other
│   │   ├── Dockerfile
│   │   ├── main.py
│   │   └── requirements.txt
│   └── scraper
│       ├── Dockerfile
│       ├── main.py
│       └── requirements.txt
├── jobs.py

This is my pipeline. Basically, I'm using Docker to run each of my tasks in its own container. For example, let's say the code of scraper/main.py is:

my_dict = {"name":"Luna","age":31}
print(my_dict)

I would like to retrieve this "my_dict" variable through XCom, but with a DockerOperator. I looked online (I can't remember where I found this solution), and someone suggested writing a CustomDockerOperator class, so here it is:

from airflow.providers.docker.operators.docker import DockerOperator

class CustomDockerOperator(DockerOperator):
    def __init__(self, *args, **kwargs):
        # Set the default value for xcom_pull to None
        xcom_pull = kwargs.pop('xcom_pull', None)

        super().__init__(*args, **kwargs)

        self.xcom_pull = xcom_pull

    def execute(self, context):
        # Retrieve the value from XCom using the TaskInstance object and xcom_pull
        if self.xcom_pull:
            ti = context['ti']
            value = ti.xcom_pull(key=self.xcom_pull)
            print(f'Retrieved XCom value: {value}')
        return super().execute(context)

I wrote this class because apparently the DockerOperator doesn't have an xcom_pull argument.

And for my jobs.py file we have this code:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from custom_operator import CustomDockerOperator

default_args = {
'owner'                 : 'airflow',
'description'           : 'docker xcom test',
'depends_on_past'       : False,
'start_date'            : datetime(2021, 5, 1),
'email_on_failure'      : False,
'email_on_retry'        : False,
'retries'               : 1,
'retry_delay'           : timedelta(minutes=5)
}

# Here I tried to find a way to push the value under a custom key, but I could never
# set the key name: the XCom was always stored as "return_value"
def push_xcom_value(**kwargs):
    # Extract the return value from the kwargs dictionary
    return_value = kwargs["ti"].xcom_pull(task_ids='task1')
    # Use the ti object to push the XCom value
    kwargs["ti"].xcom_push(value=return_value)

with DAG(
    dag_id="XCOM_DOCKER",
    default_args=default_args,
    description="Docker   XCOM",
    start_date=datetime(2022, 12, 12),
    schedule_interval="30 * * * *",
    catchup=False,
) as dag:

    start_dag = EmptyOperator(task_id='start_dag')

    end_dag = EmptyOperator(task_id='end_dag')

    # Define the first DockerOperator task and store an XCom
    task1 = CustomDockerOperator(
        task_id='task1',
        image='xcom_scraper:latest',
        container_name='xcom_scrape',
        do_xcom_push=True,
        api_version='auto',
        auto_remove=True,
        docker_url="unix://var/run/docker.sock",
        network_mode="bridge",
        # Note that this callback function isn't really useful, because
        # do_xcom_push already stores what the container printed as the XCom value,
        # so the my_dict variable is stored anyway
        on_success_callback=push_xcom_value,
    )

    # Define the second DockerOperator task and retrieve the XCom stored by the first task
    task2 = CustomDockerOperator(
        task_id="task2",
        image='xcom_other:latest',
        container_name='xcom_other',
        api_version='auto',
        auto_remove=True,
        xcom_pull=task1.task_id,
        docker_url="unix://var/run/docker.sock",
        network_mode="bridge",
    )

# Set the dependencies between the tasks
start_dag >> task1 >> task2 >> end_dag

As you can see in this image, the XCom value was successfully stored. [Image: XCom value]

Now onto the other/main.py file. I tried different approaches, such as this one:

from airflow.models.xcom import XCom

def get_run_id(**kwargs):
    ti = kwargs['ti']
    run_id = ti.xcom_pull(key='run_id', task_ids='task1')
    return run_id

# Retrieve the XCom value stored by the first task
xcom_dict = XCom.get_one(run_id=get_run_id(), task_ids="task1")

print(get_run_id("task1"))
print(f'Retrieved XCom value: {xcom_dict}')

Or this one:

from airflow.models.xcom import XCom

# Retrieve the XCom value stored by the first task
xcom_dict = XCom.get_one(key='xcom_val', task_id='task1')

print(f'Retrieved XCom value: {xcom_dict}')

I tried many other ways that I sadly didn't save, but to no avail. Either the logs would say

"Retrieved XCom value: None"

or I would get errors such as XCom.get_one requiring a run_id, which I can't get because I am not in jobs.py. If anyone can help me I'd be really grateful. I've spent at least 10 hours trying to find a way with OpenAI chat (yes, I'm that desperate). It gave me hints, but there was always a problem and it just kept running in circles. Thank you to whoever takes the time to look into it.

Answer:

You have two options:

  1. The easy one: read the XCom value on the Airflow worker with Jinja and pass it to the container as an argument:
DockerOperator(
  ...,
  # Airflow automatically pushes the last line of the container log as an XCom.
  # To push the full log instead, add the argument `xcom_all=True`.
  command="echo {{ ti.xcom_pull(task_ids='other_docker_task_id') }}",
  ...,
)
  2. The more complicated one: read the XCom value from inside the container at runtime. XCom values are records stored in the Airflow metastore. To access them from a Docker container, the container must have the Airflow library installed and be configured to connect to the metastore (database URL and credentials). The container can then read and write XCom values without help from the Airflow server.
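
For the first option, the container scripts themselves need very little. Here is a minimal sketch, assuming that task1's image prints its result as JSON on the last line of stdout and that task2 is started with a templated command along the lines of command="python main.py '{{ ti.xcom_pull(task_ids=\"task1\") }}'". The helper names emit_result and read_upstream are hypothetical, and the single-quote wrapping assumes the JSON itself contains no single quotes. JSON is used instead of print(my_dict) because the Python repr of a dict uses single quotes and cannot be parsed with json.loads.

```python
import json


def emit_result(payload):
    """scraper/main.py side: print the result as JSON on the last stdout line.

    The last log line is what ends up in the XCom of the upstream task.
    """
    line = json.dumps(payload)
    print(line)
    return line


def read_upstream(argv):
    """other/main.py side: parse the JSON string received as the first CLI argument."""
    if len(argv) < 2:
        raise SystemExit("usage: main.py '<json from upstream task>'")
    return json.loads(argv[1])


if __name__ == "__main__":
    # Simulate both containers in one process: the line task1 prints is the
    # string the templated command would hand to task2 as argv[1].
    line = emit_result({"name": "Luna", "age": 31})
    upstream = read_upstream(["main.py", line])
    print(f"Retrieved XCom value: {upstream}")
```

In the real images, scraper/main.py would only call emit_result and other/main.py would call read_upstream(sys.argv). With this approach the second container needs no Airflow installation at all.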

If your containers are executed one after the other, you can use the first option, but if you want to run them in parallel and let them communicate using XCom, you need to access XCom variables directly from the containers.
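
For the second option, the missing run_id can be handed to the container instead of being looked up there. The following is a rough sketch under several assumptions: Airflow 2.x with apache-airflow installed in the image, a metastore connection configured in the container (for example through the AIRFLOW__DATABASE__SQL_ALCHEMY_CONN environment variable), and a DockerOperator that passes environment={"XCOM_DAG_ID": "{{ dag.dag_id }}", "XCOM_RUN_ID": "{{ run_id }}"}. The XCOM_* variable names and both helper functions are hypothetical, chosen only for illustration.

```python
import os


def xcom_coordinates(env=None):
    """Collect the identifiers needed to look up one XCom record.

    The XCOM_* names are hypothetical; they must match whatever the
    DockerOperator passes through its `environment` argument.
    """
    env = os.environ if env is None else env
    return {
        "dag_id": env["XCOM_DAG_ID"],
        "run_id": env["XCOM_RUN_ID"],
        "task_id": env.get("XCOM_TASK_ID", "task1"),
        # "return_value" is the key Airflow uses for an operator's returned value
        "key": env.get("XCOM_KEY", "return_value"),
    }


def fetch_xcom(coords):
    """Read the XCom directly from the metastore (Airflow 2.x assumed)."""
    # Imported lazily: this needs apache-airflow in the image and a
    # reachable metastore, so it only works inside a configured container.
    from airflow.models.xcom import XCom

    return XCom.get_one(**coords)
```

Inside other/main.py this would be used as fetch_xcom(xcom_coordinates()). Keep in mind that it gives the container full access to the Airflow database, which is why the first option is usually preferable when the tasks run one after the other.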
