Airflow DAG Scheduling last day of month -n days

Time:09-23

I want to schedule my DAG to run 3 days before the last day of the month, so in February the DAG should run on the 25th, whereas in March it should run on the 28th. Any ideas on how I could schedule this?

Thanks

CodePudding user response:

For Airflow < 2.2.0:

You can schedule a DAG only if you can express the schedule as a single cron expression. If your schedule doesn't fit a cron expression, you cannot set it out of the box. You can, however, pick a cron expression that is close enough, such as 0 0 25-31 * * (every day-of-month from 25 through 31; the target day always falls between the 25th and the 28th), and place a ShortCircuitOperator at the beginning of your DAG to verify that the date is actually 3 days before the end of the month. If the date matches, the downstream tasks run; if it doesn't, they are skipped:

import calendar
from datetime import datetime, date, timedelta

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import ShortCircuitOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 8, 21),
}


def check_if_last_day_of_month(execution_date):
    # calendar.monthrange returns a tuple:
    # (weekday of the first day of the month, number of days in the month)
    # Reduce execution_date to a plain date so the comparison below is
    # date-to-date; a datetime never compares equal to a date.
    run_date = date(execution_date.year, execution_date.month, execution_date.day)
    last_day_of_month = calendar.monthrange(run_date.year, run_date.month)[1]
    # True only when the run date is exactly 3 days before the last day of the month
    return run_date == date(run_date.year, run_date.month, last_day_of_month) - timedelta(days=3)


with DAG(
    dag_id='short_example',
    schedule_interval="0 0 25-31 * *",  # closest cron; verify_date filters the exact day
    default_args=default_args,
) as dag:
    first = ShortCircuitOperator(
        task_id='verify_date',
        python_callable=check_if_last_day_of_month
    )

    second = DummyOperator(task_id='task')

    first >> second

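Example run for 2021-01-30 (screenshot not preserved): January has 31 days, so the date does not match and the downstream task is skipped.

Example run for 2021-01-28 (screenshot not preserved): the date is 3 days before January 31, so the downstream task runs.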

Note: Make sure you compare the date that interests you. The example compares the DAG's execution_date, which marks the start of the schedule interval and not the moment the run actually starts.
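To try the date check outside Airflow, the comparison can be pulled into a small standalone function. This is only a sketch: the name is_n_days_before_month_end and the parameter n are made up for illustration and are not part of Airflow or of the DAG above.

```python
import calendar
from datetime import date, timedelta


def is_n_days_before_month_end(day, n=3):
    """Return True if `day` is exactly `n` days before the last day of its month.

    Hypothetical helper for illustration; accepts a date or a datetime.
    """
    # Normalize to a plain date so a datetime argument compares correctly.
    d = date(day.year, day.month, day.day)
    # monthrange returns (weekday of first day, number of days in month).
    last_day = calendar.monthrange(d.year, d.month)[1]
    target = date(d.year, d.month, last_day) - timedelta(days=n)
    return d == target


if __name__ == "__main__":
    for sample in (date(2021, 2, 25), date(2021, 3, 28), date(2021, 1, 30)):
        print(sample, is_n_days_before_month_end(sample))
```

The same function can be called from the ShortCircuitOperator callable with whichever date you decide to compare.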

For Airflow >= 2.2.0: (currently beta release 2.2.0b2)

AIP-39 Richer scheduler_interval is available. You can define your own Timetable for scheduling. The documentation for this feature is in the PR.
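Whatever form the custom Timetable takes, it has to work out the next date that is n days before a month end. The sketch below shows only that date arithmetic in plain Python. It does not implement Airflow's Timetable interface, and the names target_day and next_run_date are hypothetical. It assumes n is small enough (less than 28) that the target stays inside the same month.

```python
import calendar
from datetime import date, timedelta


def target_day(year, month, n=3):
    """Date that is `n` days before the last day of the given month."""
    last_day = calendar.monthrange(year, month)[1]
    return date(year, month, last_day) - timedelta(days=n)


def next_run_date(after, n=3):
    """First target date strictly later than `after` (hypothetical helper)."""
    candidate = target_day(after.year, after.month, n)
    if candidate > after:
        return candidate
    # This month's target has passed (or is today): roll over to next month.
    if after.month == 12:
        year, month = after.year + 1, 1
    else:
        year, month = after.year, after.month + 1
    return target_day(year, month, n)


if __name__ == "__main__":
    print(next_run_date(date(2021, 2, 1)))
    print(next_run_date(date(2021, 2, 25)))
```

A Timetable would wrap a calculation like this and turn the resulting date into the run information the scheduler expects.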
