Join on element inside array

Time:04-19

I have two dataframes, and I need to use a column value from one of them to filter the other.

For example, here are the datasets:

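from pyspark.sql import Row, SparkSession

# Reuse the active session if one exists (e.g. in a notebook or the pyspark shell)
spark = SparkSession.builder.getOrCreate()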

cust = spark.createDataFrame([Row(city='hyd',cust_id=100),
                              Row(city='blr',cust_id=101),
                              Row(city='chen',cust_id=102),
                              Row(city='mum',cust_id=103)])

item = spark.createDataFrame([Row(item='fish',geography=['london','a','b','hyd']),
                              Row(item='chicken',geography=['a','hyd','c']),
                              Row(item='rice',geography=['a','b','c','blr']),
                              Row(item='soup',geography=['a','kol','simla']),
                              Row(item='pav',geography=['a','del']),
                              Row(item='kachori',geography=['a','guj']),
                              Row(item='fries',geography=['a','chen']),
                              Row(item='noodles',geography=['a','mum'])])

cust dataset output:

+----+-------+
|city|cust_id|
+----+-------+
| hyd|    100|
| blr|    101|
|chen|    102|
| mum|    103|
+----+-------+

item dataset output:

+-------+-------------------+
|   item|          geography|
+-------+-------------------+
|   fish|[london, a, b, hyd]|
|chicken|        [a, hyd, c]|
|   rice|     [a, b, c, blr]|
|   soup|    [a, kol, simla]|
|    pav|           [a, del]|
|kachori|           [a, guj]|
|  fries|          [a, chen]|
|noodles|           [a, mum]|
+-------+-------------------+

For each row in the cust dataframe, I need to find the items whose geography array contains that row's city. The final output should be:

+----+---------------+-------+
|city|          items|cust_id|
+----+---------------+-------+
| hyd|[fish, chicken]|    100|
| blr|         [rice]|    101|
|chen|        [fries]|    102|
| mum|      [noodles]|    103|
+----+---------------+-------+

CodePudding user response:

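Before the join, I would explode the array column so that each array element gets its own row and can be matched against city. After the join, a collect_list aggregation gathers all matching items into one list per customer.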

from pyspark.sql import functions as F

df = cust.join(item.withColumn('city', F.explode('geography')), 'city', 'left')
df = (df.groupBy('city', 'cust_id')
        .agg(F.collect_list('item').alias('items'))
        .select('city', 'items', 'cust_id')
)
df.show(truncate=False)
#+----+---------------+-------+
#|city|items          |cust_id|
#+----+---------------+-------+
#|blr |[rice]         |101    |
#|chen|[fries]        |102    |
#|hyd |[fish, chicken]|100    |
#|mum |[noodles]      |103    |
#+----+---------------+-------+
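
To make the explode-then-aggregate logic concrete, here is a rough plain-Python model of the same steps on the sample data. It does not use Spark; the names cust_rows, item_rows, exploded and items_by_city are made up for this illustration, and it is only a sketch of what the join computes, not of how Spark executes it.

```python
from collections import defaultdict

# Same sample data as the question, as plain Python structures.
cust_rows = [("hyd", 100), ("blr", 101), ("chen", 102), ("mum", 103)]
item_rows = [
    ("fish", ["london", "a", "b", "hyd"]),
    ("chicken", ["a", "hyd", "c"]),
    ("rice", ["a", "b", "c", "blr"]),
    ("soup", ["a", "kol", "simla"]),
    ("pav", ["a", "del"]),
    ("kachori", ["a", "guj"]),
    ("fries", ["a", "chen"]),
    ("noodles", ["a", "mum"]),
]

# "explode": one (city, item) pair per element of the geography array.
exploded = [(city, item) for item, geography in item_rows for city in geography]

# "collect_list": gather the items seen for each city.
items_by_city = defaultdict(list)
for city, item in exploded:
    items_by_city[city].append(item)

# Left join from cust: a city with no matching item gets an empty list.
result = [(city, items_by_city.get(city, []), cust_id) for city, cust_id in cust_rows]

for row in result:
    print(row)
```

Because the join starts from cust, cities that appear only in geography (such as 'london' or 'kol') never reach the result.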

CodePudding user response:

from pyspark.sql import functions as F

# Because cust is a small df, collect its cities into an anchored regex such as ^(hyd|blr|chen|mum)$
pattern = '^(' + '|'.join(r.city for r in cust.select('city').collect()) + ')$'

new = (
  # Check every element of geography with rlike and keep one row per matching city
  item.withColumn('city', F.explode(F.expr(f"filter(geography, g -> g rlike '{pattern}')")))

  # Groupby city, collect the items, and join back to cust for cust_id
  .groupby('city').agg(F.collect_list('item').alias('geography')).join(cust, how='left', on='city'))

new.show()

+----+---------------+-------+
|city|      geography|cust_id|
+----+---------------+-------+
| blr|         [rice]|    101|
|chen|        [fries]|    102|
| hyd|[fish, chicken]|    100|
| mum|      [noodles]|    103|
+----+---------------+-------+
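
One thing to watch with an rlike-based check: rlike succeeds if the pattern is found anywhere in the string, so an unanchored alternation such as hyd|blr|chen|mum would also accept a longer value that merely contains a city name. A minimal sketch with Python's re module, whose search behaves the same way for this kind of pattern, shows the difference. The value 'chennai' is a made-up example that is not in the sample data.

```python
import re

cities = ["hyd", "blr", "chen", "mum"]

# Unanchored alternation: matches if any city occurs anywhere in the value.
unanchored = "|".join(cities)
# Anchored alternation: matches only if the whole value is one of the cities.
anchored = "^(" + unanchored + ")$"

value = "chennai"  # hypothetical geography entry, not part of the sample data

matches_unanchored = bool(re.search(unanchored, value))
matches_anchored = bool(re.search(anchored, value))
exact_city_matches = bool(re.search(anchored, "chen"))

print(matches_unanchored)  # True: 'chen' is found inside 'chennai'
print(matches_anchored)    # False: 'chennai' is not exactly a city in the list
print(exact_city_matches)  # True: 'chen' is matched as a whole value
```

If city names could contain regex metacharacters, escaping each one (for example with re.escape) before joining would be a sensible extra step.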