Read ORC file not stored on HDFS using pySpark

Time:08-20

I connected to a data lake remotely, processed data stored on its Hadoop cluster using the Hive beeline terminal, and stored the result on HDFS in ORC format. I then transferred the ORC files to my local system, which doesn't have any Hadoop/HDFS setup. I installed Spark on my local machine and read one part file of the ORC table using PySpark.

My ORC table has the following file structure on my local machine:

orc_table--
    Partition-01--
           1.000000_0
    Partition-02--
           1.000000_0
           2.000000_0
    Partition-03--
           1.000000_0

I can read a single part (like 1.000000_0) into a Spark dataframe by providing the full file path, including its extension (000000_0), using the following code:

import findspark
findspark.init()  # locate the local Spark installation before importing pyspark

from pyspark.sql import SparkSession

# Start a local session; no Hadoop cluster is involved
spark = SparkSession.builder.master("local").appName("first_app").getOrCreate()

# ORC files carry their own schema, so the inferSchema option is not needed
df_spark = spark.read.orc("C:/orc_table/Partition-01/1.000000_0")

However, I want to read all the partition files stored in the different folders of orc_table at once into a single Spark dataframe. When I give the path of a folder instead of a single file (for example "C:/orc_table" or, as in the traceback below, "C:\orc_table\Partition-01"), I get the following error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Input In [8], in <cell line: 1>()
----> 1 df_spark = spark.read.format("orc").option("inferSchema", "true").orc("C:\orc_table\Partition-01")

File C:\Spark\spark-3.1.2-bin-hadoop3.2\python\pyspark\sql\readwriter.py:803, in DataFrameReader.orc(self, path, mergeSchema, pathGlobFilter, recursiveFileLookup, modifiedBefore, modifiedAfter)
    801 if isinstance(path, str):
    802     path = [path]
--> 803 return self._df(self._jreader.orc(_to_seq(self._spark._sc, path)))

File C:\Spark\spark-3.1.2-bin-hadoop3.2\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py:1304, in JavaMember.__call__(self, *args)
   1298 command = proto.CALL_COMMAND_NAME  \
   1299     self.command_header  \
   1300     args_command  \
   1301     proto.END_COMMAND_PART
   1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
   1305     answer, self.gateway_client, self.target_id, self.name)
   1307 for temp_arg in temp_args:
   1308     temp_arg._detach()

File C:\Spark\spark-3.1.2-bin-hadoop3.2\python\pyspark\sql\utils.py:111, in capture_sql_exception.<locals>.deco(*a, **kw)
    109 def deco(*a, **kw):
    110     try:
--> 111         return f(*a, **kw)
    112     except py4j.protocol.Py4JJavaError as e:
    113         converted = convert_exception(e.java_exception)

File C:\Spark\spark-3.1.2-bin-hadoop3.2\python\lib\py4j-0.10.9-src.zip\py4j\protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o30.orc.
: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:645)
    at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1230)
    at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1435)
    at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:493)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1868)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1910)
    at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:678)
    at org.apache.spark.util.HadoopFSUtils$.listLeafFiles(HadoopFSUtils.scala:225)
    at org.apache.spark.util.HadoopFSUtils$.$anonfun$parallelListLeafFilesInternal$1(HadoopFSUtils.scala:95)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.util.HadoopFSUtils$.parallelListLeafFilesInternal(HadoopFSUtils.scala:85)
    at org.apache.spark.util.HadoopFSUtils$.parallelListLeafFiles(HadoopFSUtils.scala:69)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.bulkListLeafFiles(InMemoryFileIndex.scala:158)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:131)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:94)
    at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:66)
    at org.apache.spark.sql.execution.datasources.DataSource.createInMemoryFileIndex(DataSource.scala:581)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:417)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:307)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307)
    at org.apache.spark.sql.DataFrameReader.orc(DataFrameReader.scala:872)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

Spark version: 3.1.2; Scala version: 2.12.10; Java version: "1.8.0_131"

Any help is appreciated.

CodePudding user response:

I finally resolved this issue. I am running Spark without installing Hadoop. In that case, "winutils.exe" has to be placed in the C:\hadoop\bin folder. In my case I also had to put hadoop.dll alongside winutils.exe, and that resolved the problem. You can download both files from here, choosing the folder that matches the Hadoop version of your Spark build (for example, Hadoop 3.2 for spark-3.1.2-bin-hadoop3.2): https://github.com/cdarlint/winutils
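As a rough sketch of how the fix can be checked from Python before Spark starts: the helper names below (missing_hadoop_binaries, configure_hadoop_env, read_orc_table) are hypothetical and not part of PySpark. The sketch assumes the two files live in C:\hadoop\bin as described above, and that setting HADOOP_HOME and prepending that folder to PATH before the JVM launches is enough for Hadoop to find hadoop.dll. The recursiveFileLookup option (available since Spark 3.0) is one way to pick up files in every sub-folder; it is an assumption that this matches the intended read of the whole orc_table folder.

```python
import os
from pathlib import Path

# Files the answer above says must be present in <HADOOP_HOME>\bin
REQUIRED_BINARIES = ("winutils.exe", "hadoop.dll")


def missing_hadoop_binaries(hadoop_home):
    """Return the required file names that are absent from <hadoop_home>/bin."""
    bin_dir = Path(hadoop_home) / "bin"
    return [name for name in REQUIRED_BINARIES if not (bin_dir / name).is_file()]


def configure_hadoop_env(hadoop_home, env=os.environ):
    """Set HADOOP_HOME and prepend its bin folder to PATH.

    Must run before the first SparkSession is created, because the JVM
    reads these variables when it starts.
    """
    bin_dir = str(Path(hadoop_home) / "bin")
    env["HADOOP_HOME"] = str(hadoop_home)
    env["PATH"] = bin_dir + os.pathsep + env.get("PATH", "")
    return env


def read_orc_table(table_dir):
    """Read every ORC part file under table_dir into one dataframe."""
    # Imported here so the checks above work even without pyspark installed
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local").appName("first_app").getOrCreate()
    # recursiveFileLookup walks all sub-folders and skips partition inference
    return spark.read.option("recursiveFileLookup", "true").orc(table_dir)
```

A typical call order would be: missing_hadoop_binaries("C:/hadoop") to confirm that the list is empty, then configure_hadoop_env("C:/hadoop"), and only then read_orc_table("C:/orc_table").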
