Previously, the dataframe looked like this:
+----------+--------------------+
|     appId|                lang|
+----------+--------------------+
|1000098520|              ["EN"]|
|1001449696|              ["EN"]|
|1001780528|["AR","ZH","CS","...|
|1001892954|              ["EN"]|
|1001892954|              ["EN"]|
|1001976488|["EN","FR","DE","...|
|1002028916|              ["EN"]|
|1002908393|              ["EN"]|
|1003066972|["EN","FR","DE","...|
|1004217104|              ["EN"]|
|1004552566|              ["EN"]|
|1005192468|              ["EN"]|
|1005488142|["EN","JA","KO","...|
+----------+--------------------+
root
|-- appId: string (nullable = true)
|-- lang: string (nullable = true)
I tried to use json.loads() to convert it to an array of strings, but it seems the values don't quite conform to JSON. How can I convert this column to an array of strings?
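The conversion was done with a Python UDF roughly along these lines (a minimal sketch; the exact body of the function is not shown in the question, so this is an assumption based on the traceback further down):

import json
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, StringType

# Assumed shape of the json.loads-based UDF used for the conversion
def parse_array_from_string(s):
    return json.loads(s)

parse_array_udf = F.udf(parse_array_from_string, ArrayType(StringType()))

df = df.withColumn('lang', parse_array_udf(F.col('lang')))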
+----------+--------------------+--------+
|     appId|                lang|len_lang|
+----------+--------------------+--------+
|1000098520|                [EN]|       1|
|1001449696|                [EN]|       1|
|1001780528|[AR, ZH, CS, NL, ...|      25|
|1001892954|                [EN]|       1|
|1001892954|                [EN]|       1|
|1001976488|    [EN, FR, DE, ES]|       4|
|1002028916|                [EN]|       1|
|1002908393|                [EN]|       1|
+----------+--------------------+--------+
I now have the dataframe above. The lang column was previously of string type, but I converted it to an array type using a UDF with json.loads(). I then want to filter for appIds whose only language is 'EN', i.e. the array has size == 1 and contains only 'EN'.
I tried a where() clause with F.size(F.col('lang'))==1 & F.array_contains(F.col('lang','EN')), but I get this error:
21/11/19 10:07:32 ERROR PythonUDFRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/../server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 362, in main
eval_type = read_int(infile)
File "/../spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 724, in read_int
raise EOFError
EOFError
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:260)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
process()
File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 352, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 142, in dump_stream
for obj in iterator:
File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 341, in _batched
for item in iterator:
File "<string>", line 1, in <lambda>
File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
return lambda *a: f(*a)
File "/Users/backupcomputer/server/spark-2.4.8-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
return f(*args, **kwargs)
File "/var/folders/w1/16mcxl3d3zg1831vc7k73fnh0000gn/T/ipykernel_49586/324191451.py", line 5, in <lambda>
File "/var/folders/w1/16mcxl3d3zg1831vc7k73fnh0000gn/T/ipykernel_49586/324191451.py", line 2, in parse_array_from_string
File "/../.pyenv/versions/3.7.9/lib/python3.7/json/__init__.py", line 348, in loads
return _default_decoder.decode(s)
File "/../.pyenv/versions/3.7.9/lib/python3.7/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/../.pyenv/versions/3.7.9/lib/python3.7/json/decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
CodePudding user response:
Use the from_json function, as in the following example.
import pyspark.sql.functions as F
.....
# Parse the JSON string column into an array<string>, then add a column with its size
df = df.withColumn('lang', F.expr('from_json(lang,"array<string>")')).select('*', F.size('lang').alias('len_lang'))
df.printSchema()
df.show(truncate=False)
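Once lang is a real array, the filter for apps whose only language is 'EN' (the goal in the question) can be written as below. Note the parentheses around each condition, which are required because & binds tighter than ==, and that array_contains takes the column and the value as two separate arguments:

only_en = df.where(
    (F.size(F.col('lang')) == 1) & F.array_contains(F.col('lang'), 'EN')
)
only_en.show(truncate=False)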