I have two dataframes, and I need to use the values of a column in the first dataframe to filter the second dataframe.
For example, here are the datasets:
import pyspark
from pyspark.sql import Row
# Customer lookup table: one row per customer, pairing a city with a cust_id.
cust_rows = [
    Row(city=city, cust_id=cust_id)
    for city, cust_id in [
        ('hyd', 100),
        ('blr', 101),
        ('chen', 102),
        ('mum', 103),
    ]
]
cust = spark.createDataFrame(cust_rows)
# Item table: each item carries an array column listing the places it is
# associated with; the customer's city may appear at any position in it.
item_rows = [
    Row(item=name, geography=places)
    for name, places in [
        ('fish', ['london', 'a', 'b', 'hyd']),
        ('chicken', ['a', 'hyd', 'c']),
        ('rice', ['a', 'b', 'c', 'blr']),
        ('soup', ['a', 'kol', 'simla']),
        ('pav', ['a', 'del']),
        ('kachori', ['a', 'guj']),
        ('fries', ['a', 'chen']),
        ('noodles', ['a', 'mum']),
    ]
]
item = spark.createDataFrame(item_rows)
cust dataset output:
+----+-------+
|city|cust_id|
+----+-------+
| hyd|    100|
| blr|    101|
|chen|    102|
| mum|    103|
+----+-------+
item dataset output:
+-------+------------------+
|   item|         geography|
+-------+------------------+
|   fish|[london, a, b,hyd]|
|chicken|       [a, hyd, c]|
|   rice|    [a, b, c, blr]|
|   soup|   [a, kol, simla]|
|    pav|          [a, del]|
|kachori|          [a, guj]|
|  fries|         [a, chen]|
|noodles|          [a, mum]|
+-------+------------------+
I need to use the city column values from cust dataframe to get the items from the item dataset. The final output should be:
+----+---------------+-------+
|city|          items|cust_id|
+----+---------------+-------+
| hyd|[fish, chicken]|    100|
| blr|         [rice]|    101|
|chen|        [fries]|    102|
| mum|      [noodles]|    103|
+----+---------------+-------+
CodePudding user response:
Before the join, I would `explode` the array column so that each place gets its own row.
Then a `collect_list` aggregation can gather all matching items into one list.
from pyspark.sql import functions as F
# Step 1: explode the geography array so every (item, place) pair becomes its
# own row; the exploded column is called 'city' so it can act as the join key.
item_per_city = item.withColumn('city', F.explode('geography'))

# Step 2: left join from the customer side, so every customer row is kept.
joined = cust.join(item_per_city, on='city', how='left')

# Step 3: fold the matched items back into one list per customer and put the
# columns in the requested order.
df = (
    joined
    .groupBy('city', 'cust_id')
    .agg(F.collect_list('item').alias('items'))
    .select('city', 'items', 'cust_id')
)
df.show(truncate=False)
# +----+---------------+-------+
# |city|items          |cust_id|
# +----+---------------+-------+
# |blr |[rice]         |101    |
# |chen|[fries]        |102    |
# |hyd |[fish, chicken]|100    |
# |mum |[noodles]      |103    |
# +----+---------------+-------+
CodePudding user response:
# Because cust is a small DataFrame, its distinct city values are collected to
# the driver as a plain Python list (avoid this for a large cust table).
cities = [
    row['city']
    for row in cust.select('city').distinct().collect()
    if row['city'] is not None
]

new = (
    item
    # One row per (item, place) pair, so every element of the geography array
    # is examined rather than only the element at one fixed index.
    .withColumn('city', F.explode('geography'))
    # Exact membership test against the customer cities; unlike a regex match
    # it cannot match on substrings or be confused by regex metacharacters.
    .filter(F.col('city').isin(cities))
    # Gather the items available in each city. The aggregated column keeps the
    # name 'geography', so the result columns are city, geography, cust_id.
    .groupBy('city')
    .agg(F.collect_list('item').alias('geography'))
    # Attach the cust_id that belongs to each city.
    .join(cust, on='city', how='left')
)
# Keep the DataFrame in `new` and display it separately: show() returns None,
# so chaining it onto the assignment would discard the result.
new.show()
+----+---------------+-------+
|city|      geography|cust_id|
+----+---------------+-------+
| blr|         [rice]|    101|
|chen|        [fries]|    102|
| hyd|[fish, chicken]|    100|
| mum|      [noodles]|    103|
+----+---------------+-------+