Replace missing values from a reference dataframe in a pyspark join


I have two dataframes:

Dataframe 1 (df_address) - has 2 columns => customer_id and address_id (all addresses for a customer)
Dataframe 2 (df_orders) - has 3 columns => customer_id, address_id, order_id (customer_id is a foreign key)

One thing to note is that df_address contains some addresses that do not appear in df_orders.

I need to find the order_share associated with each (customer_id, address_id) pair, which is computed as

order_share = (number of orders for that (customer_id, address_id) pair) / (number of addresses for that customer_id) * 100
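
For example, with the sample data added in Edit 1 below, customer 1 has 6 addresses and 4 of their orders use address 100, so order_share for the pair (1, 100) is 4 / 6 * 100 ≈ 66.67.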

For that, I did the following


from pyspark.sql import functions as f
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

df1 = df_address.groupBy('customer_id').count().select('customer_id', f.col('count').alias('address_counts'))  # total number of addresses per customer

df2 = df_orders.groupBy('customer_id','address_id').count().select('customer_id', 'address_id', f.col('count').alias('order_count'))  # number of orders per (customer_id, address_id) pair

df = df1.join(df2, "customer_id", how='inner')

def get_order_share(order_count,address_counts):
  return (order_count/address_counts)*100

udf_func = udf(get_order_share, FloatType())
df = df.withColumn("order_share", udf_func(df.order_count, df.address_counts))  # order_share as a percentage of the customer's address count
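
(As an aside, a UDF is not strictly required here; a minimal equivalent sketch using built-in column arithmetic, cast to float to match the UDF's FloatType, would be:)

# Equivalent without a UDF: built-in column arithmetic avoids the Python UDF overhead
df = df.withColumn("order_share", (f.col("order_count") / f.col("address_counts") * 100).cast('float'))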

The output of df is

+-----------+--------------+----------+-----------+-----------+
|customer_id|address_counts|address_id|order_count|order_share|
+-----------+--------------+----------+-----------+-----------+
|          1|             6|       102|          2|  33.333332|
|          1|             6|       100|          4|  66.666664|
|          1|             6|       103|          1|  16.666666|
|          1|             6|       101|          1|  16.666666|
|          3|             3|       300|          2|  66.666664|
+-----------+--------------+----------+-----------+-----------+

However, for the (customer_id, address_id) pairs that are not in the df_orders dataframe, I would like to include them in the result as well, with an order_count and order_share of 0. I am not sure how to go about this, or at what step I need to do it.

I am relatively new to PySpark and would like to know the right way to go about solving this problem.

Any help will be appreciated. Thanks!

Edit 1 - Added sample data

columns1 = ['customer_id','address_id','order_id']

data1 = [
    (1,100,733),(1,100,8389),(1,100,894),(1,100,653),(1,101,649),(1,102,6493),(1,102,6449),(1,103,749),
    (2,200,648),(2,200,545),(2,201,8384),
    (3,300,8392),(3,300,7294),(3,301,828),(3,301,9204),
]


columns2 = ['customer_id','address_id']

data2 = [
    (1,100),(1,101),(1,102),(1,103),(1,104),(1,105),
    (2,200),(2,201),(2,202),
    (3,300),(3,301),(3,302),
]

df_orders = spark.createDataFrame(data1, columns1)
df_address = spark.createDataFrame(data2, columns2)

# Notice addresses 104,105,202 and 302 are missing from `df_orders`

CodePudding user response:

You can finish by left-joining the df you computed onto your existing df_address; the (customer_id, address_id) pairs that never appear in df_orders come through the join with nulls, and na.fill(0) then replaces those with 0:

df = df_address.join(df,['customer_id','address_id'],'left').na.fill(0)
df.show()
+-----------+----------+--------------+-----------+-----------+
|customer_id|address_id|address_counts|order_count|order_share|
+-----------+----------+--------------+-----------+-----------+
|          2|       200|             3|          2|  66.666664|
|          3|       300|             3|          2|  66.666664|
|          3|       301|             3|          2|  66.666664|
|          1|       102|             6|          2|  33.333332|
|          3|       302|             0|          0|        0.0|
|          1|       104|             0|          0|        0.0|
|          1|       105|             0|          0|        0.0|
|          2|       201|             3|          1|  33.333332|
|          1|       100|             6|          4|  66.666664|
|          1|       103|             6|          1|  16.666666|
|          1|       101|             6|          1|  16.666666|
|          2|       202|             0|          0|        0.0|
+-----------+----------+--------------+-----------+-----------+
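
Note that with this fill, address_counts is also 0 for the addresses that never appear in df_orders (e.g. 104 and 105 for customer 1). If you would rather keep the customer's real address total on those rows, one possible variation (just a sketch building on the df1 and df2 you already defined, not something you strictly need) is:

# Sketch: take address_counts from df1 for every address of the customer,
# and only zero-fill the order count for pairs missing from df_orders
df_all = (df_address
          .join(df1, 'customer_id', 'left')
          .join(df2, ['customer_id', 'address_id'], 'left')
          .na.fill(0, subset=['order_count']))
df_all = df_all.withColumn('order_share', (f.col('order_count') / f.col('address_counts') * 100).cast('float'))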

Let me know if this works for you.
