Detect existence of column element in multiple other columns using join


I'm using PySpark 2.4.

I have a dataframe like below as input:

+-------+-------+---------+
| ceci_p| ceci_l|ceci_stok|
+-------+-------+---------+
|SFIL401| BPI202|   BPI202|
| BPI202| CDC111|   BPI202|
| LBP347|SFIL402|  SFIL402|
| LBP347|SFIL402|   LBP347|
+-------+-------+---------+

I want to detect which ceci_stok values exist in both ceci_l and ceci_p columns using a join (maybe a self join).

For example: ceci_stok = BPI202 exists in both ceci_l and ceci_p.

I want to create a new dataframe as a result that contains the ceci_stok values that exist in both ceci_l and ceci_p.

CodePudding user response:

The following seems to be working in Spark 3.0.2. Please try it.

from pyspark.sql import functions as F

df2 = (
    df.select('ceci_stok').alias('_stok')
    .join(df.alias('_p'), F.col('_stok.ceci_stok') == F.col('_p.ceci_p'), 'leftsemi')
    .join(df.alias('_l'), F.col('_stok.ceci_stok') == F.col('_l.ceci_l'), 'leftsemi')
    .distinct()
)

df2.show()
# +---------+
# |ceci_stok|
# +---------+
# |   BPI202|
# +---------+
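
As an aside, the same result can also be reached without explicit joins by intersecting the columns positionally; intersect already returns distinct rows. A minimal sketch, assuming the same df as above:

# Sketch: positional set intersection instead of left semi joins.
# intersect matches rows by position, so the differing column names
# do not matter, and it de-duplicates the result.
df3 = (
    df.select('ceci_stok')
    .intersect(df.select('ceci_p'))  # stok values that also appear in ceci_p
    .intersect(df.select('ceci_l'))  # ...and that also appear in ceci_l
)

df3.show()
# +---------+
# |ceci_stok|
# +---------+
# |   BPI202|
# +---------+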

CodePudding user response:

You're right, that can be done with a self join. If you have a dataframe

>>> df.show(truncate=False)
+-------+-------+---------+
|ceci_p |ceci_l |ceci_stok|
+-------+-------+---------+
|SFIL401|BPI202 |BPI202   |
|BPI202 |CDC111 |BPI202   |
|LBP347 |SFIL402|SFIL402  |
|LBP347 |SFIL402|LBP347   |
+-------+-------+---------+

...then the following couple of joins (with "leftsemi", which keeps only the left-hand side's columns) should produce what you need:

>>> df.select("ceci_stok") \
      .join(df.select("ceci_p"),df.ceci_stok == df.ceci_p,"leftsemi") \
      .join(df.select("ceci_l"),df.ceci_stok == df.ceci_l,"leftsemi") \
      .show(truncate=False)
+---------+
|ceci_stok|
+---------+
|BPI202   |
|BPI202   |
+---------+

You can dedup the result if you're just interested in unique values.
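For example, appending distinct() before show() deduplicates the output; a minimal sketch reusing the query above:

>>> df.select("ceci_stok") \
      .join(df.select("ceci_p"),df.ceci_stok == df.ceci_p,"leftsemi") \
      .join(df.select("ceci_l"),df.ceci_stok == df.ceci_l,"leftsemi") \
      .distinct() \
      .show(truncate=False)
+---------+
|ceci_stok|
+---------+
|BPI202   |
+---------+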

CodePudding user response:

# create data for testing
data = [("SFIL401", "BPI202", "BPI202"),
        ("BPI202", "CDC111", "BPI202"),
        ("LBP347", "SFIL402", "SFIL402"),
        ("LBP347", "SFIL402", "LBP347")]

data_schema = ["ceci_p", "ceci_l", "ceci_stok"]

df = spark.createDataFrame(data=data, schema=data_schema)

# don't forget to cache a table you reference multiple times
ceci_p = df.cache() \
    .select(df.ceci_p.alias("join_key")) \
    .distinct()  # rename to a common join key

ceci_l = df \
    .select(df.ceci_l.alias("join_key")) \
    .distinct()  # rename to a common join key

# get the values common to both columns you're interested in
vals = ceci_l.join(ceci_p, "join_key").distinct()

df.join(vals, df.ceci_stok == vals.join_key).show()
+-------+-------+---------+--------+
| ceci_p| ceci_l|ceci_stok|join_key|
+-------+-------+---------+--------+
|SFIL401| BPI202|   BPI202|  BPI202|
| BPI202| CDC111|   BPI202|  BPI202|
+-------+-------+---------+--------+
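
If you only need the unique ceci_stok values themselves rather than the full rows, a small follow-up sketch on the same df and vals could select and dedup that single column:

# keep just the distinct ceci_stok values that matched
df.join(vals, df.ceci_stok == vals.join_key) \
    .select("ceci_stok") \
    .distinct() \
    .show()
# +---------+
# |ceci_stok|
# +---------+
# |   BPI202|
# +---------+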