Recently I was asked in an interview about the algorithm of spark df.show() function. how will spark decide from which executor/executors the records will be fetched?
CodePudding user response:
A shitty question as not what u would use in prod.
It is a smart action that looks at what you have in terms of transformations.
Show() is in fact show(20). If just show it looks at 1st and consecutive partitions to get 20 rows. Order by also optimized. A count does need complete processing.
Many google posts btw.
CodePudding user response:
It's simple.
In Spark 2 , show()
calls showString()
to format the data as a string and then prints it. showString()
calls getRows()
to get the top rows of the dataset as a collection of strings. getRows()
calls take()
to take the original rows and transforms them into strings. take()
simply wraps head()
. head()
calls limit()
to build a limit query and executes it. limit()
adds a Limit(n)
node at the front of the logical plan which is really a GlobalLimit(n, LocalLimit(n))
. Both GlobalLimit
and LocalLimit
are subclasses of OrderPreservingUnaryNode
that override its maxRows
(in GlobalLimit
) or maxRowsPerPartition
(in LocalLimit
) methods. The logical plan now looks like:
GlobalLimit n
- LocalLimit n
- ...
This goes through analysis and optimisation by Catalyst where limits are removed if something down the tree produces less rows than the limit and ends up as CollectLimitExec(m)
(where m
<= n
) in the execution strategy, so the physical plan looks like:
CollectLimit m
- ...
CollectLimitExec
executes its child plan, then checks how many partitions the RDD has. If none, it returns an empty dataset. If one, it runs mapPartitionsInternal(_.take(m))
to take the first m
elements. If more than one, it applies take(m)
on each partition in the RDD using mapPartitionsInternal(_.take(m))
, builds a shuffle RDD that collects the results in a single partition, then again applies take(m)
.
In other words, it depends (because optimisation phase), but in the general case it takes the top rows of the concatenation of the top rows of each partition and so involves all executors holding a part of the dataset.
OK, perhaps not so simple.
CodePudding user response:
Without undermining @thebluephantom's and @Hristo Iliev's answers (each give some insight into what's happening under the hood), I also wanted to add my answer to this list.
I came to the same conclusion(s), albeit by observing the behaviour of the underlying partitions.
Partitions have an index associated with them. This is seen in the code below.
(Taken from original spark source code here).
trait Partition extends Serializable {
def index: Int
:
So amongst partitions, there is an order.
And as already mentioned in other answers, the df.show()
is the same as df.show(20)
or the top 20 rows. So the underlying partition indexes determine which partition (and hence executor) those rows come from.
The partition indexes are assigned at the time of read, or (re-assigned) during a shuffle.
Here is some code to see this behaviour:
val df = Seq((5,5), (6,6), (7,7), (8,8), (1,1), (2,2), (3,3), (4,4)).toDF("col1", "col2")
// above sequence is defined out of order - to make behaviour visible
// see partition structure
df.rdd.glom().collect()
/* Array(Array([5,5]), Array([6,6]), Array([7,7]), Array([8,8]), Array([1,1]), Array([2,2]), Array([3,3]), Array([4,4])) */
df.show(4, false)
/*
---- ----
|col1|col2|
---- ----
|5 |5 |
|6 |6 |
|7 |7 |
|8 |8 |
---- ----
only showing top 4 rows
*/
In the above code, we see 8 partitions (each inner-Array is a partition) - this is because spark defaults to 8 partitions when we create a dataframe.
Now let's repartition the dataframe.
// Now let's repartition df
val df2 = df.repartition(2)
// lets see the partition structure
df2.rdd.glom().collect()
/* Array(Array([5,5], [6,6], [7,7], [8,8], [1,1], [2,2], [3,3], [4,4]), Array()) */
// lets see output
df2.show(4,false)
/*
---- ----
|col1|col2|
---- ----
|5 |5 |
|6 |6 |
|7 |7 |
|8 |8 |
---- ----
only showing top 4 rows
*/
In the above code, the top 4 rows came from the first partition (which actually has all elements of the original data in it). Also note the skew in partition sizes, since no partitioning column was mentioned.
Now lets try and create 3 partitions
val df3 = df.repartition(3)
// lets see partition structures
df3.rdd.glom().collect()
/*
Array(Array([8,8], [1,1], [2,2]), Array([5,5], [6,6]), Array([7,7], [3,3], [4,4]))
*/
// And lets see the top 4 rows this time
df3.show(4, false)
/*
---- ----
|col1|col2|
---- ----
|8 |8 |
|1 |1 |
|2 |2 |
|5 |5 |
---- ----
only showing top 4 rows
*/
From the above code, we observe that Spark went to the first partition and tried to get 4 rows. Since only 3 were available, it grabbed those. Then moved on to the next partition, and got one more row. Thus, the order that you see from show(4, false)
, is due to the underlying data partitioning and the index ordering amongst partitions.
This example uses show(4)
, but this behaviour can be extended to show()
or show(20)
.