import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.udf
import spark.implicits._ // for the $"colName" syntax

val df1 = spark.read.format("delta").table("...100K_rows...")

// Stand-in for an abstract transformation, ideally this comes from a .jar library
// such as: (abstract) https://github.com/cosmycx/scala_transformer
val xform = udf((message: String) => {
  val t0 = System.currentTimeMillis
  Thread.sleep(5) // simulates ~5 ms of real per-row work
  System.currentTimeMillis - t0
}) // .xformText

spark.udf.register("xform", xform)

val df2 = df1.withColumn("xformResult", xform($"SomeText"))

df2.write.format("delta")
  .mode(SaveMode.Overwrite)
  .saveAsTable("...")
How can this be made to run faster?
What I tried:
- increasing the Databricks Spark cluster node size: DS3_v2 (14 GB, 4 cores) vs. DS5_v2 (56 GB, 16 cores)
- increasing the number of workers in the cluster (plus the driver): 3, 5, and 10 (same speed!?)
- changing spark.conf.set("spark.sql.shuffle.partitions", "auto"), as well as various explicit values
Results are always in this range: 1 min for 10K rows and 8 min for 100K rows, no matter the changes.
Ideally it would take less than 1 min for 100K rows. Would this even be achievable in Databricks Spark? This runs in Azure, if it matters.
What am I missing, and what other things should I consider or try? Thanks.
CodePudding user response:
First off, this is probably not a use case for Spark. The dataset is so small that the Spark optimizer doesn't even know what to do with it.
What happens is that the file is small and is read by a single task. Spark identifies the transformation as simple and applies it while reading the data, so the rows are processed sequentially and you end up waiting for the total of all the Thread.sleep() calls: 100K rows × 5 ms ≈ 8.3 minutes, which matches what you're seeing.
The best thing is probably not to use Spark for this. But if you do, try to make Spark perform a shuffle (a sort or whatever) and make sure your code runs after the shuffle. You would then have x partitions (200 by default, per spark.sql.shuffle.partitions) and could run with up to x concurrent tasks, as in the sketch below. (But again, I wouldn't use Spark for this.)
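A minimal sketch of that idea, reusing df1, xform, and the SomeText column from the question; the 200 is just the default shuffle-partition count mentioned above, and the table name placeholder is kept as in the original:

// Force a shuffle so the 100K rows spread across many partitions;
// each partition then becomes its own task, so the per-row sleeps
// run concurrently across the executor cores instead of one by one.
val shuffled = df1.repartition(200) // 200 = the default spark.sql.shuffle.partitions
val result = shuffled.withColumn("xformResult", xform($"SomeText"))

result.write.format("delta")
  .mode(SaveMode.Overwrite)
  .saveAsTable("...")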
CodePudding user response:
The transform happens sequentially rather than concurrently because the df1 DataFrame has only one partition. Repartitioning the initial DataFrame and then running the transformation on the repartitioned DataFrame improves the speed significantly, by a factor of at least 10x:
println(df1.rdd.getNumPartitions) // 1
val df2 = df1.repartition(20) // run the transform on df2 (100K rows in 35 sec.)
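As a follow-up sketch, the partition count can be tied to the cluster size instead of being hard-coded (the 3-tasks-per-core multiplier here is a common Spark sizing guideline, not something stated in this answer):

// Size the partition count to the cluster: roughly 2-3 partitions
// per available core keeps every core busy without excessive overhead.
val coresAvailable = spark.sparkContext.defaultParallelism // total cores known to the scheduler
val df3 = df1.repartition(coresAvailable * 3)
println(df3.rdd.getNumPartitions)

For sleep-bound work like this UDF, even higher factors can pay off, since each task spends most of its time waiting rather than using CPU.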