My goal is to read file from hdfs in airflow and do further manipulations.
After researching, I found that url I need to use is as follows:
df = pd.read_parquet('http://localhost:9870/webhdfs/v1/hadoop_files/sample_2022_01.parquet?op=OPEN')
,
where localhost/172.20.80.1/computer-name.mshome.net can be interchangeably used,
9870 - namenode port,
hadoop_files/sample_2022_01.parquet - my folder and file created in the root.
I can access and read file locally in PyCharm, but I am unable to get the same result inside airflow in docker. I tried using local hdfs and hdfs hosted in docker and changing host to the host.docker.internal, but I am getting the same error.
Stack trace:
[2022-06-12, 17:52:45 UTC] {taskinstance.py:1889} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/lib/python3.7/urllib/request.py", line 1350, in do_open
encode_chunked=req.has_header('Transfer-encoding'))
File "/usr/local/lib/python3.7/http/client.py", line 1281, in request
self._send_request(method, url, body, headers, encode_chunked)
File "/usr/local/lib/python3.7/http/client.py", line 1327, in _send_request
self.endheaders(body, encode_chunked=encode_chunked)
File "/usr/local/lib/python3.7/http/client.py", line 1276, in endheaders
self._send_output(message_body, encode_chunked=encode_chunked)
File "/usr/local/lib/python3.7/http/client.py", line 1036, in _send_output
self.send(msg)
File "/usr/local/lib/python3.7/http/client.py", line 976, in send
self.connect()
File "/usr/local/lib/python3.7/http/client.py", line 948, in connect
(self.host,self.port), self.timeout, self.source_address)
File "/usr/local/lib/python3.7/socket.py", line 728, in create_connection
raise err
File "/usr/local/lib/python3.7/socket.py", line 716, in create_connection
sock.connect(sa)
OSError: [Errno 113] No route to host
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 207, in execute
branch = super().execute(context)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 171, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 189, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/opt/airflow/dags/includes/parquet_dag/main.py", line 15, in main
df_parquet = read('hdfs://localhost:9000/hadoop_files/sample_2022_01.parquet')
File "/opt/airflow/dags/includes/parquet_dag/utils.py", line 29, in read
df = pd.read_parquet('http://172.20.80.1:9870/webhdfs/v1/hadoop_files/sample_2022_01.parquet?op=OPEN')
File "/home/airflow/.local/lib/python3.7/site-packages/pandas/io/parquet.py", line 500, in read_parquet
**kwargs,
File "/home/airflow/.local/lib/python3.7/site-packages/pandas/io/parquet.py", line 236, in read
mode="rb",
File "/home/airflow/.local/lib/python3.7/site-packages/pandas/io/parquet.py", line 102, in _get_path_or_handle
path_or_handle, mode, is_text=False, storage_options=storage_options
File "/home/airflow/.local/lib/python3.7/site-packages/pandas/io/common.py", line 614, in get_handle
storage_options=storage_options,
File "/home/airflow/.local/lib/python3.7/site-packages/pandas/io/common.py", line 312, in _get_filepath_or_buffer
with urlopen(req_info) as req:
File "/home/airflow/.local/lib/python3.7/site-packages/pandas/io/common.py", line 212, in urlopen
return urllib.request.urlopen(*args, **kwargs)
File "/usr/local/lib/python3.7/urllib/request.py", line 222, in urlopen
return opener.open(url, data, timeout)
File "/usr/local/lib/python3.7/urllib/request.py", line 525, in open
response = self._open(req, data)
File "/usr/local/lib/python3.7/urllib/request.py", line 543, in _open
'_open', req)
File "/usr/local/lib/python3.7/urllib/request.py", line 503, in _call_chain
result = func(*args)
File "/usr/local/lib/python3.7/urllib/request.py", line 1378, in http_open
return self.do_open(http.client.HTTPConnection, req)
File "/usr/local/lib/python3.7/urllib/request.py", line 1352, in do_open
raise URLError(err)
urllib.error.URLError: <urlopen error [Errno 113] No route to host>
With host.docker.internal:
urllib.error.URLError: <urlopen error [Errno 99] Cannot assign requested address>
CodePudding user response:
you need to use any routable address inside airflow docker container.
if hadoop is inside docker container as well, check it ip address using docker inspect CONTAINER
(doc). if hadoop is on localhost you can set network_mode: "host"
(doc)
also there is an important notice if you are on macos and have the docker desktop app which basically a virtual machine. so in this case you need some extra settings, check this, for example.
CodePudding user response:
where localhost/172.20.80.1/computer-name.mshome.net can be interchangeably used,
They shouldn't be interchangeable inside Docker network.
From Airflow, you could use Docker service names, not IP addresses, and ensure the containers are in the same bridge network (not host mode, which only works on Linux). host.docker.internal
isn't correct either since you're trying to reach another container, not your host
https://docs.docker.com/network/bridge/
I'd also recommend using Airflow Spark operators to actually read Parquet from HDFS, using Spark, not Pandas or WebHDFS. You can convert Spark dataframes to Pandas, if needed