Can I send binary files using the SimpleHttpOperator in Airflow?

Time:12-03

I would like to upload binary data to a REST API using Airflow's SimpleHttpOperator.
With Python's requests, I would do it like this:

upload_response = requests.put(
        upload_url, data=Path(f"{filename}").read_bytes(), headers=headers
    )

In Airflow I have a first task that downloads the file with an SFTPOperator, and then I try to upload it with the SimpleHttpOperator:

get_file = SFTPOperator(
        task_id         = 'get_file',
        ssh_conn_id     = 'my_sftp_conn',
        remote_filepath = f"{templ_remote_filepath}/{filename}",
        local_filepath  = f"{basepath}/{filename}",
        operation       = 'GET',
    )

upload_file = SimpleHttpOperator(
        task_id      = 'upload_file',
        http_conn_id = "my_http_conn",
        endpoint     = get_upload_url.output,
        method       = 'PUT',
        headers      = {"Content-MD5": md5_hash_b64, "Content-Type": "application/text"},
        data         = Path(get_file.output).read_bytes(),
        log_response = True,
    )

I would like to be able to pass get_file.output (the downloaded file path) as a string instead of the XComArg it currently is. How can I do this?

CodePudding user response:

You should not use operators in this case. Each operator runs as its own task instance, so the two tasks can potentially run on completely different machines. That's why operators use XCom to store data - there is no other way to communicate between tasks. And by default XCom is rather limited in size because it is backed by the database (which in your case would mean saving the file to the DB in one task and retrieving it in the other - which is almost certainly not what you want).

What you should do instead is use a single PythonOperator and HOOKS, not operators. Airflow's "API" has two levels - operators are there to provide "blocks" that you can use as complete tasks, but in fact they are usually thin wrappers around hooks, which provide the same capabilities and can be combined - several hooks can be used in the same operator.

And defining a custom PythonOperator is super easy - especially if you are using the modern TaskFlow API approach.

See the tutorial: https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html

So in your case you should have something like this (it likely won't run as-is, but it should give you an idea):

import tempfile
from pathlib import Path
from airflow.decorators import task
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.sftp.hooks.sftp import SFTPHook

@task()
def transfer():
    with tempfile.TemporaryDirectory() as tmp_dir:
        # download over SFTP into a temp file, then PUT its bytes over HTTP
        tmp_file = f"{tmp_dir}/FILENAME_HERE"
        sftp_hook = SFTPHook(ssh_conn_id='my_sftp_conn')
        sftp_hook.retrieve_file(remote_full_path='REMOTE_URL_HERE', local_full_path=tmp_file)
        data = Path(tmp_file).read_bytes()
        http_hook = HttpHook(method='PUT', http_conn_id='my_http_conn')
        http_hook.run(endpoint='UPLOAD_ENDPOINT_HERE', data=data)  # plus headers=... as needed
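
To actually run it, the task has to be called inside a DAG. A minimal sketch using the TaskFlow API (the dag id, schedule and start date below are placeholders, transfer is the task from the snippet above, and on older Airflow versions the parameter is schedule_interval rather than schedule):

from datetime import datetime
from airflow.decorators import dag

@dag(schedule=None, start_date=datetime(2022, 1, 1), catchup=False)
def sftp_to_http():
    # calling the @task-decorated function inside the @dag function creates the task
    transfer()

sftp_to_http()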

Loading the whole file into memory is, however, not very efficient, and you could potentially even use the requests library directly instead of HttpHook to upload the file (or maybe HttpHook already implements streaming).
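
If memory use matters, here is a rough sketch of that direct-requests alternative (the URL, headers and local path are placeholders, not taken from your DAG): passing an open file object as data makes requests stream the request body instead of reading the whole file first.

import requests

headers = {'Content-Type': 'application/octet-stream'}  # example header, adjust for your API
with open('LOCAL_FILE_HERE', 'rb') as fh:
    # a file object passed as data is streamed rather than loaded into memory
    upload_response = requests.put('UPLOAD_URL_HERE', data=fh, headers=headers)
upload_response.raise_for_status()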

Full documentation of both hooks:

HttpHook: https://airflow.apache.org/docs/apache-airflow-providers-http/stable/_api/airflow/providers/http/hooks/http/index.html

SFTPHook: https://airflow.apache.org/docs/apache-airflow-providers-sftp/stable/_api/airflow/providers/sftp/hooks/sftp/index.html
