Home > OS >  How to join Pyspark dataframes based on groups
How to join Pyspark dataframes based on groups

Time:09-29

I have 2 Pyspark dataframes

Dataframe 1 - df where the columns are customer_id, address_id, order_id, date the order was placed, order_share

 ---- ---- -------- ---------- ----------- 
|c_id|a_id|order_id|order_date|order_share|
 ---- ---- -------- ---------- ----------- 
|  c1|  a1|       1|2021-01-23|        0.5|
|  c1|  a2|       1|2021-01-23|        0.2|
|  c1|  a3|       1|2021-01-23|        0.3|
|  c2|  a5|       2|2021-03-20|        0.4|
|  c2|  a6|       2|2021-03-20|        0.6|
|  c1|  a1|       3|2021-02-20|        0.3|
|  c1|  a2|       3|2021-02-20|        0.3|
|  c1|  a3|       3|2021-02-20|        0.4|
 ---- ---- -------- ---------- ----------- 

Dataframe 2 - df_address where the columns are customer_id, address_id, the date of address creation

 ---- ---- ------------ 
|c_id|a_id|created_date|
 ---- ---- ------------ 
|  c1|  a1|  2020-12-31|
|  c1|  a2|  2020-04-23|
|  c1|  a3|  2020-03-23|
|  c1|  a4|  2020-01-16|
|  c2|  a5|  2020-12-28|
|  c2|  a6|  2020-05-16|
|  c2|  a7|  2020-03-04|
 ---- ---- ------------ 

Now, I wish to join both these tables such that for every order_id, I get the address from df_address and the corresponding entry should be 0.0 in the order_share column

My output should look like

 ---- ---- ------------ -------- ---------- ----------- 
|c_id|a_id|created_date|order_id|order_date|order_share|
 ---- ---- ------------ -------- ---------- ----------- 
|  c1|  a1|  2020-12-31|       1|2021-01-23|        0.5|
|  c1|  a2|  2020-04-23|       1|2021-01-23|        0.2|
|  c1|  a3|  2020-03-23|       1|2021-01-23|        0.3|
|  c1|  a4|  2020-01-16|       1|2021-01-23|        0.0|
|  c2|  a5|  2020-12-28|       2|2021-03-20|        0.4|
|  c2|  a6|  2020-05-16|       2|2021-03-20|        0.6|
|  c2|  a7|  2020-03-04|       2|2021-03-20|        0.0|
|  c1|  a1|  2020-12-31|       3|2021-02-20|        0.3|
|  c1|  a2|  2020-04-23|       3|2021-02-20|        0.3|
|  c1|  a3|  2020-03-23|       3|2021-02-20|        0.4|
|  c1|  a4|  2020-01-16|       3|2021-02-20|        0.0|
 ---- ---- ------------ -------- ---------- ----------- 

This doesn't look like a normal left/right join and I should do this for every order_id.

I tried joining using ['c_id','a_id'] but the output is no where close to expected. Considering df_address as left and df as right, Using left join gives me null values for order_id and right join doesn't give me all the addresses from df_address

It looks like I have to apply some sort of groupby for every order_id and then apply the join for each group, but I don't know how to implement this or even sure if that's the right way to go about it

Any help would be appreciated. Thanks!

CodePudding user response:

I tried a full outer join with the DataFrames to get the missing c_id and a_id combination & further utilising when with isNull for Null column values coming from df and replacing when them values from df_address below are the results -

Data Preparation

input_str1 = """
c1|  a1|       1|2021-01-23|        0.5|
c1|  a2|       1|2021-01-23|        0.2|
c1|  a3|       1|2021-01-23|        0.3|
c2|  a5|       2|2021-03-20|        0.4|
c2|  a6|       2|2021-03-20|        0.6|
c1|  a1|       3|2021-02-20|        0.3|
c1|  a2|       3|2021-02-20|        0.3|
c1|  a3|       3|2021-02-20|        0.4
""".split("|")

input_values1 = list(map(lambda x: x.strip() if x.strip() != '' else None, input_str1))

cols1 = list(map(lambda x: x.strip() if x.strip() != '' else None, "c_id|a_id|order_id|order_date|order_share".split("|")))
            
n = len(input_values1)
n_col1 = 5

input_list1 = [tuple(input_values1[i:i n_col1]) for i in range(0,n,n_col1)]

sparkDF1 = sql.createDataFrame(input_list1, cols1)

sparkDF1.show()

 ---- ---- -------- ---------- ----------- 
|c_id|a_id|order_id|order_date|order_share|
 ---- ---- -------- ---------- ----------- 
|  c1|  a1|       1|2021-01-23|        0.5|
|  c1|  a2|       1|2021-01-23|        0.2|
|  c1|  a3|       1|2021-01-23|        0.3|
|  c2|  a5|       2|2021-03-20|        0.4|
|  c2|  a6|       2|2021-03-20|        0.6|
|  c1|  a1|       3|2021-02-20|        0.3|
|  c1|  a2|       3|2021-02-20|        0.3|
|  c1|  a3|       3|2021-02-20|        0.4|
 ---- ---- -------- ---------- ----------- 

input_str2 = """
c1|  a1|  2020-12-31|
c1|  a2|  2020-04-23|
c1|  a3|  2020-03-23|
c1|  a4|  2020-01-16|
c2|  a5|  2020-12-28|
c2|  a6|  2020-05-16|
c2|  a7|  2020-03-04
""".split("|")

input_values2 = list(map(lambda x: x.strip() if x.strip() != '' else None, input_str2))

cols2 = list(map(lambda x: x.strip() if x.strip() != '' else None, "c_id|a_id|created_date".split("|")))
            
n = len(input_values2)
n_col2 = 3

input_list2 = [tuple(input_values2[i:i n_col2]) for i in range(0,n,n_col2)]

sparkDF2 = sql.createDataFrame(input_list2, cols2)

sparkDF2.show()

 ---- ---- ------------ 
|c_id|a_id|created_date|
 ---- ---- ------------ 
|  c1|  a1|  2020-12-31|
|  c1|  a2|  2020-04-23|
|  c1|  a3|  2020-03-23|
|  c1|  a4|  2020-01-16|
|  c2|  a5|  2020-12-28|
|  c2|  a6|  2020-05-16|
|  c2|  a7|  2020-03-04|
 ---- ---- ------------ 

Full Join

Renaming Column values coming from SparkDF2 , which will be further used to populate null values to avoid ambiguous column names

finalDF = sparkDF1.join(sparkDF2
                       , (sparkDF1['c_id'] == sparkDF2['c_id'])
                        & (sparkDF1['a_id'] == sparkDF2['a_id'])
                        ,'full'
                ).select(sparkDF1['*']
                         ,sparkDF2['c_id'].alias('c_id_address')
                         ,sparkDF2['a_id'].alias('a_id_address')
                         ,sparkDF2['created_date']
                        )
finalDF.show()

 ---- ---- -------- ---------- ----------- ------------ ------------ ------------ 
|c_id|a_id|order_id|order_date|order_share|c_id_address|a_id_address|created_date|
 ---- ---- -------- ---------- ----------- ------------ ------------ ------------ 
|  c1|  a3|       1|2021-01-23|        0.3|          c1|          a3|  2020-03-23|
|  c1|  a3|       3|2021-02-20|        0.4|          c1|          a3|  2020-03-23|
|  c2|  a5|       2|2021-03-20|        0.4|          c2|          a5|  2020-12-28|
|null|null|    null|      null|       null|          c2|          a7|  2020-03-04|
|  c1|  a2|       1|2021-01-23|        0.2|          c1|          a2|  2020-04-23|
|  c1|  a2|       3|2021-02-20|        0.3|          c1|          a2|  2020-04-23|
|  c1|  a1|       1|2021-01-23|        0.5|          c1|          a1|  2020-12-31|
|  c1|  a1|       3|2021-02-20|        0.3|          c1|          a1|  2020-12-31|
|null|null|    null|      null|       null|          c1|          a4|  2020-01-16|
|  c2|  a6|       2|2021-03-20|        0.6|          c2|          a6|  2020-05-16|
 ---- ---- -------- ---------- ----------- ------------ ------------ ------------ 

When isNull

finalDF = finalDF.withColumn('c_id',F.when(F.col('c_id').isNull()
                                           ,F.col('c_id_address')).otherwise(F.col('c_id'))
                            )\
                    .withColumn('a_id',F.when(F.col('a_id').isNull()
                                              ,F.col('a_id_address')).otherwise(F.col('a_id'))
                            )\
                    .withColumn('order_share',F.when(F.col('order_share').isNull()
                                                     ,0.0).otherwise(F.col('order_share'))
                            )


finalDF.show()

 ---- ---- -------- ---------- ----------- ------------ ------------ ------------ 
|c_id|a_id|order_id|order_date|order_share|c_id_address|a_id_address|created_date|
 ---- ---- -------- ---------- ----------- ------------ ------------ ------------ 
|  c1|  a3|       1|2021-01-23|        0.3|          c1|          a3|  2020-03-23|
|  c1|  a3|       3|2021-02-20|        0.4|          c1|          a3|  2020-03-23|
|  c2|  a5|       2|2021-03-20|        0.4|          c2|          a5|  2020-12-28|
|  c2|  a7|    null|      null|        0.0|          c2|          a7|  2020-03-04|
|  c1|  a2|       1|2021-01-23|        0.2|          c1|          a2|  2020-04-23|
|  c1|  a2|       3|2021-02-20|        0.3|          c1|          a2|  2020-04-23|
|  c1|  a1|       1|2021-01-23|        0.5|          c1|          a1|  2020-12-31|
|  c1|  a1|       3|2021-02-20|        0.3|          c1|          a1|  2020-12-31|
|  c1|  a4|    null|      null|        0.0|          c1|          a4|  2020-01-16|
|  c2|  a6|       2|2021-03-20|        0.6|          c2|          a6|  2020-05-16|
 ---- ---- -------- ---------- ----------- ------------ ------------ ------------ 

Note - order_id and order_date are null as there is not value present in for the c_id and a_id combination in sparkDF2

This example is provide a approach , towards getting your required solution , you can further improvise if required to populate the order missing values

CodePudding user response:

You can use a intermediate orders dataframe, created from df dataframe and that contains only information about orders, which are columns customer_id, order_id and order_date. Then you first inner join df_address dataframe with this orders dataframe, to link each couple (customer_id, address_id) to orders-specific information, and then left join the resulting dataframe with df dataframe to get order_share per address, then replace null value in order_share column with 0.0.

Here is the complete code:

from pyspark.sql import functions as F

# Orders dataframe that contains only orders-specific information
orders = df.select('customer_id', 'order_id', 'order_date').distinct()

df_address.join(orders, ['customer_id']) \ # link addresses with orders
  .join(df.drop('order_date'), ['customer_id', 'address_id', 'order_id'], 'left_outer') \ # link orders/addresses with order shares
  .withColumn('order_share', F.when(F.col('order_share').isNotNull(), F.col('order_share')).otherwise(F.lit(0.0))) \ # replace null in order_share column with 0.0
  .orderBy('customer_id', 'order_id', 'address_id') \ # optional, to reorder dataframe
  • Related