I created an AWS EMR Spark cluster with Release label:emr-6.2.0
Hadoop distribution:Amazon
Applications:Spark 3.0.1, Zeppelin 0.9.0
and copied all my local files(.jars, .py, .csv and sas7bdat) to the cluster master
When I do
[hadoop@ip-172-31-22-207 ~]$ ls -al /home/hadoop/sas_data1/
total 1071812
rwxrwxr-x 2 hadoop hadoop 66 Sep 13 04:08 .
drwxr-xr-x 7 hadoop hadoop 4096 Sep 13 04:38 ..
-rw-r--r-- 1 hadoop hadoop 471990272 Sep 13 04:07 file1.sas7bdat
-rw-r--r-- 1 hadoop hadoop 625541120 Sep 13 04:08 file2.sas7bdat
the output shows that the file is present. Also, in my program which is in /home/hadoop,
def process_raw_data(inputs, output):
spark = SparkSession.builder.\
config("spark.jars.packages", "saurfang:spark-sas7bdat:3.0.0-s_2.12").\
enableHiveSupport().getOrCreate()
sas_dir = f'{os.getcwd()}/sas_data1'
for filename in os.listdir(f"{sas_dir}"):
extension = os.path.splitext(filename)[1]
print("!!!!!!!!!!",f'{sas_dir}/{filename}')
df_spark = spark.read.format('com.github.saurfang.sas.spark').\
load(f'{sas_dir}/{filename}')
raw_df = df_spark.select('field1','field2')
raw_df.write.mode('append').parquet(output '/raw_data_output')
I am iterating through the files in the sas_data1 directory and in the output it shows the file name correctly as !!!!!!!!!! /home/hadoop/sas_data1/file1.sas7bdat
which is possible only if the file exists. But I am getting error that file does not exist . I ran the following command;
spark-submit --jars parso-2.0.11.jar,spark-sas7bdat-3.0.0-s_2.12.jar,hadoop-aws-2.7.4.jar,aws-java-sdk-1.7.4.jar --master yarn process_raw_files.py
File "/home/hadoop/process_raw_files.py", line 112, in <module>
output_bucket % 'raw_immigration_output')
File "/home/hadoop/process_raw_files.py", line 21, in process_raw_immigration
load(f'{sas_dir}/{filename}')
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 178, in load
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 128, in deco
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o67.load.
: java.io.FileNotFoundException: File does not exist: /home/hadoop/sas_data1/i94_apr16_sub.sas7bdat
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:86)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:76)
at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:158)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1962)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:755)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:439)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:528)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1070)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:999)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:927)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2915)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:866)
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:853)
at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:842)
at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1010)
at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:319)
at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:315)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:327)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:906)
at com.github.saurfang.sas.spark.SasRelation.inferSchema(SasRelation.scala:181)
at com.github.saurfang.sas.spark.SasRelation.<init>(SasRelation.scala:73)
at com.github.saurfang.sas.spark.SasRelation$.apply(SasRelation.scala:45)
at com.github.saurfang.sas.spark.DefaultSource.createRelation(DefaultSource.scala:209)
at com.github.saurfang.sas.spark.DefaultSource.createRelation(DefaultSource.scala:42)
at com.github.saurfang.sas.spark.DefaultSource.createRelation(DefaultSource.scala:27)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:344)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:232)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /home/hadoop/sas_data1/i94_apr16_sub.sas7bdat
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:86)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:76)
at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:158)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1962)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:755)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:439)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:528)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1070)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:999)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:927)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2915)
at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1545)
at org.apache.hadoop.ipc.Client.call(Client.java:1491)
at org.apache.hadoop.ipc.Client.call(Client.java:1388)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:118)
at com.sun.proxy.$Proxy17.getBlockLocations(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:324)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
at com.sun.proxy.$Proxy18.getBlockLocations(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:864)
... 31 more
I stored the files in the normal storage of the EMR Cluster master. Why would the debugging show the file name but the error mentions file is not present? Is it something to do with the worker nodes where I haven't copied the files to?
CodePudding user response:
Spark will look for the files on HDFS. Copy the file to HDFS and rerun the job.