How to avoid a self-join in Spark Scala


I have a DataFrame called product_relationship_current, and I'm doing a self-join on it to build a new DataFrame, as shown below.

First I give it aliases so I can treat it as two different DataFrames:

val pr1 = product_relationship_current.alias("pr1").where(col("TYPE").isin("contains", "CONTAINS"))
val pr2 = product_relationship_current.alias("pr2")

Then I do the self-join to get a new DataFrame:

val stackoutput = pr1.join(pr2, pr1("PRODUCT_VERSION_ID_RELATED_FK") === pr2("PRODUCT_VERSION_ID_FK"), "left")
  .select(pr1("PRODUCT_ID"), pr1("PRODUCT_VERSION"), pr1("RELATED_PRODUCT_ID"), pr1("RELATED_PRODUCT_VERSION"), pr1("TYPE"), pr1("PRODUCT_VERSION_ID_RELATED_FK"))
  .distinct()

But I'm looking for another way to do this without a self-join, so I don't have to load the same DataFrame twice: the query takes very long to execute because my product_relationship_current DataFrame is very large.

This is the SQL query I'm trying to perform using Spark Scala:

select 
  distinct pr1.product_id as IO, 
  pr1.product_version as IOV, 
  pr1.related_product_id, 
  pr1.related_product_version, 
  pr1.type, 
  pr1.product_version_id_related_fk 
from 
  product_relationship_current as pr1 
  left join product_relationship_current as pr2 on pr1.product_version_id_related_fk = pr2.product_version_id_fk 
where 
  pr1.type = 'contains' 
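
One way to run this SQL from Scala is through a temporary view; a minimal sketch (assuming the usual spark session variable is in scope):

// Register the DataFrame so it can be referenced by name in SQL
product_relationship_current.createOrReplaceTempView("product_relationship_current")

val stackoutput = spark.sql("""
  select distinct
    pr1.product_id as IO,
    pr1.product_version as IOV,
    pr1.related_product_id,
    pr1.related_product_version,
    pr1.type,
    pr1.product_version_id_related_fk
  from product_relationship_current as pr1
  left join product_relationship_current as pr2
    on pr1.product_version_id_related_fk = pr2.product_version_id_fk
  where pr1.type = 'contains'
""")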

CodePudding user response:

I'll structure this answer in two parts: my answer, and a question for you.

My answer

If you want to avoid reading a DataFrame twice, you can use df.cache to cache it in memory/disk. df.cache is basically df.persist with the default storage level, which for DataFrames is MEMORY_AND_DISK. The first image shows the query plan when the DataFrame was not cached:

[Image: query plan, DataFrame not cached]

The second image is from a run where I did .cache my DataFrame:

[Image: query plan, DataFrame cached]

You can see the difference at the top of the images: because the DataFrame was cached, the CSV file (or whatever source you are using) won't be read twice. Both branches of the plan now contain an InMemoryTableScan block instead of a Scan csv block.
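
As a minimal sketch using the names from your question (the .explain() call is just there so you can check for the InMemoryTableScan blocks yourself):

import org.apache.spark.sql.functions.col

// Cache once; .cache() is equivalent to .persist(StorageLevel.MEMORY_AND_DISK)
val pr = product_relationship_current.cache()

val pr1 = pr.alias("pr1").where(col("TYPE").isin("contains", "CONTAINS"))
val pr2 = pr.alias("pr2")

val stackoutput = pr1
  .join(pr2, pr1("PRODUCT_VERSION_ID_RELATED_FK") === pr2("PRODUCT_VERSION_ID_FK"), "left")
  .select(pr1("PRODUCT_ID"), pr1("PRODUCT_VERSION"), pr1("RELATED_PRODUCT_ID"),
    pr1("RELATED_PRODUCT_VERSION"), pr1("TYPE"), pr1("PRODUCT_VERSION_ID_RELATED_FK"))
  .distinct()

// Print the physical plan; both join branches should now show InMemoryTableScan
stackoutput.explain()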

Important: if your data is so large that it does not fit into the combined memory of your executors, the data will spill over to disk, which makes the operation slower.
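
If you want control over that trade-off, you can call persist with an explicit storage level instead of cache; a short sketch with standard Spark storage levels:

import org.apache.spark.storage.StorageLevel

// MEMORY_AND_DISK (the .cache default) spills partitions that don't fit in
// memory to disk; MEMORY_ONLY instead recomputes them when they are needed.
val pr = product_relationship_current.persist(StorageLevel.MEMORY_AND_DISK)

// ... run your joins/queries against pr ...

// Free the cached data once you no longer need it
pr.unpersist()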

My question to you: it seems like you're doing a left join with pr1 as the left table, and afterwards you only select columns from the pr1 side. Is that join even needed? A left join keeps every row of pr1, and your .distinct() removes any duplicates the join introduces, so it seems you could just select the wanted columns and call .distinct() on the DataFrame (sketched below).
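
Something like this (a sketch based on your snippet; please double-check it returns the same rows on your data):

import org.apache.spark.sql.functions.col

// A left join keeps every pr1 row, and .distinct() collapses any duplicates
// the join introduces, so dropping the join should not change the result.
val stackoutput = product_relationship_current
  .where(col("TYPE").isin("contains", "CONTAINS"))
  .select(col("PRODUCT_ID"), col("PRODUCT_VERSION"), col("RELATED_PRODUCT_ID"),
    col("RELATED_PRODUCT_VERSION"), col("TYPE"), col("PRODUCT_VERSION_ID_RELATED_FK"))
  .distinct()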

Hope this helps!
