How to interpret pyspark .explain() and how does pyspark order operations


I was messing around with some PySpark code and noticed behavior I don't understand and wouldn't expect. I wrote some code that generates random data (binary numbers, 0 or 1) in a new column and then filters on that randomly generated data. However, when I execute .show() several times, I often see rows that don't fulfill my condition. It seems that my random-data function is getting called twice: once before the filter and once after the filter. I tried using .explain(), but I don't understand how to read or interpret its output. I'm also confused about the order in which I'm supposed to read it.

Can somebody help me understand, using .explain(), why this simple code behaves so differently from what I expected?

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, udf
import random
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()


_random_udf = udf(lambda x: int(random.randint(0, 1)), IntegerType())

inputDf = spark.createDataFrame([{'row': i} for i in range(10)])
random_result = inputDf.withColumn("status", _random_udf(col("row")))
non_zero_filter = random_result.filter(col('status') != 0)

non_zero_filter.show()

Example of output from .show(). I was not expecting to see rows with 0 in the status column:

+---+------+
|row|status|
+---+------+
|  2|     1|
|  3|     0|
|  5|     0|
|  6|     1|
|  7|     0|
+---+------+

The output of .explain() is the following:

non_zero_filter.explain()

== Physical Plan ==
*(3) Project [row#296L, pythonUDF0#314 AS status#299]
+- BatchEvalPython [<lambda>(row#296L)], [pythonUDF0#314]
   +- *(2) Project [row#296L]
      +- *(2) Filter NOT (pythonUDF0#313 = 0)
         +- BatchEvalPython [<lambda>(row#296L)], [pythonUDF0#313]
            +- *(1) Scan ExistingRDD[row#296L]

CodePudding user response:

The change in the result is because you are generating random integers with random.randint inside a UDF. A DataFrame is evaluated lazily, so the plan is re-executed for each action (such as .show()), and every run results in a new random_result DataFrame.

Let's look at the plan. You printed .explain(), which means you only asked for the physical plan, not the extended plan. Printing .explain(True) will give you the detailed plan: the parsed, analyzed, and optimized logical plans as well as the physical plan.
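For reference, a minimal sketch of the different explain options, reusing the non_zero_filter DataFrame from the question (the mode argument assumes Spark 3.0 or newer):

# explain(True) prints the parsed, analyzed, and optimized logical plans
# in addition to the physical plan.
non_zero_filter.explain(True)

# On Spark 3.0+ you can also pick a mode explicitly:
# non_zero_filter.explain(mode="formatted")   # physical plan plus a details section
# non_zero_filter.explain(mode="extended")    # same as explain(True)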

The physical plan specifies how Spark's logical plan will execute on the cluster. Beneath a DataFrame there is an RDD; simply put, PySpark code compiles down to RDD operations.
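To see that concretely, here is a small sketch using the DataFrame from the question; .rdd and toDebugString() are standard PySpark APIs:

# Every DataFrame exposes the underlying RDD of Row objects it compiles down to.
underlying_rdd = non_zero_filter.rdd
print(type(underlying_rdd))                  # <class 'pyspark.rdd.RDD'>

# toDebugString() shows the RDD lineage that backs the physical plan.
print(underlying_rdd.toDebugString().decode())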

So in this case, to produce the rows you want, Spark scans the existing RDD, applies the filter, and then projects the columns; you read the plan from the bottom up. The numbers in parentheses prefixed by a star mark the whole-stage-codegen stages the work is split into (three here), and you can view the details of each stage and job in the Spark UI. Note also that BatchEvalPython appears twice in the plan: the UDF is evaluated once for the filter and again for the final projection, which is why the displayed status values can differ from the values the filter saw.

For the filter behavior, refer to @Emma's comment below.
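That comment is not reproduced here, but as a hedged sketch of a common workaround for this kind of UDF re-evaluation (assuming Spark 2.3+ for asNondeterministic, and reusing inputDf from the question): mark the UDF as non-deterministic so the optimizer will not evaluate it separately for the filter and the projection, or cache the intermediate DataFrame so the generated values are materialized once.

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, udf
import random

# asNondeterministic() tells the optimizer the UDF's output can change between
# calls, so it must not duplicate its evaluation across the filter and the project.
_random_udf = udf(lambda x: random.randint(0, 1), IntegerType()).asNondeterministic()

# inputDf is the DataFrame created in the question.
random_result = inputDf.withColumn("status", _random_udf(col("row")))
non_zero_filter = random_result.filter(col("status") != 0)
non_zero_filter.show()   # status should now contain only 1s

# Alternative: materialize the random values once so repeated actions reuse them.
# random_result.cache()
# random_result.count()   # force the cache to be populated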
