Query a second dataframe based on the values of the first dataframe [spark] [pyspark]


I have a specific requirement where I need to query a dataframe based on a range condition.

The values for the range come from the rows of another dataframe, so I will end up with as many queries as there are rows in that other dataframe.

Using collect() in my scenario seems to be the bottleneck because it brings every row to the driver.

A small example:

I need to execute a query on table 2 for every row in table 1.

Table 1:

ID1  Num1  Num2
1    10    3
2    40    4

Table 2:

ID2  Num3
1    9
2    39
3    22
4    12

For the first row in table 1, I create the range [10-3, 10+3] = [7, 13] => this becomes the range for the first query.

For the second row in table 1, I create the range [40-4, 40+4] = [36, 44] => this becomes the range for the second query.

I am currently doing collect() and iterating over the rows to get the values. I use these values as ranges in my queries for Table 2.
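
For reference, that approach presumably looks roughly like this sketch (the table1/table2 variables and the column names are assumed from the example above):

// Sketch of the collect()-based approach described above; the dataframe
// variables (table1, table2) and column names are assumed.
// Every row of table 1 is pulled to the driver, and one filter query is
// run against table 2 per row: this is the bottleneck.
import org.apache.spark.sql.functions.col

for (row <- table1.collect()) {
  val num1 = row.getAs[Int]("Num1")
  val num2 = row.getAs[Int]("Num2")
  table2.filter(col("Num3") >= num1 - num2 && col("Num3") <= num1 + num2).show()
}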

Output of Query 1:

ID2  Num3
1    9
4    12

Output of Query 2:

ID2  Num3
2    39

Since the number of rows in table 1 is very large, doing a collect() operation is costly.

And since the values are numeric, I assume a join won't work.

Any help in optimizing this task is appreciated.

CodePudding user response:

Depending on what you want your output to look like, you could solve this with a join. Consider the following code (run in a spark-shell session, so spark.implicits._ is in scope):

case class FirstType(id1: Int, num1: Int, num2: Int)
case class Bounds(id1: Int, lowerBound: Int, upperBound: Int)
case class SecondType(id2: Int, num3: Int)

val df = Seq((1, 10, 3), (2, 40, 4)).toDF("id1", "num1", "num2").as[FirstType]
df.show
+---+----+----+
|id1|num1|num2|
+---+----+----+
|  1|  10|   3|
|  2|  40|   4|
+---+----+----+

val df2 = Seq((1, 9), (2, 39), (3, 22), (4, 12)).toDF("id2", "num3").as[SecondType]
df2.show
+---+----+
|id2|num3|
+---+----+
|  1|   9|
|  2|  39|
|  3|  22|
|  4|  12|
+---+----+

val bounds = df.map(x => Bounds(x.id1, x.num1 - x.num2, x.num1 + x.num2))
bounds.show
+---+----------+----------+
|id1|lowerBound|upperBound|
+---+----------+----------+
|  1|         7|        13|
|  2|        36|        44|
+---+----------+----------+

val test = bounds.join(df2, df2("num3") >= bounds("lowerBound") && df2("num3") <= bounds("upperBound"))
test.show
+---+----------+----------+---+----+
|id1|lowerBound|upperBound|id2|num3|
+---+----------+----------+---+----+
|  1|         7|        13|  1|   9|
|  2|        36|        44|  2|  39|
|  1|         7|        13|  4|  12|
+---+----------+----------+---+----+

Here, I do the following:

  • Create 3 case classes to be able to use typed datasets later on
  • Create the 2 dataframes
  • Create an auxiliary dataframe called bounds, which contains the lower and upper bounds
  • Join the second dataframe onto that auxiliary one

As you can see, the test dataframe contains the result. For each unique combination of the id1, lowerBound and upperBound columns, the id2 and num3 columns give you exactly the separate query outputs you wanted.
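
For instance, filtering on id1 recovers each individual query output from the question (same spark-shell session as above):

// Recovers "Output of Query 1": the rows matching the bounds of id1 = 1.
test.filter($"id1" === 1).select("id2", "num3").show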

You could, for example, group by these 3 columns and then do whatever you want with the output. Note that test.groupBy("id1", "lowerBound", "upperBound") gives you an untyped RelationalGroupedDataset; to get a KeyValueGroupedDataset, whose mapValues and mapGroups methods let you apply an operation to the dataset for each of the bounds, use groupByKey instead.
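
A minimal sketch of that typed route (again assuming spark.implicits._ is in scope, as in spark-shell):

// groupByKey (not groupBy) yields a KeyValueGroupedDataset.
val grouped = test.groupByKey(row =>
  (row.getAs[Int]("id1"), row.getAs[Int]("lowerBound"), row.getAs[Int]("upperBound")))

// Keep only num3 per group, then apply whatever per-group logic you need;
// here each bounds key is mapped to the list of matching num3 values.
val perQuery = grouped
  .mapValues(_.getAs[Int]("num3"))
  .mapGroups((key, nums) => (key, nums.toSeq))
perQuery.show(false)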

Hope this helps!
