Authenticating to a REST endpoint inside Airflow DAG


I need to extract data from an external REST API inside an Airflow DAG. The API is protected, so I first need to authenticate/login to the API as a user and then extract the data by passing an access_token in the API call. I need some help implementing this functionality; if anyone has done something similar, an example would really help. Thanks.

CodePudding user response:

  1. Create an HTTP connection (in the UI under Admin -> Connections); a sketch of an equivalent environment-variable setup follows this list.
  2. In your DAG, create two SimpleHttpOperator tasks. The first task logs in and reads the access token; the second sends the request for the data, passing the access token from the login task in its headers.
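
As an aside for step 1: instead of the UI, Airflow can also pick up connections from environment variables named AIRFLOW_CONN_<CONN_ID>, whose value is a connection URI. A minimal sketch matching the conn_id used below (api.example.com is a placeholder; in production you would export the variable in the scheduler/worker environment rather than set it from Python):

import os

# Equivalent to creating an HTTP connection named "conn_id" in the UI.
# The URI scheme becomes the connection type and the rest carries the host;
# credentials could be embedded as http://user:password@api.example.com.
os.environ["AIRFLOW_CONN_CONN_ID"] = "http://api.example.com"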

Here is an example of the login and data tasks, assuming the login endpoint uses basic auth and the data endpoint expects the access token as a bearer token:

from base64 import b64encode

from airflow.providers.http.operators.http import SimpleHttpOperator

user_and_pass = b64encode(b"user:password").decode("ascii")
headers = {'Authorization': 'Basic %s' % user_and_pass}

login = SimpleHttpOperator(
    task_id="login",
    http_conn_id="conn_id",
    endpoint="/login",
    method="POST",
    headers=headers,
    # response_filter extracts the token from the JSON body; its return
    # value is pushed to XCom under the key "return_value".
    response_filter=lambda response: response.json()["access_token"])

get_data = SimpleHttpOperator(
    task_id="get_data",
    http_conn_id="conn_id",
    endpoint="/data",
    method="POST",
    # headers is a templated field, so the Jinja expression is rendered
    # at run time with the token that the login task pushed to XCom.
    headers={"Authorization": "Bearer {{ ti.xcom_pull(task_ids='login', key='return_value') }}",
             "Accept": "application/json"},
    # sanity-check that the response body parses as JSON
    response_check=lambda response: response.json() is not None)

login >> get_data
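
The response_filter above assumes the login endpoint returns a JSON body with an access_token field; if your API names the field differently, adjust the key accordingly.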

CodePudding user response:

It is better to make the REST API call in plain Python code. The script can take arguments such as a filename with a timestamp and dump the data to that file, so it can be tested independently outside of Airflow as regular Python code.

Then call this Python file with a BashOperator. In the DAG it will look something like this. Note that the filename is hard-coded, but it can be replaced with a Jinja template that produces a timestamped filename; a sketch of the script itself follows the DAG snippet.

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# The filename could instead be templated, e.g. "data_{{ ts_nodash }}.txt".
task1 = BashOperator(
    task_id='get_data',
    bash_command="python ~/airflow/dags/src/rest_api_call.py data_20220701_010101.txt",
    dag=dag)

task2 = PythonOperator(
    task_id='load_data',
    # In Airflow 2 the task context is passed to the callable automatically.
    python_callable=load_data_fn,
    op_kwargs={'filename': 'data_20220701_010101.txt'},
    dag=dag)
...

def load_data_fn(**kwargs):
    # The filename arrives via op_kwargs from the task definition above.
    print(kwargs.get("filename"))
    ...
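
For completeness, here is a minimal sketch of what rest_api_call.py might look like, reusing the login-then-fetch flow from the first answer. The base URL, endpoints, credentials, and JSON field names are placeholders that need to be adapted to the real API:

import sys

import requests

BASE_URL = "https://api.example.com"  # placeholder for the real API host


def main(filename):
    # Log in with basic auth and read the access token from the JSON body.
    login = requests.post(BASE_URL + "/login", auth=("user", "password"))
    login.raise_for_status()
    token = login.json()["access_token"]

    # Fetch the data, passing the token as a bearer token.
    data = requests.post(
        BASE_URL + "/data",
        headers={"Authorization": "Bearer %s" % token,
                 "Accept": "application/json"})
    data.raise_for_status()

    # Dump the raw payload to the file named on the command line.
    with open(filename, "w") as f:
        f.write(data.text)


if __name__ == "__main__":
    main(sys.argv[1])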