I want to schedule my DAG to run 3 days before the last day of the month, so for February my DAG should run on the 25th, whereas for March it should run on the 28th. Any ideas on how I could schedule this?
Thanks
CodePudding user response:
For Airflow < 2.2.0:
You can schedule a DAG only if you can express the schedule as a single cron expression. If your schedule doesn't fit a cron expression, you cannot set it out of the box. You can, however, pick a cron expression that is close enough, such as 0 0 25-31 * * (midnight on every day-of-month from 25 through 31; the target day always falls between the 25th and the 28th, so 25-28 would also cover it), and place a ShortCircuitOperator at the beginning of your DAG to verify that the date is actually 3 days before the end of the month. If the date matches, the downstream tasks run; if it doesn't, they are skipped:
import calendar
from datetime import datetime, date, timedelta

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import ShortCircuitOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 8, 21)
}


def check_if_three_days_before_month_end(execution_date):
    # calendar.monthrange returns a tuple
    # (weekday of first day of the month, number of days in month)
    # Convert to a plain date: a datetime never compares equal to a date.
    run_date = datetime.fromtimestamp(execution_date.timestamp()).date()
    last_day_of_month = calendar.monthrange(run_date.year, run_date.month)[1]
    # True only when the date is 3 days before the last day of the month
    return run_date == date(run_date.year, run_date.month, last_day_of_month) - timedelta(days=3)


with DAG(
    dag_id='short_example',
    # "@once" is used here for the demo runs only; for real scheduling
    # use the cron expression, e.g. "0 0 25-31 * *"
    schedule_interval="@once",
    default_args=default_args,
) as dag:
    first = ShortCircuitOperator(
        task_id='verify_date',
        python_callable=check_if_three_days_before_month_end
    )
    second = DummyOperator(task_id='task')
    first >> second
Example run for 2021-01-30: [screenshot not preserved]
Example run for 2021-01-28: [screenshot not preserved]
Note: Make sure you are comparing the date that interests you. In the example I compared the execution_date of the DAG.
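The date arithmetic behind the check can be tried outside Airflow. The following is a minimal sketch, not part of the original answer: the helper name is_three_days_before_month_end is made up for illustration, and it takes a plain date rather than an Airflow execution_date.

```python
import calendar
from datetime import date, timedelta


def is_three_days_before_month_end(day: date) -> bool:
    """Return True when `day` is exactly 3 days before the last day of its month.

    Hypothetical helper for illustration; it is not an Airflow function.
    """
    # monthrange returns (weekday of the 1st, number of days in the month)
    last_day = calendar.monthrange(day.year, day.month)[1]
    return day == date(day.year, day.month, last_day) - timedelta(days=3)


if __name__ == "__main__":
    # Dates taken from the question and from the two example runs
    for day in (date(2021, 1, 28), date(2021, 1, 30), date(2021, 2, 25), date(2021, 3, 28)):
        print(day, is_three_days_before_month_end(day))
```

Both sides of the comparison are date objects here, so the equality behaves as intended; comparing a datetime against a date would always be False.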
For Airflow >= 2.2.0: (currently beta release 2.2.0b2)
AIP-39 Richer scheduler_interval is available. You can define your own Timetable for the scheduling. You can read the documentation for this feature in the PR.
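A custom Timetable would need some way to work out the next run date. The sketch below shows only that calculation in plain Python, under the assumption that the schedule is "3 days before the last day of each month". The names target_day and next_run_date are hypothetical and not part of Airflow's API; wiring them into an actual Timetable class is left to the documentation mentioned above.

```python
import calendar
from datetime import date, timedelta


def target_day(year: int, month: int) -> date:
    """Date that falls 3 days before the last day of the given month (hypothetical helper)."""
    last_day = calendar.monthrange(year, month)[1]
    return date(year, month, last_day) - timedelta(days=3)


def next_run_date(after: date) -> date:
    """First target date strictly later than `after` (hypothetical helper)."""
    candidate = target_day(after.year, after.month)
    if candidate > after:
        return candidate
    # This month's target has already passed (or is `after` itself): use next month
    if after.month == 12:
        return target_day(after.year + 1, 1)
    return target_day(after.year, after.month + 1)


if __name__ == "__main__":
    print(next_run_date(date(2021, 2, 1)))
    print(next_run_date(date(2021, 2, 25)))
```

Keeping the calculation in standalone functions like these makes it easy to unit test the month and year rollover separately from the scheduler.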