ERROR: airflow.exceptions.AirflowException: Use keyword arguments when initializing operators

This is the error log shown when I try to run the program in the Apache Airflow UI:

ERROR [airflow.models.dagbag.DagBag] Failed to import: /d/Program Files/meta airflow/dags/csv-json.py
Traceback (most recent call last):
  File "/home/siva/.local/lib/python3.6/site-packages/airflow/models/dagbag.py", line 331, in _load_modules_from_file
    loader.exec_module(new_module)
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/d/Program Files/meta airflow/dags/csv-json.py", line 39, in <module>
    fetchdata=PythonOperator('fetch_data',python_callable=load_csv_data(),dag=dag)
  File "/home/siva/.local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 145, in apply_defaults
    raise AirflowException("Use keyword arguments when initializing operators")
airflow.exceptions.AirflowException: Use keyword arguments when initializing operators
Initialization done

The program converts a CSV dataset into a structured/nested JSON file. Here is the code:

import json
import csv
import os
import pandas as pd
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
from airflow.models import Variable

AIRFLOW_HOME = os.getenv('AIRFLOW_HOME')

csv_file=Variable.get("csv_path")

def load_csv_data():
    with open(csv_file,'r') as file:
        data = pd.read_csv(file,index_col='show_id')
    return("dataframe created")

def process_into_json(data):
    data.groupby(['show_id','type','title'])
    data.apply(lambda x: x[['director','cast','country','date_added','release_year','rating','duration','listed_in']].to_dict('records'))
    data.reset_index()
    data.rename(columns={0:'details'})
    return ("processing complete")

def store_into_json(data):
    data.to_json('data\sample-supermarket.json',indent=5,orient='records')
    return(" storing done")

default_args = {'owner': 'airflow','start_date': datetime(2022, 4, 4),}

with DAG("csv-to-json-conversion",
        schedule_interval='@daily',
        start_date=datetime(2022, 4, 4),
        default_args=default_args,
        tags=['conversion_of_data_file']) as dag:
        
        fetchdata=PythonOperator('fetch_data',python_callable=load_csv_data(),dag=dag)
        
        processdata=PythonOperator('process_data',python_callable=process_into_json(),dag=dag)

        loaddata=PythonOperator('load_data',python_callable=store_into_json(),dag=dag)
        
        start=DummyOperator("start",dag=dag)

        dead=DummyOperator("dead",dag=dag)

        end=DummyOperator("end",dag=dag)

        start>>collectdata>>[processdata>>loaddata,dead]>>end

Check the code and give me a solution for the error, and if there is a better way to write this program, do suggest it. This is the dataset I was using: link

CodePudding user response:

There are two errors in your code:

  1. Operators must be initialized with keyword arguments: use PythonOperator(task_id='fetch_data', ...) instead of PythonOperator('fetch_data', ...). The same applies to your DummyOperator calls.

  2. python_callable must be passed the function itself, not the result of calling it: use PythonOperator(..., python_callable=load_csv_data, ...) instead of PythonOperator(..., python_callable=load_csv_data(), ...).

(Two smaller problems: the dependency line references an undefined collectdata, which should be fetchdata, and processdata>>loaddata inside the list leaves processdata with no upstream task, because >> returns its right-hand operand. Both are fixed below.)

With those fixes applied:

with DAG("csv-to-json-conversion",
        schedule_interval='@daily',
        start_date=datetime(2022, 4, 4),
        default_args=default_args,
        tags=['conversion_of_data_file']) as dag:
        
        fetchdata=PythonOperator(task_id='fetch_data',python_callable=load_csv_data,dag=dag)
        
        processdata=PythonOperator(task_id='process_data',python_callable=process_into_json,dag=dag)

        loaddata=PythonOperator(task_id='load_data',python_callable=store_into_json,dag=dag)
        
        start=DummyOperator(task_id="start",dag=dag)

        dead=DummyOperator(task_id="dead",dag=dag)

        end=DummyOperator(task_id="end",dag=dag)

        # set the branch dependencies in separate statements so processdata
        # keeps fetchdata as its upstream task
        start>>fetchdata>>[processdata,dead]
        processdata>>loaddata
        [loaddata,dead]>>end
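
As for a better way to write it: even with the fixes above, your three functions never actually share the DataFrame. load_csv_data returns the string "dataframe created", the pandas calls in process_into_json discard their results (groupby, apply, reset_index and rename all return new objects rather than modifying data in place), and process_into_json/store_into_json each take a data argument that the PythonOperator never supplies, so those tasks would fail at runtime with a TypeError. Below is a minimal sketch of one way to restructure it, passing the data through an intermediate file; the /tmp paths and the process_and_store name are illustrative choices, not from your code:

import pandas as pd
from airflow.models import Variable

TMP_PICKLE = '/tmp/csv_to_json_intermediate.pkl'  # illustrative intermediate file
OUTPUT_JSON = '/tmp/sample-supermarket.json'      # forward slashes: 'data\sample-...' contains the invalid escape '\s'

def load_csv_data():
    # Read the CSV once and persist it so the downstream task can pick it up.
    df = pd.read_csv(Variable.get("csv_path"), index_col='show_id')
    df.to_pickle(TMP_PICKLE)

def process_and_store():
    # pandas operations return new objects, so chain them and keep the result.
    df = pd.read_pickle(TMP_PICKLE)
    nested = (df.groupby(['show_id', 'type', 'title'])
                .apply(lambda x: x[['director', 'cast', 'country', 'date_added',
                                    'release_year', 'rating', 'duration',
                                    'listed_in']].to_dict('records'))
                .reset_index()
                .rename(columns={0: 'details'}))
    nested.to_json(OUTPUT_JSON, indent=5, orient='records')

For small payloads you could instead return the data from one callable and pull it in the next via Airflow's XCom mechanism, but a whole DataFrame is better exchanged through a file. With this structure the process and store steps collapse into a single task; keep them as separate tasks only if you also write a second intermediate file between them.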