I am looking for help in a scenario where I have a Scala DataFrame called PARENT. I need to:

1. loop through each record in the PARENT DataFrame,
2. query the records from a database, filtering on the id value of the parent (the output of this step is a DataFrame), and
3. append a few attributes from the parent to the queried DataFrame.
Ex:

ParentDF
id  parentname
1   X
2   Y

Queried DataFrame for id 1
id  queryid  name
1   23       lobo
1   45       sobo
1   56       aobo

Queried DataFrame for id 2
id  queryid  name
2   53       lama
2   67       dama
2   56       pama
Final output required:
id parentname queryid name
1 X 23 lobo
1 X 45 sobo
1 X 56 aobo
2 Y 53 lama
2 Y 67 dama
2 Y 56 pama
Update 1:
I tried using foreachPartition with a foreach inside it to loop through each record, and got the error below:
error: Unable to find encoder for type org.apache.spark.sql.DataFrame. An implicit Encoder[org.apache.spark.sql.DataFrame] is needed to store org.apache.spark.sql.DataFrame instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
falttenedData.map(row=>{
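In essence I was building a new DataFrame for every row inside map. A simplified sketch of that pattern (the JDBC details and names below are placeholders, not my actual code):

// Sketch of the pattern that triggers the encoder error: returning a DataFrame
// from inside map. Spark has no Encoder[DataFrame], and spark.read cannot be
// called from executor-side code in any case.
val perParent = falttenedData.map { row =>
  val parentId = row.getAs[Int]("id")
  // placeholder per-parent JDBC query
  spark.read.jdbc(jdbcUrl, s"(select * from child_table where id = $parentId) q", connectionProperties)
}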
I need to do this in a scalable way, please. Any help is really appreciated.
CodePudding user response:
You can use the .join method on a DataFrame for this one. Some example code would look something like this:
import spark.implicits._  // required for .toDF (already in scope in spark-shell)

val df = Seq((1, "X"), (2, "Y")).toDF("id", "parentname")
df.show
+---+----------+
| id|parentname|
+---+----------+
|  1|         X|
|  2|         Y|
+---+----------+
val df2 = Seq((1, 23, "lobo"), (1, 45, "sobo"), (1, 56, "aobo"), (2, 53, "lama"), (2, 67, "dama"), (2, 56, "pama")).toDF("id", "queryid", "name")
df2.show
+---+-------+----+
| id|queryid|name|
+---+-------+----+
|  1|     23|lobo|
|  1|     45|sobo|
|  1|     56|aobo|
|  2|     53|lama|
|  2|     67|dama|
|  2|     56|pama|
+---+-------+----+
val output = df.join(df2, Seq("id"))
output.show
+---+----------+-------+----+
| id|parentname|queryid|name|
+---+----------+-------+----+
|  1|         X|     23|lobo|
|  1|         X|     45|sobo|
|  1|         X|     56|aobo|
|  2|         Y|     53|lama|
|  2|         Y|     67|dama|
|  2|         Y|     56|pama|
+---+----------+-------+----+
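If the queried records actually live in a database rather than an in-memory Seq, the same idea applies: read the child table once and join it, instead of querying per parent row. A minimal sketch assuming a JDBC source (the connection details, table and column names are placeholders):

// Read the child records once through Spark's JDBC source (placeholder settings).
val childDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://dbhost:5432/mydb")
  .option("dbtable", "child_table")
  .option("user", "db_user")
  .option("password", "db_password")
  .load()
  .select("id", "queryid", "name")

// A single distributed join replaces the per-record loop and scales with the data.
val joined = df.join(childDF, Seq("id"))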
Hope this helps! :)
CodePudding user response:
The solution is pretty straightforward: you just need to join your parentDF with your other one.
parentDF.join(
  otherDF,
  Seq("id"),
  "left"
)
As you care about scalability: in case your otherDF is quite small (for example, fewer than 10K rows with 2-3 columns), you should consider using a broadcast join: parentDF.join(broadcast(otherDF), Seq("id"), "left") (where broadcast comes from org.apache.spark.sql.functions).
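A runnable version of that broadcast variant, as a sketch with the same placeholder names:

import org.apache.spark.sql.functions.broadcast

// Hint Spark to ship the small otherDF to every executor so the large parentDF
// is not shuffled for the join.
val result = parentDF.join(broadcast(otherDF), Seq("id"), "left")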