Error in filtering for rows with array size = 1 pyspark


Previously, the dataframe looked like this:

+----------+--------------------+
|     appId|                lang|
+----------+--------------------+
|1000098520|              ["EN"]|
|1001449696|              ["EN"]|
|1001780528|["AR","ZH","CS","...|
|1001892954|              ["EN"]|
|1001892954|              ["EN"]|
|1001976488|["EN","FR","DE","...|
|1002028916|              ["EN"]|
|1002908393|              ["EN"]|
|1003066972|["EN","FR","DE","...|
|1004217104|              ["EN"]|
|1004552566|              ["EN"]|
|1005192468|              ["EN"]|
|1005488142|["EN","JA","KO","...|
+----------+--------------------+
root
 |-- appId: string (nullable = true)
 |-- lang: string (nullable = true)

I tried to use json.loads() to convert it to an array of strings, but it seems the values somehow don't conform to JSON. How can I convert the column to an array of strings?

+----------+--------------------+--------+
|     appId|                lang|len_lang|
+----------+--------------------+--------+
|1000098520|                [EN]|       1|
|1001449696|                [EN]|       1|
|1001780528|[AR, ZH, CS, NL, ...|      25|
|1001892954|                [EN]|       1|
|1001892954|                [EN]|       1|
|1001976488|    [EN, FR, DE, ES]|       4|
|1002028916|                [EN]|       1|
|1002908393|                [EN]|       1|
+----------+--------------------+--------+

I have this dataframe. The lang column was originally of string type, but I converted it to array type using a UDF that calls json.loads(). I now want to filter for appIds whose only language is 'EN', i.e. the array has size 1 and contains only 'EN'.
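
The UDF itself isn't shown in the question, but based on the traceback (which references parse_array_from_string) it was presumably something along these lines; this is a reconstruction, not the original code, and json.loads() raises the JSONDecodeError seen below whenever a value is empty or not valid JSON:

import json
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

# Hypothetical reconstruction of the UDF described above: parse each
# JSON-encoded string (e.g. '["EN"]') into a list of strings. Any empty or
# malformed value makes json.loads() raise JSONDecodeError, which crashes
# the Python worker as shown in the stack trace below.
def parse_array_from_string(x):
    return json.loads(x)

parse_udf = F.udf(parse_array_from_string, ArrayType(StringType()))
df = df.withColumn('lang', parse_udf(F.col('lang')))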

I tried a where() clause with F.size(F.col('lang')) == 1 & F.array_contains(F.col('lang'), 'EN')... but I get this error:

21/11/19 10:07:32 ERROR PythonUDFRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/../server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 362, in main
    eval_type = read_int(infile)
  File "/../spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 724, in read_int
    raise EOFError
EOFError

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:260)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 352, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 142, in dump_stream
    for obj in iterator:
  File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 341, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
    return lambda *a: f(*a)
  File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/var/folders/w1/16mcxl3d3zg1831vc7k73fnh0000gn/T/ipykernel_49586/324191451.py", line 5, in <lambda>
  File "/var/folders/w1/16mcxl3d3zg1831vc7k73fnh0000gn/T/ipykernel_49586/324191451.py", line 2, in parse_array_from_string
  File "/../.pyenv/versions/3.7.9/lib/python3.7/json/__init__.py", line 348, in loads
    return _default_decoder.decode(s)
  File "/../.pyenv/versions/3.7.9/lib/python3.7/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/../.pyenv/versions/3.7.9/lib/python3.7/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

CodePudding user response:

Use the from_json function, as in the following example:

import pyspark.sql.functions as F

.....
# Parse the JSON string column into an array<string>, then append its size
df = (df
      .withColumn('lang', F.expr('from_json(lang, "array<string>")'))
      .select('*', F.size('lang').alias('len_lang')))
df.printSchema()
df.show(truncate=False)
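
To answer the filtering part of the question: once lang is a real array column, the size/contains condition works with native functions and no UDF is needed. A minimal sketch building on the df above (not part of the original answer); note that each condition needs its own parentheses because of the precedence of &:

# Keep only the apps whose language array is exactly ['EN']
df_en = df.where((F.size('lang') == 1) & F.array_contains('lang', 'EN'))
df_en.show(truncate=False)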