I'm trying to import modules from other Python files, but I've been unable to get it working.
The DAG file is "energy_data", and it contains the following script:
import sys
sys.path.append('/home/tayzer/dags/tasks')
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
# Functions from other files
from get_data import load_data
# Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}
dag = DAG(
    dag_id="Energy_Consumption",
    description='Energy Data ETL',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)
t0 = BashOperator(
    task_id='print_date',
    bash_command='date',
)
t1 = PythonOperator(
    task_id="load_AEP_hourly",
    python_callable=load_data,
    op_kwargs={'filename': 'AEP_hourly'},
    retries=2
)
But in Airflow I receive the following error:
Broken DAG: [/home/tayzer/workspace/dags/energy_data.py] Traceback (most recent call last):
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/home/tayzer/workspace/dags/energy_data.py", line 16, in <module>
    from get_data import load_data
ModuleNotFoundError: No module named 'get_data'
I already have a file called "settings.json" with the following content:
{
    "python.analysis.extraPaths": [
        "./dags/tasks",
        "./dags/Libs"
    ]
}
And the "Get_data" file contains the following script:
import pandas as pd
from utils import push_xcom_value
def load_data(filename):
    data = pd.read_csv(f"{filename}.csv").to_json()
And here you can find my file structure.
Let me know if I need to add anything else to help fix this issue.
CodePudding user response:
Please have a look at PEP 328, which describes how imports work:
https://peps.python.org/pep-0328/
Relative imports should have a leading dot (.) so that they are not confused with external or standard library packages.
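To illustrate the leading dot, here is a minimal sketch that builds a throwaway package in a temporary directory and imports a sibling module relatively. The package name "demo_tasks" and the stub function bodies are assumptions for illustration only, not the asker's real code.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Hypothetical throwaway package: <tmp>/demo_tasks/{__init__,utils,get_data}.py
root = Path(tempfile.mkdtemp())
pkg = root / "demo_tasks"
pkg.mkdir()
(pkg / "__init__.py").write_text("")
(pkg / "utils.py").write_text(
    "def push_xcom_value(value):\n"
    "    return value  # stub, not the real helper\n"
)
(pkg / "get_data.py").write_text(
    "from .utils import push_xcom_value  # leading dot: sibling module in this package\n"
    "\n"
    "def load_data(filename):\n"
    "    return push_xcom_value(f'{filename}.csv')\n"
)

# Make the parent directory importable, then load the module through its package.
sys.path.insert(0, str(root))
importlib.invalidate_caches()
get_data = importlib.import_module("demo_tasks.get_data")

print(get_data.load_data("AEP_hourly"))
```

Note that a relative import only resolves inside a package. A file that is loaded as a top-level script or module still needs an absolute import such as "from demo_tasks.get_data import load_data".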
CodePudding user response:
After a comment from @Bijay Regmi, I deleted the two lines "import sys" and "sys.path.append('/home/tayzer/dags/tasks')"
and kept only "from tasks.get_data import load_data".
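As far as I can tell, this works because Airflow puts the DAGs folder on sys.path, so "tasks" resolves as a package underneath it. The path I had appended ('/home/tayzer/dags/tasks') also does not match the '/home/tayzer/workspace/dags' location shown in the traceback. Here is a minimal sketch that reproduces the idea in a temporary directory; the temporary layout and the stub load_data body are assumptions for illustration, not my real project.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Hypothetical stand-in for the project layout: <tmp>/dags/tasks/get_data.py
root = Path(tempfile.mkdtemp())
tasks_dir = root / "dags" / "tasks"
tasks_dir.mkdir(parents=True)
(tasks_dir / "__init__.py").write_text("")  # marks "tasks" as a regular package
(tasks_dir / "get_data.py").write_text(
    "def load_data(filename):\n"
    "    return f'{filename}.csv'  # stub body, not the real loader\n"
)

# Airflow adds the DAGs folder to sys.path; emulate that step by hand here.
sys.path.insert(0, str(root / "dags"))
importlib.invalidate_caches()

# Same import statement as in the fixed DAG file.
from tasks.get_data import load_data

print(load_data("AEP_hourly"))
```

With only the DAGs folder on the path, the import has to name the package ("tasks.get_data"), which is why the bare "from get_data import load_data" failed.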