This is the error log that appears when I try to run the program in the Apache Airflow UI:
ERROR [airflow.models.dagbag.DagBag] Failed to import: /d/Program Files/meta airflow/dags/csv-json.py
Traceback (most recent call last):
File "/home/siva/.local/lib/python3.6/site-packages/airflow/models/dagbag.py", line 331, in _load_modules_from_file
loader.exec_module(new_module)
File "<frozen importlib._bootstrap_external>", line 678, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/d/Program Files/meta airflow/dags/csv-json.py", line 39, in <module>
fetchdata=PythonOperator('fetch_data',python_callable=load_csv_data(),dag=dag)
File "/home/siva/.local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 145, in apply_defaults
raise AirflowException("Use keyword arguments when initializing operators")
airflow.exceptions.AirflowException: Use keyword arguments when initializing operators
Initialization done
The program converts a CSV dataset into a structured/nested JSON file. Here is the code:
import json
import csv
import os
import pandas as pd
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
from airflow.models import Variable

AIRFLOW_HOME = os.getenv('AIRFLOW_HOME')
csv_file = Variable.get("csv_path")

def load_csv_data():
    with open(csv_file, 'r') as file:
        data = pd.read_csv(file, index_col='show_id')
    return ("dataframe created")

def process_into_json(data):
    data.groupby(['show_id', 'type', 'title'])
    data.apply(lambda x: x[['director', 'cast', 'country', 'date_added', 'release_year', 'rating', 'duration', 'listed_in']].to_dict('records'))
    data.reset_index()
    data.rename(columns={0: 'details'})
    return ("processing complete")

def store_into_json(data):
    data.to_json('data\sample-supermarket.json', indent=5, orient='records')
    return (" storing done")

default_args = {'owner': 'airflow', 'start_date': datetime(2022, 4, 4)}

with DAG("csv-to-json-conversion",
         schedule_interval='@daily',
         start_date=datetime(2022, 4, 4),
         default_args=default_args,
         tags=['conversion_of_data_file']) as dag:
    fetchdata = PythonOperator('fetch_data', python_callable=load_csv_data(), dag=dag)
    processdata = PythonOperator('process_data', python_callable=process_into_json(), dag=dag)
    loaddata = PythonOperator('load_data', python_callable=store_into_json(), dag=dag)
    start = DummyOperator("start", dag=dag)
    dead = DummyOperator("dead", dag=dag)
    end = DummyOperator("end", dag=dag)

    start >> collectdata >> [processdata >> loaddata, dead] >> end
Check the code and give me a solution for the error, and if there is a better way to write this program, please suggest it. This is the dataset I was using: link
CodePudding user response:
There are two errors in your code:

1. You don't use keyword arguments when initializing operators: use PythonOperator(task_id='fetch_data', ...) instead of PythonOperator('fetch_data', ...).

2. Your python_callable arguments must be callable: use PythonOperator(..., python_callable=load_csv_data, ...) instead of PythonOperator(..., python_callable=load_csv_data(), ...). With the parentheses, load_csv_data() is executed once while the DAG file is parsed, and its return value (here, a string) is passed to the operator instead of the function itself.

Also note that your last line references collectdata, which is never defined; it should be fetchdata, as in the corrected version below:
with DAG("csv-to-json-conversion",
         schedule_interval='@daily',
         start_date=datetime(2022, 4, 4),
         default_args=default_args,
         tags=['conversion_of_data_file']) as dag:
    fetchdata = PythonOperator(task_id='fetch_data', python_callable=load_csv_data, dag=dag)
    processdata = PythonOperator(task_id='process_data', python_callable=process_into_json, dag=dag)
    loaddata = PythonOperator(task_id='load_data', python_callable=store_into_json, dag=dag)
    start = DummyOperator(task_id="start", dag=dag)
    dead = DummyOperator(task_id="dead", dag=dag)
    end = DummyOperator(task_id="end", dag=dag)

    start >> fetchdata >> [processdata >> loaddata, dead] >> end
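On the "better way" question: even with these fixes, the three tasks won't actually share the DataFrame. Each python_callable runs in its own process, its return value only goes to XCom, and process_into_json/store_into_json expect a data argument that nothing supplies (also, pandas calls like data.reset_index() return new objects rather than modifying data in place, so their results are currently discarded). Here is a minimal sketch that performs the whole conversion in a single callable; the function name convert_csv_to_json is mine, and the nesting logic is an assumption reconstructed from your groupby/apply chain:

import os
import pandas as pd
from airflow.models import Variable

def convert_csv_to_json():
    # Read the CSV configured in the Airflow Variable "csv_path";
    # keep show_id as a regular column so it can be grouped on
    data = pd.read_csv(Variable.get("csv_path"))
    # Nest the detail columns under each (show_id, type, title) group
    # (assumed intent of the original groupby/apply, whose results were discarded)
    nested = (data.groupby(['show_id', 'type', 'title'])
                  .apply(lambda x: x[['director', 'cast', 'country', 'date_added',
                                      'release_year', 'rating', 'duration',
                                      'listed_in']].to_dict('records'))
                  .reset_index()
                  .rename(columns={0: 'details'}))
    # Build the output path with os.path.join: the hard-coded
    # 'data\sample-supermarket.json' would not resolve as a path on Linux
    nested.to_json(os.path.join('data', 'sample-supermarket.json'),
                   indent=5, orient='records')

A single PythonOperator(task_id='convert_csv_to_json', python_callable=convert_csv_to_json) can then replace the fetch/process/store chain. If you do want separate tasks, the DataFrame has to be handed over explicitly, for example through an intermediate file or XCom.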