I have a specific requirement where I need to query a dataframe based on a range condition.
The values of the range come from the rows of another dataframe, so I will have as many queries as there are rows in that other dataframe.
Using collect() in my scenario seems to be the bottleneck because it brings every row to the driver.
Example:
I need to execute a query on Table 2 for every row in Table 1.
Table 1:
ID1 | Num1 | Num2 |
---|---|---|
1 | 10 | 3 |
2 | 40 | 4 |
Table 2:
ID2 | Num3 |
---|---|
1 | 9 |
2 | 39 |
3 | 22 |
4 | 12 |
For the first row in Table 1, I create a range [10-3, 10+3] = [7, 13] => this becomes the range for the first query.
For the second row in Table 1, I create a range [40-4, 40+4] = [36, 44] => this becomes the range for the second query.
I am currently doing collect() and iterating over the rows to get the values. I use these values as ranges in my queries for Table 2.
Output of Query 1:
ID2 | Num3 |
---|---|
1 | 9 |
4 | 12 |
Output of Query 2:
ID2 | Num3 |
---|---|
2 | 39 |
Since the number of rows in table 1 is very large, doing a collect() operation is costly.
And since the match is on a numeric range rather than on equality, I assumed a join wouldn't work.
Any help in optimizing this task is appreciated.
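To make the current approach concrete, here is the per-row logic sketched with plain Scala collections (the names are just illustrative; in the real job, Table 1 is collected to the driver and each range becomes a separate filter on Table 2):

```scala
// Table 1 rows as (id1, num1, num2); Table 2 rows as (id2, num3)
val table1 = Seq((1, 10, 3), (2, 40, 4))
val table2 = Seq((1, 9), (2, 39), (3, 22), (4, 12))

// One "query" per Table 1 row: keep the Table 2 rows whose num3
// falls inside [num1 - num2, num1 + num2]
val perRowResults = table1.map { case (id1, num1, num2) =>
  val (lower, upper) = (num1 - num2, num1 + num2)
  id1 -> table2.filter { case (_, num3) => num3 >= lower && num3 <= upper }
}.toMap
// perRowResults(1) == Seq((1, 9), (4, 12)); perRowResults(2) == Seq((2, 39))
```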
CodePudding user response:
Depending on what you want your output to look like, you could solve this with a join. Consider the following code:
case class FirstType(id1: Int, num1: Int, num2: Int)
case class Bounds(id1: Int, lowerBound: Int, upperBound: Int)
case class SecondType(id2: Int, num3: Int)
val df = Seq((1, 10, 3), (2, 40, 4)).toDF("id1", "num1", "num2").as[FirstType]
df.show
+---+----+----+
|id1|num1|num2|
+---+----+----+
|  1|  10|   3|
|  2|  40|   4|
+---+----+----+
val df2 = Seq((1, 9), (2, 39), (3, 22), (4, 12)).toDF("id2", "num3").as[SecondType]
df2.show
+---+----+
|id2|num3|
+---+----+
|  1|   9|
|  2|  39|
|  3|  22|
|  4|  12|
+---+----+
val bounds = df.map(x => Bounds(x.id1, x.num1 - x.num2, x.num1 + x.num2))
bounds.show
+---+----------+----------+
|id1|lowerBound|upperBound|
+---+----------+----------+
|  1|         7|        13|
|  2|        36|        44|
+---+----------+----------+
val test = bounds.join(df2, df2("num3") >= bounds("lowerBound") && df2("num3") <= bounds("upperBound"))
test.show
+---+----------+----------+---+----+
|id1|lowerBound|upperBound|id2|num3|
+---+----------+----------+---+----+
|  1|         7|        13|  1|   9|
|  2|        36|        44|  2|  39|
|  1|         7|        13|  4|  12|
+---+----------+----------+---+----+
In here, I do the following:

- Create 3 case classes to be able to use typed datasets later on
- Create the 2 dataframes
- Create an auxiliary dataframe called `bounds`, which contains the lower/upper bounds
- Join the second dataframe onto that auxiliary one
As you can see, the `test` dataframe contains the result. For each unique combination of the `id1`, `lowerBound` and `upperBound` columns you'll get the different dataframes that you wanted, if you look at the `id2` and `num3` columns only.

You could, for example, group by these 3 columns and then do whatever you want with each group. Note that `test.groupBy("id1", "lowerBound", "upperBound")` returns a `RelationalGroupedDataset`, which is meant for aggregations; if you want to apply an arbitrary operation to each group instead, use `groupByKey` on the typed dataset, which gives you a `KeyValueGroupedDataset` with methods like `mapValues` and `mapGroups`.
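If it helps, the grouping idea can be sketched with plain Scala collections (no Spark session needed), using the rows of the `test` dataframe above:

```scala
// The joined rows as (id1, lowerBound, upperBound, id2, num3),
// mirroring the `test` dataframe shown above
val joined = Seq(
  (1, 7, 13, 1, 9),
  (2, 36, 44, 2, 39),
  (1, 7, 13, 4, 12)
)

// Group by (id1, lowerBound, upperBound); each group corresponds to
// the output of one of the original per-row queries
val grouped = joined
  .groupBy { case (id1, lo, hi, _, _) => (id1, lo, hi) }
  .map { case (key, rows) =>
    key -> rows.map { case (_, _, _, id2, num3) => (id2, num3) }
  }
// grouped((1, 7, 13)) holds (1, 9) and (4, 12); grouped((2, 36, 44)) holds (2, 39)
```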
Hope this helps!