Currently, I am working on a single-node Hadoop setup, and I wrote a job that outputs a sorted dataframe with only one partition to a single CSV file. I discovered several different outcomes depending on how I used `repartition`.
At first, I used `orderBy` to sort the data and then `repartition` to output a single CSV file, but the output was sorted in chunks instead of as a whole.
Then, I tried discarding the `repartition` call, but the output contained only part of the records. I realized that without `repartition`, Spark outputs 200 CSV files instead of 1, even though I am working on a one-partition dataframe.
Thus, what I did next was place `repartition(1)`, `repartition(1, "column of partition")`, and `repartition(20)` before `orderBy`. Yet the output remained the same, with 200 CSV files. So I used `coalesce(1)` before `orderBy` instead, and the problem was fixed.
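Roughly, the attempts above look like this (the input path and the "value" column are placeholders, not my real code):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.csv("input.csv", header=True)  # placeholder input path

# 1) orderBy then repartition(1): one file, but only sorted in chunks
df.orderBy("value").repartition(1).write.csv("out1", header=True)

# 2) repartition before orderBy (also tried repartition(1, "value")
#    and repartition(20)): still 200 output files
df.repartition(1).orderBy("value").write.csv("out2", header=True)

# 3) coalesce(1) before orderBy: one sorted file
df.coalesce(1).orderBy("value").write.csv("out3", header=True)
```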
I do not understand why working on a single-partition dataframe requires `repartition` or `coalesce` at all, or how the aforesaid processes affect the output. I would be grateful if someone could elaborate a little.
CodePudding user response:
Spark has two relevant parameters here: `spark.sql.shuffle.partitions` and `spark.default.parallelism`.
When you perform an operation like a sort, as in your case, it triggers a shuffle operation (https://spark.apache.org/docs/latest/rdd-programming-guide.html#shuffle-operations), which splits your dataframe into `spark.sql.shuffle.partitions` partitions.
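For example, here is a minimal PySpark sketch (the input path and the "value" column are placeholders): lowering that setting makes the sorted output land in a single partition, so only one CSV file is written.

```python
from pyspark.sql import SparkSession

# Lower the shuffle-partition count so the sort produces one partition
# (the default of 200 is what creates 200 output files).
spark = (
    SparkSession.builder
    .config("spark.sql.shuffle.partitions", "1")
    .getOrCreate()
)

df = spark.read.csv("input.csv", header=True)   # placeholder input path
df.orderBy("value").write.csv("sorted_out", header=True)  # one part file
```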
I also struggled with the same problem as you and did not find any elegant solution.
CodePudding user response:
Spark generally doesn't have a strong concept of ordered data, because all your data is split across multiple partitions, and every time you call an operation that requires a shuffle, your ordering will be changed.
For this reason, you're better off only sorting your data in Spark for the operations that really need it.
Forcing your data into a single file will also break down as the dataset gets larger, since a single partition has to be processed by a single task.
As Miroslav points out, your data gets shuffled between partitions every time you trigger what's called a shuffle stage (things like grouping, join, or window operations). You can set the number of shuffle partitions in the Spark config; the default is 200.
Calling `repartition` before a group-by operation is fairly pointless, because Spark needs to repartition your data again to execute the group-by.
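The same applies to the sort in your question; you can see it directly by checking the partition count right after the shuffle (a rough sketch; the input path and the "value" column are placeholders):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.csv("input.csv", header=True)   # placeholder input path

# The repartition(1) is effectively discarded: the sort triggers another
# shuffle, into up to spark.sql.shuffle.partitions partitions.
print(df.repartition(1).orderBy("value").rdd.getNumPartitions())
# -> up to 200 with default settings, not the 1 you asked for
```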
`coalesce` operations sometimes get pushed into the shuffle stage by Spark, so maybe that's why it worked. Either that, or it's because you called it after the group-by operation.
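Either way, a common pattern for getting one globally sorted file (a sketch, reusing the placeholder df and "value" column from above) is to sort first and collapse to one partition just before the write:

```python
# coalesce(1) does not shuffle, so the sort's range partitions are
# concatenated in order and the global ordering is preserved.
(df.orderBy("value")
   .coalesce(1)
   .write.mode("overwrite")
   .csv("sorted_single", header=True))
```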
A good way to understand what's going on with your query is to use the Spark UI: https://spark.apache.org/docs/3.0.0-preview/web-ui.html