I have the following Airflow DAG, which uploads a single local file to an S3 bucket:
# airflow related
from airflow import DAG
from airflow.operators.python import PythonOperator

# other packages
from datetime import datetime
import boto3

with DAG(
    dag_id='upload_to_s3',
    start_date=datetime(2020, 5, 5),
    schedule_interval='@once',
    catchup=False,
) as dag:
    pass

def file_upload():
    # Creating a session with boto3 using hard-coded credentials.
    session = boto3.Session(
        aws_access_key_id='my_access_key_id',
        aws_secret_access_key='my_aws_secret_access_key'
    )

    # Creating an S3 resource from the session and uploading the file.
    # Note: upload_file returns None, so result is always None.
    s3 = session.resource('s3')
    result = s3.Bucket('flight-data-test-bucket').upload_file(
        '/opt/airflow/dags/pricedata.xlsx', 'pricedata.xlsx')
    return result

with DAG(
    dag_id='upload_to_s3',
    start_date=datetime(2020, 5, 5),
    schedule_interval='@once',
    catchup=False,
) as dag:
    # Upload the file
    task_file_to_s3 = PythonOperator(
        task_id='upload_to_s3',
        python_callable=file_upload
    )
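For reference, the same upload could also be expressed with the Amazon provider's S3Hook, so the AWS keys come from an Airflow connection instead of being hard-coded in the DAG file. A minimal sketch, assuming the apache-airflow-providers-amazon package is installed and a connection with the id 'aws_default' has been configured:

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def file_upload_via_hook():
    # Credentials are resolved from the 'aws_default' Airflow connection.
    hook = S3Hook(aws_conn_id='aws_default')
    hook.load_file(
        filename='/opt/airflow/dags/pricedata.xlsx',
        key='pricedata.xlsx',
        bucket_name='flight-data-test-bucket',
        replace=True,
    )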
The DAG is imported into Airflow without any errors; however, when I try to trigger it manually, nothing happens.
When I check the Task Instance Details, the "Dependencies Blocking Task From Getting Scheduled" section lists the dependency "Task Instance State" with the reason "Task is in the 'queued' state."
I assume this might be related to something going wrong with the start_date or schedule_interval, but I'm not sure. Any ideas? I have been running Airflow on Windows through Docker.
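One way I could try to rule out the DAG code itself is dag.test() (available from Airflow 2.5), which runs the DAG in-process without the scheduler or executor. A minimal sketch, assuming it is appended to the bottom of the same DAG file:

if __name__ == '__main__':
    # Runs the DAG locally in a single process, bypassing the scheduler.
    dag.test()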
CodePudding user response:
It turned out that there was no problem with my DAG; something was wrong with my Docker setup.
After removing everything (containers, images, etc.), I deleted all my Airflow data and ran a clean install by following this tutorial to install Airflow. Now my DAG is working.