Home > Software design >  How does spark calculate the number of reducers in a hash shuffle?
How does spark calculate the number of reducers in a hash shuffle?

Time:12-24

I am trying to understand hash shuffle in Spark. I am reading this article

Hash Shuffle: Each mapper task creates separate file for each separate reducer, resulting in M * R total files on the cluster, where M is the number of “mappers” and R is the number of “reducers”. With high amount of mappers and reducers this causes big problems, both with the output buffer size, amount of open files on the filesystem, speed of creating and dropping all these files. The logic of this shuffler is pretty dumb: it calculates the amount of “reducers” as the amount of partitions on the “reduce” side

Can you help me understand the emboldened part? How does it know the amount of partitions on the reduce side or, what does "amount of partitions on the reduce side" even mean? Is it equal to spark.sql.shuffle.partitions? If it is indeed equal to that, then what is even there to calculate? A very small example would be very helpful.

CodePudding user response:

spark.sql.shuffle.partitions is just the default used when number of partitions for a shuffle isn't set explicitly. So the "calculation", at a minimum, would involve a check whether specific number of partitions was requested or should Spark use the default.

Quick example:

scala> df.repartition(400,col("key")).groupBy("key").avg("value").explain()
== Physical Plan ==
*(2) HashAggregate(keys=[key#178], functions=[avg(value#164)])
 - *(2) HashAggregate(keys=[key#178], functions=[partial_avg(value#164)])
    - Exchange hashpartitioning(key#178, 400)  <<<<< shuffle across increased number of partitions
       - *(1) Project [key#178, value#164]
          - *(1) FileScan parquet adb.atable[value#164,key#178,othercolumns...] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[hdfs://ns1/hive/adb/atable/key=123..., PartitionCount: 3393, PartitionFilters: [isnotnull(key#178), (cast(key#178 as string) > 100)], PushedFilters: [], ReadSchema: struct<value:double,othercolumns...>

scala> 

In Spark 3 and up, Adaptive Query Engine could also interject and revise that number, in attempts to optimize execution by coalescing, preserving (e.g. ENSURE_REQUIREMENTS) or increasing partitions.

EDIT: A side note -- your article is quite old (2015 was ages ago :)) and talks about pre-SparkSQL/pre-dataframe times. I'd try to find something more relevant.

EDIT 2: ...But even there, in the comments section, author rightfully says: In fact, here the question is more general. For most of the transformations in Spark you can manually specify the desired amount of output partitions, and this would be your amount of “reducers”...

  • Related