I have set up a Spark cluster (version 3.1.2) and I am using the Python API for Spark. I have some JSON data that I have loaded into a DataFrame, and I need to parse a nested column (ADSZ_2) that has the following format:
ADSZ_2: [{key,value}, {key,value}, {key,value}]
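For illustration, a single JSON record could look roughly like this (this is only a sketch of the shape, not my real data; the actual key names inside the structs are string1, string2, etc.):
{"ADSZ_2": [{"string1": "some text", "string2": "other text"}, {"string1": "more text", "string2": "even more text"}]}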
I have developed the following code for this purpose:
...
...
def parseCell(array_data):
    final_list = []
    if array_data is not None:
        for record in array_data:
            record_dict = record.asDict()
            if "string1" in record_dict:
                string1 = remover(record_dict["string1"])
                record_dict["string1"] = string1
            if "string2" in record_dict:
                string2 = remover(record_dict["string2"])
                record_dict["string2"] = string2
            final_list.append(Row(**record_dict))
    return final_list
df = spark.read.load(data_path, multiline="false", format="json")
udf_fun = udf(lambda row: parseCell(row), ArrayType(StructType()))
df.withColumn("new-name", udf_fun(col("ADSZ_2"))).show()
...
When I run the above code, I get the following exception:
21/10/07 09:09:07 ERROR Executor: Exception in task 0.0 in stage 116.0 (TID 132)
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)
at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:773)
at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:213)
at net.razorvine.pickle.Unpickler.load(Unpickler.java:123)
at net.razorvine.pickle.Unpickler.loads(Unpickler.java:136)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec.$anonfun$evaluate$6(BatchEvalPythonExec.scala:94)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
21/10/07 09:09:07 WARN TaskSetManager: Lost task 0.0 in stage 116.0 (TID 132) (hadoop-master.local executor driver): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)
at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:773)
at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:213)
at net.razorvine.pickle.Unpickler.load(Unpickler.java:123)
at net.razorvine.pickle.Unpickler.loads(Unpickler.java:136)
at org.apache.spark.sql.execution.python.BatchEvalPythonExec.$anonfun$evaluate$6(BatchEvalPythonExec.scala:94)
I have tried various options as given at 1, but none of these solutions works. Where is the problem?
Is there a better way to do this job?
CodePudding user response:
I will propose an alternative solution that transforms your rows using the DataFrame's underlying RDD. Here is a self-contained example that I have tried to adapt to your data:
import pyspark.sql.functions as F
from pyspark.sql import Row
import pyspark.sql.types as T
df = spark.createDataFrame([Row(ADSZ_2=[{"string1": "a", "string2": "b"}, {"string1": "c", "string2": "d"}]),
                            Row(ADSZ_2=[{"string1": "e", "string2": "f"}, {"string1": "g", "not_taken": "1", "string2": "h"}]),
                            Row(ADSZ_2=[{"string1": "i", "string2": "j"}, {"string1": "k", "not_taken": "1", "string2": "l"}]),
                            Row(ADSZ_2=None),
                            Row(ADSZ_2=[None, {"string1": "m", "not_taken": "1", "string2": "n"}])])
df.show(20, False)
def parseCell(row):
    final_list = []
    l = row["ADSZ_2"]
    if l:
        for record_dict in l:
            if record_dict:
                new_dict = {key: val for key, val in record_dict.items() if key in ["string1", "string2"]}
                if new_dict:
                    final_list.append(Row(**new_dict))
    return final_list
df_rdd = df.rdd.flatMap(lambda row: parseCell(row))
new_df = spark.createDataFrame(df_rdd)
new_df.show()
output:
+----------------------------------------------------------------------------+
|ADSZ_2                                                                      |
+----------------------------------------------------------------------------+
|[{string1 -> a, string2 -> b}, {string1 -> c, string2 -> d}]                |
|[{string1 -> e, string2 -> f}, {not_taken -> 1, string1 -> g, string2 -> h}]|
|[{string1 -> i, string2 -> j}, {not_taken -> 1, string1 -> k, string2 -> l}]|
|null                                                                        |
|[null, {not_taken -> 1, string1 -> m, string2 -> n}]                        |
+----------------------------------------------------------------------------+
+-------+-------+
|string1|string2|
+-------+-------+
|      a|      b|
|      c|      d|
|      e|      f|
|      g|      h|
|      i|      j|
|      k|      l|
|      m|      n|
+-------+-------+
You need to make sure that all the rows you generate in parseCell contain the correct number of columns, since createDataFrame infers the schema from the rows it receives.
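One way to guarantee that is to always emit both fields (missing values become None) and, optionally, pass an explicit schema to createDataFrame so nothing depends on inference. A minimal sketch, assuming string1 and string2 are the only fields you care about and both are strings (parseCellFixed is just an illustrative name, reusing the imports and df from above):
# Sketch: every Row always has the same two fields; absent keys become None.
def parseCellFixed(row):
    final_list = []
    l = row["ADSZ_2"]
    if l:
        for record_dict in l:
            if record_dict:
                final_list.append(Row(string1=record_dict.get("string1"),
                                      string2=record_dict.get("string2")))
    return final_list

# Optional: explicit schema, so the result does not rely on schema inference.
schema = T.StructType([T.StructField("string1", T.StringType(), True),
                       T.StructField("string2", T.StringType(), True)])
new_df = spark.createDataFrame(df.rdd.flatMap(parseCellFixed), schema)
new_df.show()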