Airflow DAG created using TaskFlow failing with error: wrapper() missing 1 required positional argument

Time:12-29

I am trying to create a simple proof-of-concept (POC) DAG in Airflow 2.2.3 using the TaskFlow API. Here is the code:

from datetime import datetime
from airflow.decorators import dag, task
import yaml
import sys
import logging    

sys.path.append('airflow/scripts/')
from etl_module import extract_data, load_data_catalog, transform_data

DAG_CONFIG_YAML = 'airflow/configs/test.yaml'
with open(DAG_CONFIG_YAML, 'r') as data:
    config = yaml.load(data, Loader=yaml.SafeLoader)

@dag(
    schedule_interval=config['dag_config']['schedule'], 
    start_date=config['dag_config']['start_date'],
    tags=['test'],
)
def test_daily_load_workflow():

    extract = task(extract_data.extract_data(config))
    transform = task(transform_data.transform_data(config))
    load = task(load_data_catalog.load_data_catalog(config))

    extract() >> transform() >> load()

test_daily_load_workflow_obj = test_daily_load_workflow()

When I run this file, the task functions (which contain only logging statements for now) do execute, but the script then fails at the end and, as a result, the DAG never gets registered.

Execution output:

[2021-12-28 19:38:16,659] {extract_data.py:5} INFO - extracting data
[2021-12-28 19:38:18,662] {extract_data.py:12} INFO - data extraction successful
[2021-12-28 19:38:18,662] {transform_data.py:5} INFO - transform / enrich data
[2021-12-28 19:38:20,665] {transform_data.py:11} INFO - data transformation successful
[2021-12-28 19:38:20,665] {load_data_catalog.py:5} INFO - final touch-ups / enrichments
[2021-12-28 19:38:22,668] {load_data_catalog.py:11} INFO - data catalog write successful
Traceback (most recent call last):
  File "/mnt/e/projects/airflow-poc/airflow/dags/test/test_daily_load_workflow.py", line 40, in <module>
    test_daily_load_workflow1 = test_daily_load_workflow()
  File "/mnt/e/projects/airflow-poc/.venv/lib/python3.8/site-packages/airflow/models/dag.py", line 2984, in factory
    f(**f_kwargs)
  File "/mnt/e/projects/airflow-poc/airflow/dags/test/test_daily_load_workflow.py", line 37, in test_daily_load_workflow
    extract() >> transform() >> load()
TypeError: wrapper() missing 1 required positional argument: 'f'

This should be a simple problem to solve, but I can't see what I am doing wrong!
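The error can be reproduced in plain Python. In the DAG body, `extract_data.extract_data(config)` is *called* while the file is being parsed (which is why the log lines are printed), so `task` receives that call's return value, here `None`, instead of the function. The sketch below assumes that `task` behaves like an optional-argument decorator factory, as the `wrapper(f)` in the error message suggests: given a callable it wraps it, otherwise it returns `wrapper`, which still expects the function. `task_like` is a hypothetical stand-in, not Airflow's actual code.

```python
import functools


def task_like(python_callable=None, **kwargs):
    """Hypothetical stand-in for a decorator usable as task_like(func) or task_like()."""

    def wrapper(f):
        @functools.wraps(f)
        def factory(*args, **f_kwargs):
            # A real task decorator would create a task here; this just calls f.
            return f(*args, **f_kwargs)

        return factory

    if callable(python_callable):
        return wrapper(python_callable)  # task_like(func): returns the wrapped function
    return wrapper  # task_like(None): returns the decorator, which still needs f


def extract_data(config):
    # Only logs, like the task functions in the question, so it returns None.
    print("extracting data")


config = {"dag_config": {}}

# extract_data(config) runs right now and evaluates to None,
# so this is effectively task_like(None), i.e. the bare wrapper.
extract = task_like(extract_data(config))

error_message = ""
try:
    extract()  # wrapper() called without its required argument f
except TypeError as exc:
    error_message = str(exc)

print(error_message)

# Passing the function object instead: extract_data is not called
# until fixed_extract itself is called.
fixed_extract = task_like(extract_data)
fixed_extract(config)
```

By analogy, the corresponding change in the question's DAG would be `extract = task(extract_data.extract_data)` followed by `extract(config)` inside the DAG function; this is a suggestion based on the sketch, not a tested fix.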

Answer:

I believe that, because the task decorator takes arguments, the syntax should be:

@dag(
    schedule_interval=config['dag_config']['schedule'], 
    start_date=config['dag_config']['start_date'],
    tags=['test'],
)
def test_daily_load_workflow():
    extract = task()(extract_data.extract_data(config))
    transform = task()(transform_data.transform_data(config))
    load = task()(load_data_catalog.load_data_catalog(config))

    extract >> transform >> load

The reason you get this error is that the task decorator takes parameters: calling it returns a function, which must then be passed the function you want to wrap, and that in turn returns a task.
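A minimal plain-Python sketch of the decorator-factory pattern described here. `task_like` is hypothetical; it accepts either the function directly or only keyword options (the `task_id` option is illustrative). Whichever form is used, what gets wrapped has to be the function object (`transform_data`), not the value returned by calling it (`transform_data(config)`); `config` is supplied later, when the wrapped function is called.

```python
import functools


def task_like(python_callable=None, **kwargs):
    """Hypothetical factory: task_like(func), task_like()(func), task_like(task_id=...)(func)."""

    def wrapper(f):
        if not callable(f):
            raise TypeError(f"expected a function to wrap, got {f!r}")

        @functools.wraps(f)
        def factory(*args, **f_kwargs):
            task_id = kwargs.get("task_id", f.__name__)
            return {"task_id": task_id, "result": f(*args, **f_kwargs)}

        return factory

    if python_callable is None:
        return wrapper  # options only: return the decorator, still waiting for f
    return wrapper(python_callable)  # function given directly: wrap it now


def transform_data(config):
    return f"transformed with {config['name']}"


config = {"name": "test"}

# Three equivalent ways to wrap the function object itself.
t1 = task_like(transform_data)
t2 = task_like()(transform_data)
t3 = task_like(task_id="custom_transform")(transform_data)

print(t1(config))  # {'task_id': 'transform_data', 'result': 'transformed with test'}
print(t3(config))  # {'task_id': 'custom_transform', 'result': 'transformed with test'}

# Wrapping the *result* of a call fails: a string is not a function.
wrap_error = None
try:
    task_like()(transform_data(config))
except TypeError as exc:
    wrap_error = str(exc)
print(wrap_error)
```

The `callable` check in `wrapper` is a design choice of this sketch so that passing a call's result fails loudly at wrap time rather than later.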

Answer:

I don't know the root cause of this issue, but I changed the code to the following and it started working. Essentially, I moved the task decorator into the function definitions in the scripts folder.

@dag(
    schedule_interval=config['dag_config']['schedule'], 
    start_date=config['dag_config']['start_date'],
    catchup=config['dag_config']['catch_up'], 
    tags=['test'],
    )
def test_daily_load_workflow():
    (
        extract_data.extract_data(config) >>
        transform_data.transform_data(config) >>
        load_data_catalog.load_data_catalog(config)
    )

test_daily_load_workflow = test_daily_load_workflow()
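A plausible explanation for why this version works: once the decorator is applied where each function is defined, `extract_data.extract_data` is already the decorated function, so calling it with `config` inside the DAG body returns a reference to a task instead of running the function body, and `>>` chains those references. The sketch below imitates that behaviour with hypothetical stand-ins (`task_like`, `TaskRef`); they are not Airflow's classes, and the real bookkeeping is done by Airflow.

```python
import functools


class TaskRef:
    """Hypothetical stand-in for the object a decorated task call returns."""

    def __init__(self, name):
        self.name = name
        self.downstream = []

    def __rshift__(self, other):
        # a >> b records b as downstream of a and returns b, so chains work.
        self.downstream.append(other)
        return other


def task_like(f):
    """Hypothetical decorator: calling the decorated function does not run f."""

    @functools.wraps(f)
    def factory(*args, **kwargs):
        # Only a reference is created; the body of f is not executed here.
        return TaskRef(f.__name__)

    return factory


calls = []  # would record function bodies if any of them actually ran


@task_like
def extract_data(config):
    calls.append("extract")


@task_like
def transform_data(config):
    calls.append("transform")


@task_like
def load_data_catalog(config):
    calls.append("load")


config = {}
extract = extract_data(config)
transform = transform_data(config)
load = load_data_catalog(config)

# Evaluated as (extract >> transform) >> load.
last = extract >> transform >> load
```

For this to apply, each function in the scripts folder would need the decorator directly above its definition (for example `@task` above `def extract_data(config):`); the answer does not show those files, so this layout is an assumption.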