Condition based join in pyspark

Time:06-24

Given two dataframes:

A
+---+---+---+
|id1|id2|id3|
+---+---+---+
|11 |22 |aaa|
|12 |23 |bbb|
|13 |34 |L12|
|14 |32 |L22|
+---+---+---+
B
+---+---+----+
|id1|id2|type|
+---+---+----+
| 22| 11| red|
| 23| 12| red|
| 34|L12|blue|
| 32|L22|blue|
+---+---+----+

I'd like to join them as follows:

if B.type == 'red': A.id1 == B.id2
else if B.type == 'blue': (A.id2 == B.id1) & (A.id3 == B.id2)

Thus, in the end I'd have:

+---+---+---+---+---+----+
|id1|id2|id3|id1|id2|type|
+---+---+---+---+---+----+
| 11| 22|aaa| 22| 11| red|
| 12| 23|bbb| 23| 12| red|
| 13| 34|L12| 34|L12|blue|
| 14| 32|L22| 32|L22|blue|
+---+---+---+---+---+----+

I can already get this result with a single conditional join expression, e.g. join_condition = (when(B.type == 'red', A.id1 == B.id2) ...

However, I'd like to approach the problem like this:

reds = B.filter(B.type == 'red')
blues = B.filter(B.type == 'blue')

and then join them one by one:

a_reds = A.join(reds, A.id1 == reds.id2, 'left')
a_blues = A.join(blues, (A.id2 == blues.id1) & (A.id3 == blues.id2), 'left')

To get a single unified table, I'd like to union them, but without the rows full of null values that show up after the union.

e.g.:

+---+---+---+----+----+----+
|id1|id2|id3| id1| id2|type|
+---+---+---+----+----+----+
| 14| 32|L22|null|null|null|
| 11| 22|aaa|  22|  11| red|
| 12| 23|bbb|  23|  12| red|
| 13| 34|L12|null|null|null|
| 12| 23|bbb|null|null|null|
| 14| 32|L22|  32| L22|blue|
| 13| 34|L12|  34| L12|blue|
| 11| 22|aaa|null|null|null|
+---+---+---+----+----+----+

Can it be done? If so, how?

Thank you.

Answer:

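You can avoid the null rows by using inner joins instead of left joins: an inner join keeps only the rows of A that found a match, so nothing gets padded with nulls.

Alternatively, keep the left joins and filter out the rows where type is null after performing the union.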

Answer:

You can use a single conditional join instead of a union of two joins.

# Assuming A and B are the DataFrame names.
from pyspark.sql import functions as F
join_cond = (F.when(F.col('type') == 'red', A.id1 == B.id2)
              .when(F.col('type') == 'blue', (A.id2 == B.id1) & (A.id3 == B.id2)))

df = A.join(B, join_cond)

Result

+---+---+---+---+---+----+
|id1|id2|id3|id1|id2|type|
+---+---+---+---+---+----+
| 11| 22|aaa| 22| 11| red|
| 12| 23|bbb| 23| 12| red|
| 13| 34|L12| 34|L12|blue|
| 14| 32|L22| 32|L22|blue|
+---+---+---+---+---+----+