I have a DataFrame called product_relationship_current and I'm doing a self-join to build a new DataFrame, as shown below.
First I give it two aliases so that I can treat them as two different dataframes:
val pr1 = product_relationship_current.alias("pr1").where(col("TYPE").isin("contains", "CONTAINS"))
val pr2 = product_relationship_current.alias("pr2")
And then I'm doing a self-join to get a new dataframe:
val stackoutput = pr1.join(pr2, pr1("PRODUCT_VERSION_ID_RELATED_FK") === pr2("PRODUCT_VERSION_ID_FK"), "left")
.select(pr1("PRODUCT_ID"), pr1("PRODUCT_VERSION"), pr1("RELATED_PRODUCT_ID"), pr1("RELATED_PRODUCT_VERSION"), pr1("TYPE"), pr1("PRODUCT_VERSION_ID_RELATED_FK"))
.distinct()
But I'm looking for a way to do this without a self-join, so that I don't have to load the same dataframe twice, because it takes very long to execute (my product_relationship_current dataframe is very large).
This is the SQL query I am trying to reproduce in Spark Scala:
select
distinct pr1.product_id as IO,
pr1.product_version as IOV,
pr1.related_product_id,
pr1.related_product_version,
pr1.type,
pr1.product_version_id_related_fk
from
product_relationship_current as pr1
left join product_relationship_current as pr2 on pr1.product_version_id_related_fk = pr2.product_version_id_fk
where
pr1.type = 'contains'
CodePudding user response:
I'll structure this answer in 2 parts: my answer, and a question.
My answer
If you want to avoid reading a dataframe twice, you can use df.cache to cache it in memory/disk. df.cache is basically df.persist using the default storage level.
Comparing the query plans without and with caching, the second image is the one where I called .cache on my dataframe.
You can see the difference at the top of the images: because the dataframe was cached, the CSV file (or whatever source you are using) won't be read twice. There is an InMemoryTableScan block in both branches instead of a Scan csv block.
Important: If your data is so large that it does not fit into the combined memory of your executors, the data will spill over to disk. This will make this operation slower.
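To make the caching idea concrete, here is a minimal sketch of the self-join from the question with the dataframe cached first. The object name, the helper stackOutput and the sample rows are hypothetical and only stand in for the real product_relationship_current. The join condition is written with col("pr1....") and col("pr2....") so that each side is referenced through its alias; that is my own choice for the sketch, not something taken from the question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object CachedSelfJoinSketch {
  // Hypothetical helper: cache the input once and reuse it on both sides of the join.
  def stackOutput(productRelationshipCurrent: DataFrame): DataFrame = {
    // cache() is lazy: the data is materialized by the first action that runs on it.
    // For a DataFrame it is persist() with the default storage level (MEMORY_AND_DISK).
    val cached = productRelationshipCurrent.cache()

    val pr1 = cached.alias("pr1").where(col("TYPE").isin("contains", "CONTAINS"))
    val pr2 = cached.alias("pr2")

    pr1.join(pr2, col("pr1.PRODUCT_VERSION_ID_RELATED_FK") === col("pr2.PRODUCT_VERSION_ID_FK"), "left")
      .select(
        col("pr1.PRODUCT_ID"),
        col("pr1.PRODUCT_VERSION"),
        col("pr1.RELATED_PRODUCT_ID"),
        col("pr1.RELATED_PRODUCT_VERSION"),
        col("pr1.TYPE"),
        col("pr1.PRODUCT_VERSION_ID_RELATED_FK"))
      .distinct()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("cached-self-join-sketch")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample rows, only for illustration.
    val df = Seq(
      ("P1", "1.0", "P2", "1.0", "contains", 1L, 2L),
      ("P2", "1.0", "P3", "1.0", "contains", 2L, 3L),
      ("P2", "1.0", "P4", "1.0", "requires", 2L, 4L)
    ).toDF("PRODUCT_ID", "PRODUCT_VERSION", "RELATED_PRODUCT_ID",
      "RELATED_PRODUCT_VERSION", "TYPE", "PRODUCT_VERSION_ID_FK",
      "PRODUCT_VERSION_ID_RELATED_FK")

    val result = stackOutput(df)
    result.explain() // inspect the physical plan to see how each branch reads the data
    result.show(false)

    spark.stop()
  }
}
```

Once the cached dataframe is no longer needed, calling unpersist() on it releases the memory and disk it occupies.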
My question to you: it seems like you're doing a left join with pr1 being the left table, and afterwards you're only selecting columns from the pr1 part of the table. Is this join even needed? It seems like you could just select the wanted columns and then call .distinct on the dataframe.
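The reasoning behind that question: a left join keeps every row of the left table at least once, only pr1 columns are selected, and distinct collapses any duplicates the join produced. Under those conditions a plain filter, select and distinct should return the same rows. The sketch below compares both versions on made-up data; the object name, the helpers withJoin and withoutJoin, and the sample rows are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object NoJoinSketch {
  private val wanted = Seq("PRODUCT_ID", "PRODUCT_VERSION", "RELATED_PRODUCT_ID",
    "RELATED_PRODUCT_VERSION", "TYPE", "PRODUCT_VERSION_ID_RELATED_FK")

  // The self-join from the question, referencing each side through its alias.
  def withJoin(df: DataFrame): DataFrame = {
    val pr1 = df.alias("pr1").where(col("TYPE").isin("contains", "CONTAINS"))
    val pr2 = df.alias("pr2")
    pr1.join(pr2, col("pr1.PRODUCT_VERSION_ID_RELATED_FK") === col("pr2.PRODUCT_VERSION_ID_FK"), "left")
      .select(wanted.map(c => col(s"pr1.$c")): _*)
      .distinct()
  }

  // Same filter and columns, but no join: the dataframe is only read once.
  def withoutJoin(df: DataFrame): DataFrame =
    df.where(col("TYPE").isin("contains", "CONTAINS"))
      .select(wanted.map(col): _*)
      .distinct()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("no-join-sketch")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample rows, only for illustration.
    val df = Seq(
      ("P1", "1.0", "P2", "1.0", "contains", 1L, 2L),
      ("P2", "1.0", "P3", "1.0", "CONTAINS", 2L, 3L),
      ("P2", "1.0", "P4", "1.0", "requires", 2L, 4L)
    ).toDF("PRODUCT_ID", "PRODUCT_VERSION", "RELATED_PRODUCT_ID",
      "RELATED_PRODUCT_VERSION", "TYPE", "PRODUCT_VERSION_ID_FK",
      "PRODUCT_VERSION_ID_RELATED_FK")

    val joined = withJoin(df)
    val plain = withoutJoin(df)

    // Both differences should be empty if the two versions return the same rows.
    println(s"rows only in joined: ${joined.except(plain).count()}")
    println(s"rows only in plain:  ${plain.except(joined).count()}")

    spark.stop()
  }
}
```

If the real query later needs columns from pr2, or an inner join that drops unmatched rows, this shortcut no longer applies and the join has to stay.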
Hope this helps!