S3KeySensor in Airflow 2


I have a DAG called my_dag.py that uses the S3KeySensor in Airflow 2 to check whether an S3 key exists. When I use the sensor directly inside the DAG, it works:

with TaskGroup('check_exists') as check_exists: 
    
  path = 's3://my-bucket/data/my_file'
  poke_interval = 30
  timeout = 60*60
  mode = 'reschedule'
  dependency_name = 'my_file'

  S3KeySensor(
    task_id = 'check_' + dependency_name + '_exists',
    bucket_key = path,
    poke_interval = poke_interval,
    timeout = timeout,
    mode = mode
  )

The log of the above looks like:

[2022-05-03, 19:51:26 UTC] {s3.py:105} INFO - Poking for key : s3://my-bucket/data/my_file
[2022-05-03, 19:51:26 UTC] {base_aws.py:90} INFO - Retrieving region_name from Connection.extra_config['region_name']
[2022-05-03, 19:51:27 UTC] {taskinstance.py:1701} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE

This is correct. The reschedule is expected, because the file does not exist yet.

However, I want to check any number of paths in other DAGs, so I moved the sensor into a function called test in another file called helpers.py. In my_dag.py I use a PythonOperator within the task group to call test. It looks like this:

with TaskGroup('check_exists') as check_exists:

  path = 's3://my-bucket/data/my_file'
  dependency_name = 'my_file'

  wait_for_dependencies = PythonOperator(
    task_id = 'wait_for_my_file',
    python_callable = test,
    op_kwargs = {
      'dependency_name': dependency_name,
      'path': path
    },
    dag = dag
  )

  wait_for_dependencies

The function test in helpers.py looks like:

def test(dependency_name, path, poke_interval = 30, timeout = 60 * 60, mode = 'reschedule'):

    S3KeySensor(
        task_id = 'check_' + dependency_name + '_exists',
        bucket_key = path,
        poke_interval = poke_interval,
        timeout = timeout,
        mode = mode
    )

However, when I run the DAG, the task is marked as success even though the file is not there. The logs show:

[2022-05-03, 20:07:54 UTC] {python.py:175} INFO - Done. Returned value was: None
[2022-05-03, 20:07:54 UTC] {taskinstance.py:1282} INFO - Marking task as SUCCESS.

It seems Airflow doesn't like using a sensor via a PythonOperator. Is this true, or am I doing something wrong?

My goal is to loop through multiple paths and check if each one exists. I do this in other DAGs as well, which is why I'm putting the sensor in a function that lives in another file.

If there are alternative ideas to doing this, I'm open!

Thanks for your help!

CodePudding user response:

This will not work as you expect. You have created a case of an operator inside an operator. See this answer for information about what that means.

In your case you wrapped the S3KeySensor in a PythonOperator. This means that when the PythonOperator runs, it only executes the __init__ of S3KeySensor; it never invokes the operator's own logic. Using an operator inside an operator is bad practice.

Your case is even more extreme, because you are trying to use a sensor inside an operator. Sensors need to invoke their poke() function on every poking cycle. To simplify: you cannot get the benefit of a sensor with mode='reschedule' when you set it up this way, because reschedule means you want to release the worker while the condition is not yet met, and a PythonOperator does not know how to do that.
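
To make this concrete, here is roughly what the worker ends up doing when the wait_for_my_file task runs (a sketch of the effect, not actual Airflow internals):

from helpers import test  # the function from the question

# The PythonOperator simply calls the callable:
result = test(dependency_name='my_file', path='s3://my-bucket/data/my_file')

# Inside test(), S3KeySensor(...) only runs __init__; the object is discarded,
# poke()/execute() are never called, and no task is registered with any DAG.
# The callable returns None, which is why the log shows
# "Returned value was: None" followed by SUCCESS.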

How to solve your issue:

Option 1:

From the code you showed, you can simply do:

with TaskGroup('check_exists') as check_exists:
    
  path = 's3://my-bucket/data/my_file'
  dependency_name = 'my_file'

  S3KeySensor(
    task_id='check_' + dependency_name + '_exists',
    bucket_key=path,
    poke_interval=30,
    timeout=60 * 60,
    mode='reschedule'
  )

I don't see a reason why this can't work for you.
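
Since the goal is to reuse the check for several paths across DAGs, Option 1 also works with a small factory function in helpers.py that returns the configured sensor and is called at DAG-parse time instead of being wrapped in a PythonOperator. A minimal sketch, not tested; the function name make_s3_key_sensor is hypothetical and the import path depends on your Amazon provider version:

# helpers.py
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

def make_s3_key_sensor(dependency_name, path, poke_interval=30,
                       timeout=60 * 60, mode='reschedule'):
    # Return the sensor instead of instantiating it inside a callable.
    # Called while the DAG file is parsed, it becomes a real task and the
    # scheduler handles poke/reschedule as usual.
    return S3KeySensor(
        task_id='check_' + dependency_name + '_exists',
        bucket_key=path,
        poke_interval=poke_interval,
        timeout=timeout,
        mode=mode,
    )

In my_dag.py you would then call make_s3_key_sensor(dependency_name, path) once per path inside the TaskGroup (for example in a for loop over a dict of name/path pairs), exactly like calling S3KeySensor directly.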

Option 2:

If for some reason Option 1 is not good for you, then create a custom sensor that also accepts dependency_name and path, and use it like any other operator. I didn't test it, but something like the following should work:

class MyS3KeySensor(S3KeySensor):
    def __init__(
        self,
        *,
        dependency_name: str = None,
        path: str = None,
        **kwargs,
    ):
        super().__init__(
            task_id='check_' + dependency_name + '_exists',
            bucket_key=path,
            **kwargs,
        )
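
You would then use it like any other sensor, including in a loop over several paths. A usage sketch; the dependencies mapping and the second path are illustrative only:

with TaskGroup('check_exists') as check_exists:
    # example name -> path mapping
    dependencies = {
        'my_file': 's3://my-bucket/data/my_file',
        'my_other_file': 's3://my-bucket/data/my_other_file',
    }
    for name, s3_path in dependencies.items():
        MyS3KeySensor(
            dependency_name=name,
            path=s3_path,
            poke_interval=30,
            timeout=60 * 60,
            mode='reschedule',
        )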