I recently came across a strange performance issue with Spark while testing locally that turned out to be related to the number of shuffle partitions. I found this quip in the README for "spark-fast-tests":
It's best to set the number of shuffle partitions to a small number like one or four in your test suite. This configuration can make your tests run up to 70% faster. You can remove this configuration option or adjust it if you're working with big DataFrames in your test suite.
But, I'd like to know... WHY?
So much so that I've gone to the trouble of reproducing the issue (obfuscating quite a lot of business-domain case classes using this gist).
The test below runs in ~10s locally on my Mac, using a fairly vanilla SparkSession:
lazy val spark: SparkSession =
SparkSession
.builder()
.appName("Test-Driver")
.master("local[2]")
.config("spark.sql.warehouse.dir", TempWarehouseDir.toString)
.getOrCreate()
That is with the shuffle setting at 1.
However! If I change the shuffle setting to something that a cluster might have, say 200, performance drops to nearly a minute:
spark.sqlContext.setConf("spark.sql.shuffle.partitions", "200")
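For reference, the same setting can be baked into the builder shown above instead of being set at runtime; assuming it's applied before the first action, the two should behave identically:
SparkSession
  .builder()
  .master("local[2]")
  .config("spark.sql.shuffle.partitions", "200") // same effect as setConf above
  .getOrCreate()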
Does anyone know what is going on here? Why would increasing the shuffle partitions cause the performance to drop so significantly locally?
Granted, the domain classes are large, but I don't think that fully explains why the test behaves this way.
Here is the test code:
"list join df takes a long time" in {
spark.sqlContext.setConf("spark.sql.shuffle.partitions", "200")
val withList =
Seq(
("key1", Seq(MyBigDomainClass(None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None))),
)
.toDF("key1", "values").as[(String, List[MyBigDomainClass])]
val withoutList =
Seq(
("key1", 1),
("key2", 2)
).toDF("key1", "value").as[(String, Int)]
var start = System.currentTimeMillis()
val joined = withoutList.joinWith(withList, withList("key1") === withoutList("key1"), "full")
joined.show
println(s"[join] elapsed: ${(System.currentTimeMillis() - start) / 1000}s")
start = System.currentTimeMillis()
joined.map {
case (a, b) => (Option(a), Option(b).fold(List.empty[MyBigDomainClass])(_._2))
}.show
println(s"[map] elapsed: ${(System.currentTimeMillis() - start) / 1000}s")
}
And the domain classes:
package com.edmunds.bde.dataservices.imt.inventory.pipeline.job
case class TestClass_2(field_80:Option[String], field_81:Option[String], field_82:Option[Int])
case class TestClass_3(field_84:Option[Int], field_85:Option[Int], field_86:Option[Int])
case class TestClass_4(field_90:Option[String], field_91:Option[String], field_92:Option[String], field_93:Option[Double], field_94:Option[Double])
case class TestClass_5(field_96:Option[String], field_97:Option[String], field_98:Option[String], field_99:Option[Double], field_100:Option[String], field_101:Option[Int], field_102:Option[String], field_103:Option[String], field_104:Option[String], field_105:Option[Int], field_106:Option[Int], field_107:Option[Int], field_108:Option[Int])
case class TestClass_6(field_111:Option[String], field_112:Option[String], field_113:Option[String], field_114:Option[String], field_115:Option[String], field_116:Option[String], field_117:Option[String], field_118:Option[String], field_119:Option[String])
case class TestClass_7(field_121:Option[String], field_122:Option[String], field_123:Option[String], field_124:Option[String], field_125:Option[String], field_126:Option[String], field_127:Option[String], field_128:Option[String], field_129:Option[String])
case class TestClass_8(field_131:Option[String], field_132:Option[String], field_133:Option[String], field_134:Option[String], field_135:Option[String], field_136:Option[String], field_137:Option[String], field_138:Option[String], field_139:Option[String])
case class TestClass_9(field_141:Option[Long], field_142:Option[String], field_143:Option[String], field_144:Option[String], field_145:Option[Long], field_146:Option[Long])
case class TestClass_10(field_150:Option[Long], field_151:Option[String], field_152:Option[String], field_153:Option[String], field_154:Option[Seq[String]])
case class TestClass_1(field_70:Option[Long], field_71:Option[String], field_72:Option[String], field_73:Option[Long], field_74:Option[String], field_75:Option[String], field_76:Option[String], field_77:Option[String], field_78:Option[String], field_82:Option[TestClass_2], field_86:Option[TestClass_3], field_87:Option[Double], field_88:Option[Double], field_94:Option[Seq[TestClass_4]], field_108:Option[TestClass_5], field_109:Option[String], field_119:Option[TestClass_6], field_129:Option[TestClass_7], field_139:Option[TestClass_8], field_146:Option[Seq[TestClass_9]], field_147:Option[Seq[String]], field_148:Option[Seq[String]], field_154:Option[Seq[TestClass_10]])
case class TestClass_12(field_157:Option[Double], field_158:Option[Double], field_159:Option[Double], field_160:Option[Double], field_161:Option[Double], field_162:Option[java.math.BigDecimal], field_163:Option[java.math.BigDecimal], field_164:Option[Double], field_165:Option[Double])
case class TestClass_11(field_165:Option[TestClass_12], field_166:Option[Long], field_167:Option[scala.collection.Map[String, String]])
case class TestClass_14(field_170:Option[Double], field_171:Option[Double], field_172:Option[String])
case class TestClass_15(field_174:Option[Double], field_175:Option[Double], field_176:Option[Double], field_177:Option[Double], field_178:Option[Double], field_179:Option[Double], field_180:Option[Double], field_181:Option[Double], field_182:Option[Double], field_183:Option[Double], field_184:Option[Double], field_185:Option[Double], field_186:Option[Double], field_187:Option[Int], field_188:Option[Long], field_189:Option[Long], field_190:Option[Long], field_191:Option[Long])
case class TestClass_16(field_193:Option[Double], field_194:Option[Double], field_195:Option[Double], field_196:Option[Double], field_197:Option[Double], field_198:Option[Double])
case class TestClass_17(field_200:Option[java.math.BigDecimal], field_201:Option[Double], field_202:Option[java.math.BigDecimal], field_203:Option[Int])
case class TestClass_19(field_211:Option[Int], field_212:Option[String], field_213:Option[Double], field_214:Option[Int], field_215:Option[Double], field_216:Option[Int], field_217:Option[Double], field_218:Option[Int], field_219:Option[Int], field_220:Option[Int], field_221:Option[Int], field_222:Option[String], field_223:Option[java.sql.Date], field_224:Option[Int], field_225:Option[Int], field_226:Option[Int], field_227:Option[Int], field_228:Option[String])
case class TestClass_18(field_205:Option[Double], field_206:Option[Double], field_207:Option[Double], field_208:Option[Double], field_209:Option[String], field_228:Option[TestClass_19])
case class TestClass_20(field_230:Option[java.sql.Timestamp], field_231:Option[Long], field_232:Option[String], field_233:Option[String], field_234:Option[String], field_235:Option[java.sql.Timestamp], field_236:Option[java.sql.Timestamp], field_237:Option[Double], field_238:Option[Int], field_239:Option[Int], field_240:Option[Boolean], field_241:Option[Int], field_242:Option[Int], field_243:Option[Double], field_244:Option[Long], field_245:Option[String], field_246:Option[java.sql.Timestamp], field_247:Option[String])
case class TestClass_21(field_249:Option[java.sql.Timestamp], field_250:Option[Long], field_251:Option[String], field_252:Option[String], field_253:Option[String], field_254:Option[java.sql.Timestamp], field_255:Option[java.sql.Timestamp], field_256:Option[Double], field_257:Option[Int], field_258:Option[Int], field_259:Option[Boolean], field_260:Option[Int], field_261:Option[Int], field_262:Option[Double], field_263:Option[Long], field_264:Option[String], field_265:Option[java.sql.Timestamp], field_266:Option[String])
case class TestClass_13(field_172:Option[TestClass_14], field_191:Option[TestClass_15], field_198:Option[TestClass_16], field_203:Option[TestClass_17], field_228:Option[TestClass_18], field_247:Option[Seq[TestClass_20]], field_266:Option[Seq[TestClass_21]], field_267:Option[java.math.BigDecimal])
case class TestClass_22(field_269:Option[String], field_270:Option[String], field_271:Option[String], field_272:Option[String], field_273:Option[Double], field_274:Option[String])
case class TestClass_23(field_277:Option[Int], field_278:Option[Boolean], field_279:Option[Int], field_280:Option[Boolean], field_281:Option[Boolean], field_282:Option[Boolean], field_283:Option[Boolean], field_284:Option[Boolean], field_285:Option[Boolean], field_286:Option[String], field_287:Option[String], field_288:Option[String], field_289:Option[Boolean], field_290:Option[Boolean])
case class TestClass_25(field_293:Option[Boolean], field_294:Option[Boolean], field_295:Option[String], field_296:Option[String])
case class TestClass_26(field_298:Option[Boolean], field_299:Option[Boolean], field_300:Option[String], field_301:Option[String])
case class TestClass_27(field_303:Option[Boolean], field_304:Option[Boolean], field_305:Option[String], field_306:Option[String])
case class TestClass_24(field_296:Option[TestClass_25], field_301:Option[TestClass_26], field_306:Option[TestClass_27])
case class TestClass_28(field_311:Option[Long], field_312:Option[Long], field_313:Option[Boolean], field_314:Option[Int], field_315:Option[String], field_316:Option[String], field_317:Option[Boolean], field_318:Option[Boolean], field_319:Option[Boolean])
case class MyBigDomainClass(field_1:Option[String], field_2:Option[String], field_3:Option[String], field_4:Option[String], field_5:Option[java.sql.Timestamp], field_6:Option[java.sql.Date], field_7:Option[String], field_8:Option[String], field_9:Option[String], field_10:Option[String], field_11:Option[Int], field_12:Option[String], field_13:Option[String], field_14:Option[String], field_15:Option[String], field_16:Option[String], field_17:Option[String], field_18:Option[Double], field_19:Option[Double], field_20:Option[Double], field_21:Option[Double], field_22:Option[Double], field_23:Option[Double], field_24:Option[Double], field_25:Option[Double], field_26:Option[Double], field_27:Option[Double], field_28:Option[Double], field_29:Option[Double], field_30:Option[String], field_31:Option[String], field_32:Option[String], field_33:Option[String], field_34:Option[String], field_35:Option[String], field_36:Option[String], field_37:Option[String], field_38:Option[String], field_39:Option[String], field_40:Option[String], field_41:Option[String], field_42:Option[String], field_43:Option[String], field_44:Option[String], field_45:Option[String], field_46:Option[String], field_47:Option[Int], field_48:Option[Int], field_49:Option[java.sql.Date], field_50:Option[java.sql.Date], field_51:Option[java.sql.Date], field_52:Option[java.sql.Date], field_53:Option[String], field_54:Option[String], field_55:Option[Int], field_56:Option[java.sql.Date], field_57:Option[String], field_58:Option[String], field_59:Option[String], field_60:Option[String], field_61:Option[String], field_62:Option[String], field_63:Option[String], field_64:Option[Boolean], field_65:Option[scala.collection.Map[String, String]], field_66:Option[Int], field_67:Option[Int], field_68:Option[String], field_154:Option[TestClass_1], field_167:Option[TestClass_11], field_267:Option[TestClass_13], field_274:Option[Seq[TestClass_22]], field_275:Option[Int], field_290:Option[TestClass_23], field_306:Option[TestClass_24], field_307:Option[Int], field_308:Option[Boolean], field_309:Option[Boolean], field_319:Option[TestClass_28], field_320:Option[java.sql.Timestamp], field_321:Option[java.sql.Date])
CodePudding user response:
I had the exact same issue on one of my previous projects. When the number of shuffle partitions is 200, every shuffle operation (join, group by, etc.) will produce 200 partitions in your dataset. Since you have just 2 worker threads, those 200 partitions will be processed sequentially on those 2 threads (no more than 2 partitions are processed in parallel).
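You can see this directly on the joined Dataset; a quick check I would add to the test (my addition, not in the original code):
// The full outer join shuffles both inputs, so the result picks up
// spark.sql.shuffle.partitions partitions, nearly all of them empty here.
println(joined.rdd.getNumPartitions) // prints 200 with the setting above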
Every partition incurs some fixed overhead. Say you have 1000 records in total: split into 200 partitions, that's 5 records per partition, so the total processing cost is 1000 records plus 200 times the per-partition overhead. Because each partition is so small (just 5 records), the time spent processing its data is dwarfed by the overhead of having the partition at all.
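To put illustrative numbers on that (mine, not measured): if each record takes 1 ms to process and each task carries a fixed 50 ms of scheduling overhead, 200 partitions cost 1000 × 1 ms + 200 × 50 ms ≈ 11 s, while 4 partitions cost 1000 × 1 ms + 4 × 50 ms ≈ 1.2 s. Once partitions are nearly empty, the overhead term dominates.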
The general rule of thumb is to have ~2 partitions per core.
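A minimal sketch of sizing the setting from the cores actually available to the test JVM (my own illustration, using the rule of thumb above as the multiplier):
// defaultParallelism is 2 for master("local[2]"), so this sets 4 shuffle partitions.
val cores = spark.sparkContext.defaultParallelism
spark.conf.set("spark.sql.shuffle.partitions", (cores * 2).toString)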
CodePudding user response:
Adding to https://stackoverflow.com/a/72801534/4278032, the difference you experienced (~10 secs with 1 shuffle partition vs ~60 secs with 200) seems way bigger than anything I've seen before.
Running your code on my machine shows results more in line with what I would expect (for Spark 2.4). Most interestingly, setting shuffle partitions for tests no longer seems to be required for Spark 3.2 / 3.3.
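My best guess for why: Spark 3.2 turned Adaptive Query Execution on by default, and AQE coalesces small shuffle partitions at runtime, so the 200 mostly-empty partitions collapse before they cost anything. The relevant flags, if you want to experiment:
// Both flags default to true in Spark 3.2+; shown only to make them explicit.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")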
Using 200 partitions (Spark 3.3.0):
+---------+--------------------+
|       _1|                  _2|
+---------+--------------------+
|{key1, 1}|{key1, [{null, nu...|
|{key2, 2}|                null|
+---------+--------------------+
[join] elapsed: 2s
+---------+--------------------+
|       _1|                  _2|
+---------+--------------------+
|{key1, 1}|[{null, null, nul...|
|{key2, 2}|                  []|
+---------+--------------------+
[map] elapsed: 2s
Using 4 partitions (Spark 3.3.0):
+---------+--------------------+
|       _1|                  _2|
+---------+--------------------+
|{key1, 1}|{key1, [{null, nu...|
|{key2, 2}|                null|
+---------+--------------------+
[join] elapsed: 2s
+---------+--------------------+
|       _1|                  _2|
+---------+--------------------+
|{key1, 1}|[{null, null, nul...|
|{key2, 2}|                  []|
+---------+--------------------+
[map] elapsed: 2s
Using 200 partitions (Spark 2.4.8):
+---------+--------------------+
|       _1|                  _2|
+---------+--------------------+
|[key1, 1]|[key1, [[,,,,,,,,...|
|[key2, 2]|                null|
+---------+--------------------+
[join] elapsed: 5s
+---------+--------------------+
|       _1|                  _2|
+---------+--------------------+
|[key1, 1]|[[,,,,,,,,,,,,,,,...|
|[key2, 2]|                  []|
+---------+--------------------+
[map] elapsed: 17s
Using 4 partitions (Spark 2.4.8):
+---------+--------------------+
|       _1|                  _2|
+---------+--------------------+
|[key1, 1]|[key1, [[,,,,,,,,...|
|[key2, 2]|                null|
+---------+--------------------+
[join] elapsed: 2s
+---------+--------------------+
|       _1|                  _2|
+---------+--------------------+
|[key1, 1]|[[,,,,,,,,,,,,,,,...|
|[key2, 2]|                  []|
+---------+--------------------+
[map] elapsed: 3s
Though, when reducing the log level to DEBUG, the Spark 2.4 execution time really explodes, purely because of the overhead of logging for all these empty partitions (and the constant synchronisation between the two threads).
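For tests, that noise can be suppressed from the code itself; a minimal sketch (my addition, not part of the original run):
// Raise the log level so per-task DEBUG output stays out of the timings.
spark.sparkContext.setLogLevel("WARN")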