First of all, I have to say that I've already tried everything I know or found on Google (including the question "Spark: How to use crossJoin", which is exactly my problem).
I have to calculate the Cartesian product of two DataFrames, countries and units, as follows:
A.cache().count()

val units = A.groupBy("country")
  .agg(sum("grade").as("grade"),
       sum("point").as("point"))
  .withColumn("AVR", $"grade" / $"point" * 1000)
  .drop("point", "grade")

val countries = D.select("country").distinct()
val C = countries.crossJoin(units)
countries contains country names and its size is bounded by 150 rows. units is a DataFrame with 3 rows, the aggregated result of another DataFrame. I have checked the result a hundred times and those are indeed the sizes, and yet the join takes 5 hours to complete.
I know I must be missing something. I've tried caching, repartitioning, etc., and I would love to get some other ideas.
CodePudding user response:
What action are you performing afterwards with C?
Also, if these datasets are so small, consider collecting them to the driver and doing these manipulations there; you can always call spark.createDataFrame again later.
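A minimal sketch of that approach, assuming a SparkSession named spark, that countries has a single country column, and that units ends up as (country, AVR) with AVR as a Double (the column order and the unit_country name are assumptions, not from the question):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Both sides are tiny (at most 150 rows and 3 rows), so pull them to the driver
val countryRows = countries.collect()
val unitRows = units.collect()

// Compute the Cartesian product locally; a few hundred rows at most
val product = for {
  c <- countryRows
  u <- unitRows
} yield Row(c.getString(0), u.getString(0), u.getDouble(1))

// Rebuild a DataFrame from the local result
val schema = StructType(Seq(
  StructField("country", StringType),
  StructField("unit_country", StringType),
  StructField("AVR", DoubleType)
))
val C = spark.createDataFrame(spark.sparkContext.parallelize(product), schema)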
CodePudding user response:
I have two suggestions for you:
Look at the explain plan and the Spark properties. For the amount of data you have mentioned, 5 hours is a really long time. My expectation is that you have way too many shuffles; you can look at different properties such as spark.sql.shuffle.partitions.
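For instance (a sketch reusing the C and spark names from the question; the value 4 is just an illustrative choice):

// Inspect the physical plan and look for unexpected Exchange (shuffle) nodes
C.explain(true)

// The default of 200 shuffle partitions is overkill for a few hundred rows;
// a handful of partitions avoids scheduling hundreds of near-empty tasks
spark.conf.set("spark.sql.shuffle.partitions", "4")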
Instead of doing a cross join, you can maybe do a collect and explore broadcasts (https://sparkbyexamples.com/spark/spark-broadcast-variables/), but do this only on small amounts of data, as this data is brought back to the driver.
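At the DataFrame level, the closest equivalent of that idea is the broadcast join hint rather than a raw broadcast variable; a sketch, reusing countries and units from the question:

import org.apache.spark.sql.functions.broadcast

// Ship the 3-row units DataFrame to every executor, so the cross join
// runs as a broadcast nested-loop join instead of a shuffled Cartesian product
val C = countries.crossJoin(broadcast(units))

Since both sides here are tiny, broadcasting either one should keep the join shuffle-free.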