So, I have data of 100k lines saved as csv file. Here is sample of it:
ID,Name,Surname,Birthdate,Details
0, Agjqyru, Qtltzu, 1923-02-23, "{City=Neftchala, Gender=male, Education=collage}"
1, Zkaczi, Gvuvwwle, 2002-02-28, "{City=Mingachevir, Gender=female, Education=doctor}"
2, Hkbfros, Llmufk, 1948-02-29, "{City=Ujar, Gender=male, Education=collage}"
3, Dddtulkeo, Fdnccbp, 1903-07-01, "{City=Dashkasan, Gender=female, Education=bachelor}"
4, Wssqm, Kzekihhqjmrd, 1935-05-10, "{City=Baku, Gender=female, Education=uneducated}"
5, Iurg, Nglzxwu, 1915-04-02, "{City=Khojavend, Gender=male, Education=school}"
My task is to divide details column to 3 separate columns and save data as parquet file. Here is my attempt so far:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext
sparkContext = SparkContext.getOrCreate()
sqlContext = SQLContext(sparkContext).getOrCreate(sparkContext)
sparkSession = SparkSession.builder.master("local").appName("parquet_example").getOrCreate()
# read csv file
__DATA = spark.read.csv('data.csv', header = True)
# divide column details into 3
__DATA = __DATA.withColumn("Details", split(regexp_replace(regexp_replace((regexp_replace("Details",'\{|}',"")),'\:',','),'\"|"',""),','))\
.withColumn("City", element_at("Details",1))\
.withColumn("Gender", element_at("Details",2))\
.withColumn("Education",element_at("Details",3)).drop("Details").rdd
# clean data
__DATA = __DATA.map(lambda x : (x[0], x[1], x[2], x[3][x[3].index("=") 1:], x[4][x[4].index("=") 1:], x[5][x[5].index("=") 1:]))
dfSchema = StructType([
StructField('Name', StringType(), True),
StructField('Surname', StringType(), True),
StructField('Birthdate', StringType(), True),
StructField('City', StringType(), True),
StructField('Gender', StringType(), True),
StructField('Education', StringType(), True)
])
__DATA = __DATA.toDF(dfSchema)
However, I get Py4JJavaError
error whenever I try the following code to save it to parquet file:
__DATA.write.mode("overwrite").parquet("people.parquet") # write it to parquet
So, what am I doing wrong?
Here is full error stack:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-55-67be3fc6fc33> in <module>
----> 1 __DATA.write.mode("overwrite").parquet("people.parquet") # write it to parquet
C:\<user>\spark-3.0.0-bin-hadoop2.7\python\pyspark\sql\readwriter.py in parquet(self, path, mode, partitionBy, compression)
934 self.partitionBy(partitionBy)
935 self._set_opts(compression=compression)
--> 936 self._jwrite.parquet(path)
937
938 @since(1.6)
C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306
C:\<user>\spark-3.0.0-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
129 def deco(*a, **kw):
130 try:
--> 131 return f(*a, **kw)
132 except py4j.protocol.Py4JJavaError as e:
133 converted = convert_exception(e.java_exception)
C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o1290.parquet.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:226)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:944)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:944)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:396)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:380)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:269)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:829)
at sun.reflect.GeneratedMethodAccessor131.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 35.0 failed 1 times, most recent failure: Lost task 1.0 in stage 35.0 (TID 60, bkhddev01, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 605, in main
File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 597, in process
File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 271, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\util.py", line 107, in wrapper
return f(*args, **kwargs)
File "<ipython-input-53-6c5f08998e2a>", line 12, in <lambda>
ValueError: substring not found
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:260)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:195)
... 32 more
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 605, in main
File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 597, in process
File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 271, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "C:\<user>\spark-3.0.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\util.py", line 107, in wrapper
return f(*args, **kwargs)
File "<ipython-input-53-6c5f08998e2a>", line 12, in <lambda>
ValueError: substring not found
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:260)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
CodePudding user response:
The error is because of below code.
x[3][x[3].index("=") 1:]
At index 3 you have Birthdate
field. You should start slicing operation after index 3 from where you will clean up fields under Details
column.
Instead of converting DF to RDD, you can do cleanup operation at DF level only.
__DATA = __DATA.withColumn("Details", split(regexp_replace("Details",'\{|}',""),','))\
.withColumn("City", split(col("Details")[0], '=')[1]) \
.withColumn("Gender", split(col("Details")[1], '=')[1])\
.withColumn("Education",split(col("Details")[2], '=')[1]).drop("Details")