I need to extract data from an external REST API inside an Airflow DAG. The API is protected, so I first need to authenticate (log in) to the API as a user and then extract data by passing an access_token in the API call.
I need some help implementing this. If anyone has done something similar, an example would really help. Thanks.
CodePudding user response:
- Create an HTTP connection (in the UI under Admin -> Connections).
- In your DAG, create two SimpleHttpOperator tasks. The first task logs in and returns the access token; the second sends the request for the data, with the access token from the login task in its header.
Here is an example of the login and data tasks, assuming the login uses basic auth and the access token is sent as a bearer token:
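from base64 import b64encode

from airflow.providers.http.operators.http import SimpleHttpOperator  # named HttpOperator in newer provider releases

# Basic auth header for the login call; replace user:password with real credentials.
user_and_pass = b64encode(b"user:password").decode("ascii")
headers = {"Authorization": "Basic %s" % user_and_pass}

# response_filter (not response_check) decides what is pushed to XCom,
# so the login task returns only the access token.
login = SimpleHttpOperator(
    task_id="login",
    http_conn_id="conn_id",
    endpoint="/login",
    method="POST",
    headers=headers,
    response_filter=lambda response: response.json()["access_token"],
)

# headers is a templated field, so the token is pulled from XCom at run time.
get_data = SimpleHttpOperator(
    task_id="get_data",
    http_conn_id="conn_id",
    endpoint="/data",
    method="POST",
    headers={
        "Authorization": "Bearer {{ ti.xcom_pull(task_ids='login', key='return_value') }}",
        "Accept": "application/json",
    },
    response_filter=lambda response: response.json(),
)

login >> get_data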
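The inline lambda on the login task fails with a bare KeyError if the API answers without a token. A minimal sketch of a stricter callable follows, assuming the login response is JSON with an access_token field as in the example above. The names extract_access_token and FakeResponse are hypothetical; FakeResponse only stands in for the response object so the function can be tried outside Airflow.

```python
def extract_access_token(response):
    """Return the access token from a login response, or raise if it is missing."""
    payload = response.json()
    token = payload.get("access_token")
    if not token:
        raise ValueError("login response did not contain an access_token")
    return token


class FakeResponse:
    """Hypothetical stand-in for the HTTP response, used only for local testing."""

    def __init__(self, payload):
        self._payload = payload

    def json(self):
        return self._payload


print(extract_access_token(FakeResponse({"access_token": "abc123"})))  # prints abc123
```

The function could then be passed to the login task as response_filter=extract_access_token, so a malformed login response fails the login task instead of sending an empty bearer token to the data endpoint.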
CodePudding user response:
It would be better to do the REST API call in plain Python code. That script can take an argument, such as a filename with a timestamp, and dump the data to that file. This way it can be tested independently, outside of Airflow, like any regular Python code.
Then call this script with a BashOperator. In the DAG, it will look something like this. Note that the filename is hard-coded here, but it can be replaced with a Jinja template that returns a filename with a timestamp.
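# Airflow 2 import paths
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

def load_data_fn(**kwargs):
    # op_kwargs entries arrive as keyword arguments
    print(kwargs.get("filename"))
    ...

task1 = BashOperator(
    task_id='get_data',
    bash_command="python ~/airflow/dags/src/rest_api_call.py data_20220701_010101.txt",
    dag=dag)
# provide_context=True is only needed on Airflow 1.10; Airflow 2 passes the context automatically.
task2 = PythonOperator(
    task_id='load_data',
    python_callable=load_data_fn,
    op_kwargs={'filename': 'data_20220701_010101.txt'},
    dag=dag)
...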
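The standalone script itself is not shown above, so here is a rough sketch of what rest_api_call.py might look like. Everything API-specific is an assumption: the base URL, the /login and /data endpoints, the basic-auth login returning an access_token, and the API_USER and API_PASSWORD environment variables are all placeholders. It uses only the standard library, and the HTTP call is passed in as a parameter so the login-then-fetch logic can be tested without a network.

```python
import json
import os
import urllib.request
from base64 import b64encode

BASE_URL = "https://api.example.com"  # hypothetical base URL


def http_post(url, headers):
    """POST with an empty body and return the decoded JSON response."""
    request = urllib.request.Request(url, data=b"", headers=headers, method="POST")
    with urllib.request.urlopen(request, timeout=30) as response:
        return json.loads(response.read().decode("utf-8"))


def fetch_data(user, password, post=http_post):
    """Log in with basic auth, then request the data with the bearer token."""
    basic = b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    login = post(f"{BASE_URL}/login", {"Authorization": f"Basic {basic}"})
    token = login["access_token"]  # assumed field name in the login response
    return post(
        f"{BASE_URL}/data",
        {"Authorization": f"Bearer {token}", "Accept": "application/json"},
    )


def main(argv):
    """Write the fetched data as JSON to the file named by the first argument."""
    filename = argv[1]
    data = fetch_data(os.environ["API_USER"], os.environ["API_PASSWORD"])
    with open(filename, "w", encoding="utf-8") as handle:
        json.dump(data, handle)
```

To run it from the BashOperator, the script would end with the usual `if __name__ == "__main__":` guard calling `main(sys.argv)` after `import sys`. Because fetch_data accepts the post function as an argument, a test can pass a fake that returns canned responses, which is the kind of testing outside Airflow the answer has in mind.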