I was messing around with some PySpark code and noticed behavior that I don't understand and wouldn't expect. My code generates random binary values (0 or 1) in a new column and then filters on that column. However, when I execute .show() several times, I often see rows that don't satisfy my condition. It seems that my random-data function is being called twice: once before the filter and once after it. I tried using .explain(), but I don't understand how to read or interpret the output, and I'm also unsure in which order I'm supposed to read it.
Can somebody help me understand, using .explain(), why this simple code behaves so differently from what I expected?
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, udf
import random
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
_random_udf = udf(lambda x: int(random.randint(0, 1)), IntegerType())
inputDf = spark.createDataFrame([{'row': i} for i in range(10)])
random_result = inputDf.withColumn("status", _random_udf(col("row")))
non_zero_filter = random_result.filter(col('status') != 0)
non_zero_filter.show()
Example output of .show(); I was not expecting to see rows with 0s in the status column:
| row | status |
| --- | ------ |
| 2   | 1      |
| 3   | 0      |
| 5   | 0      |
| 6   | 1      |
| 7   | 0      |
The output of .explain() is the following
non_zero_filter.explain()
== Physical Plan ==
*(3) Project [row#296L, pythonUDF0#314 AS status#299]
+- BatchEvalPython [<lambda>(row#296L)], [pythonUDF0#314]
   +- *(2) Project [row#296L]
      +- *(2) Filter NOT (pythonUDF0#313 = 0)
         +- BatchEvalPython [<lambda>(row#296L)], [pythonUDF0#313]
            +- *(1) Scan ExistingRDD[row#296L]
CodePudding user response:
The change in the result happens because you are generating random integers with random.randint from Python's random module. The UDF is re-evaluated each time, so every run produces a new random_result DataFrame.
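If you want successive actions to see the same generated values, one common workaround (a sketch, not part of the original answer) is to cache and materialize the DataFrame right after adding the random column, so later operations reuse the stored values instead of re-running the UDF:
cached_result = random_result.cache()             # keep the computed column in memory
cached_result.count()                             # action that forces the UDF to run once
cached_result.filter(col('status') != 0).show()   # the filter now sees the cached values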
Let's look at the plan. You printed .explain(), which means you did not ask for the extended plan; you only asked for the physical plan. Printing .explain(True) will give you the detailed plan.
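For reference, these calls request the more detailed output (both are standard DataFrame.explain options):
non_zero_filter.explain(True)                # parsed, analyzed, and optimized logical plans plus the physical plan
non_zero_filter.explain(mode="formatted")    # Spark 3.0+ only: a numbered, easier-to-read layout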
The physical plan specifies how Spark's logical plan will execute on the cluster. Beneath a DataFrame there is an RDD; simply put, PySpark code compiles down to operations on RDDs.
So in this case, to keep only the rows you want, Spark scans the existing RDD, evaluates the UDF, applies the filter, and then projects the final columns; the plan is read from the bottom up. The numbers in parentheses prefixed by a star, such as *(1), *(2) and *(3), mark the whole-stage code generation stages, so there are three of them here. You can view the details of each stage and job in the Spark UI.
For the filter behavior, refer to @Emma's comment below.
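A hedged sketch of the workaround usually suggested for this behavior (it is only referenced, not spelled out, in this answer): mark the UDF as non-deterministic so the optimizer does not duplicate it when pushing the filter below the projection.
# Sketch, assuming Spark 2.3+: a non-deterministic UDF should not be evaluated once
# for the filter condition and again for the projected column.
_random_udf = udf(lambda x: random.randint(0, 1), IntegerType()).asNondeterministic()
random_result = inputDf.withColumn("status", _random_udf(col("row")))
random_result.filter(col('status') != 0).show()   # no rows with status == 0 expected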