I have two PySpark DataFrames.
DataFrame 1 - df, with columns customer_id (c_id), address_id (a_id), order_id, order_date (the date the order was placed), and order_share:
+----+----+--------+----------+-----------+
|c_id|a_id|order_id|order_date|order_share|
+----+----+--------+----------+-----------+
|  c1|  a1|       1|2021-01-23|        0.5|
|  c1|  a2|       1|2021-01-23|        0.2|
|  c1|  a3|       1|2021-01-23|        0.3|
|  c2|  a5|       2|2021-03-20|        0.4|
|  c2|  a6|       2|2021-03-20|        0.6|
|  c1|  a1|       3|2021-02-20|        0.3|
|  c1|  a2|       3|2021-02-20|        0.3|
|  c1|  a3|       3|2021-02-20|        0.4|
+----+----+--------+----------+-----------+
DataFrame 2 - df_address, with columns customer_id (c_id), address_id (a_id), and created_date (the date the address was created):
+----+----+------------+
|c_id|a_id|created_date|
+----+----+------------+
|  c1|  a1|  2020-12-31|
|  c1|  a2|  2020-04-23|
|  c1|  a3|  2020-03-23|
|  c1|  a4|  2020-01-16|
|  c2|  a5|  2020-12-28|
|  c2|  a6|  2020-05-16|
|  c2|  a7|  2020-03-04|
+----+----+------------+
Now, I wish to join these two tables such that, for every order_id, I get all of that customer's addresses from df_address, with 0.0 in the order_share column for any address that does not appear in df for that order.
My output should look like this:
+----+----+------------+--------+----------+-----------+
|c_id|a_id|created_date|order_id|order_date|order_share|
+----+----+------------+--------+----------+-----------+
|  c1|  a1|  2020-12-31|       1|2021-01-23|        0.5|
|  c1|  a2|  2020-04-23|       1|2021-01-23|        0.2|
|  c1|  a3|  2020-03-23|       1|2021-01-23|        0.3|
|  c1|  a4|  2020-01-16|       1|2021-01-23|        0.0|
|  c2|  a5|  2020-12-28|       2|2021-03-20|        0.4|
|  c2|  a6|  2020-05-16|       2|2021-03-20|        0.6|
|  c2|  a7|  2020-03-04|       2|2021-03-20|        0.0|
|  c1|  a1|  2020-12-31|       3|2021-02-20|        0.3|
|  c1|  a2|  2020-04-23|       3|2021-02-20|        0.3|
|  c1|  a3|  2020-03-23|       3|2021-02-20|        0.4|
|  c1|  a4|  2020-01-16|       3|2021-02-20|        0.0|
+----+----+------------+--------+----------+-----------+
This doesn't look like a normal left/right join, and it has to be done for every order_id. I tried joining on ['c_id', 'a_id'], but the output is nowhere close to what I expect. With df_address as the left table and df as the right one, a left join gives me null values for order_id, and a right join doesn't give me all the addresses from df_address. Roughly what I tried is sketched below.
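A sketch of those attempts (using the c_id/a_id column names from the tables above):

# left join keeps every address, but order_id, order_date and order_share
# come out as null for addresses that never appear in df
df_address.join(df, on=['c_id', 'a_id'], how='left')

# right join keeps the order rows, but drops addresses such as a4 and a7 entirely
df_address.join(df, on=['c_id', 'a_id'], how='right')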
It looks like I have to apply some sort of groupBy for every order_id and then apply the join for each group, but I don't know how to implement this, or even whether that's the right way to go about it.
Any help would be appreciated. Thanks!
CodePudding user response:
I tried a full outer join between the two DataFrames to get the missing c_id and a_id combinations, and then used when with isNull to replace the null column values coming from df with the values from df_address. Below are the results.
Data Preparation
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# `sql` is the SparkSession used to build the sample DataFrames
sql = SparkSession.builder.getOrCreate()

input_str1 = """
c1| a1| 1|2021-01-23| 0.5|
c1| a2| 1|2021-01-23| 0.2|
c1| a3| 1|2021-01-23| 0.3|
c2| a5| 2|2021-03-20| 0.4|
c2| a6| 2|2021-03-20| 0.6|
c1| a1| 3|2021-02-20| 0.3|
c1| a2| 3|2021-02-20| 0.3|
c1| a3| 3|2021-02-20| 0.4
""".split("|")

input_values1 = list(map(lambda x: x.strip() if x.strip() != '' else None, input_str1))
cols1 = list(map(lambda x: x.strip() if x.strip() != '' else None, "c_id|a_id|order_id|order_date|order_share".split("|")))

n = len(input_values1)
n_col1 = 5
# group the flat list of values into rows of n_col1 columns each
input_list1 = [tuple(input_values1[i:i + n_col1]) for i in range(0, n, n_col1)]

sparkDF1 = sql.createDataFrame(input_list1, cols1)
sparkDF1.show()
+----+----+--------+----------+-----------+
|c_id|a_id|order_id|order_date|order_share|
+----+----+--------+----------+-----------+
|  c1|  a1|       1|2021-01-23|        0.5|
|  c1|  a2|       1|2021-01-23|        0.2|
|  c1|  a3|       1|2021-01-23|        0.3|
|  c2|  a5|       2|2021-03-20|        0.4|
|  c2|  a6|       2|2021-03-20|        0.6|
|  c1|  a1|       3|2021-02-20|        0.3|
|  c1|  a2|       3|2021-02-20|        0.3|
|  c1|  a3|       3|2021-02-20|        0.4|
+----+----+--------+----------+-----------+
input_str2 = """
c1| a1| 2020-12-31|
c1| a2| 2020-04-23|
c1| a3| 2020-03-23|
c1| a4| 2020-01-16|
c2| a5| 2020-12-28|
c2| a6| 2020-05-16|
c2| a7| 2020-03-04
""".split("|")

input_values2 = list(map(lambda x: x.strip() if x.strip() != '' else None, input_str2))
cols2 = list(map(lambda x: x.strip() if x.strip() != '' else None, "c_id|a_id|created_date".split("|")))

n = len(input_values2)
n_col2 = 3
# group the flat list of values into rows of n_col2 columns each
input_list2 = [tuple(input_values2[i:i + n_col2]) for i in range(0, n, n_col2)]

sparkDF2 = sql.createDataFrame(input_list2, cols2)
sparkDF2.show()
+----+----+------------+
|c_id|a_id|created_date|
+----+----+------------+
|  c1|  a1|  2020-12-31|
|  c1|  a2|  2020-04-23|
|  c1|  a3|  2020-03-23|
|  c1|  a4|  2020-01-16|
|  c2|  a5|  2020-12-28|
|  c2|  a6|  2020-05-16|
|  c2|  a7|  2020-03-04|
+----+----+------------+
Full Join
Renaming the columns coming from sparkDF2 (which will later be used to populate the null values) to avoid ambiguous column names:
finalDF = sparkDF1.join(sparkDF2,
                        (sparkDF1['c_id'] == sparkDF2['c_id'])
                        & (sparkDF1['a_id'] == sparkDF2['a_id']),
                        'full'
                        ).select(sparkDF1['*'],
                                 sparkDF2['c_id'].alias('c_id_address'),
                                 sparkDF2['a_id'].alias('a_id_address'),
                                 sparkDF2['created_date']
                                 )
finalDF.show()
+----+----+--------+----------+-----------+------------+------------+------------+
|c_id|a_id|order_id|order_date|order_share|c_id_address|a_id_address|created_date|
+----+----+--------+----------+-----------+------------+------------+------------+
|  c1|  a3|       1|2021-01-23|        0.3|          c1|          a3|  2020-03-23|
|  c1|  a3|       3|2021-02-20|        0.4|          c1|          a3|  2020-03-23|
|  c2|  a5|       2|2021-03-20|        0.4|          c2|          a5|  2020-12-28|
|null|null|    null|      null|       null|          c2|          a7|  2020-03-04|
|  c1|  a2|       1|2021-01-23|        0.2|          c1|          a2|  2020-04-23|
|  c1|  a2|       3|2021-02-20|        0.3|          c1|          a2|  2020-04-23|
|  c1|  a1|       1|2021-01-23|        0.5|          c1|          a1|  2020-12-31|
|  c1|  a1|       3|2021-02-20|        0.3|          c1|          a1|  2020-12-31|
|null|null|    null|      null|       null|          c1|          a4|  2020-01-16|
|  c2|  a6|       2|2021-03-20|        0.6|          c2|          a6|  2020-05-16|
+----+----+--------+----------+-----------+------------+------------+------------+
When isNull
finalDF = finalDF.withColumn('c_id', F.when(F.col('c_id').isNull(), F.col('c_id_address'))
                                      .otherwise(F.col('c_id'))) \
                 .withColumn('a_id', F.when(F.col('a_id').isNull(), F.col('a_id_address'))
                                      .otherwise(F.col('a_id'))) \
                 .withColumn('order_share', F.when(F.col('order_share').isNull(), 0.0)
                                             .otherwise(F.col('order_share')))
finalDF.show()
+----+----+--------+----------+-----------+------------+------------+------------+
|c_id|a_id|order_id|order_date|order_share|c_id_address|a_id_address|created_date|
+----+----+--------+----------+-----------+------------+------------+------------+
|  c1|  a3|       1|2021-01-23|        0.3|          c1|          a3|  2020-03-23|
|  c1|  a3|       3|2021-02-20|        0.4|          c1|          a3|  2020-03-23|
|  c2|  a5|       2|2021-03-20|        0.4|          c2|          a5|  2020-12-28|
|  c2|  a7|    null|      null|        0.0|          c2|          a7|  2020-03-04|
|  c1|  a2|       1|2021-01-23|        0.2|          c1|          a2|  2020-04-23|
|  c1|  a2|       3|2021-02-20|        0.3|          c1|          a2|  2020-04-23|
|  c1|  a1|       1|2021-01-23|        0.5|          c1|          a1|  2020-12-31|
|  c1|  a1|       3|2021-02-20|        0.3|          c1|          a1|  2020-12-31|
|  c1|  a4|    null|      null|        0.0|          c1|          a4|  2020-01-16|
|  c2|  a6|       2|2021-03-20|        0.6|          c2|          a6|  2020-05-16|
+----+----+--------+----------+-----------+------------+------------+------------+
Note - order_id and order_date are null because there is no row for that c_id and a_id combination in sparkDF1.
This example provides one approach towards your required solution; you can build on it further if you also need to populate the missing order values.
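As a side note, the same null replacement can be written a bit more compactly with F.coalesce; this is equivalent to the when/isNull pattern above and assumes the same finalDF:

finalDF = finalDF.withColumn('c_id', F.coalesce(F.col('c_id'), F.col('c_id_address'))) \
                 .withColumn('a_id', F.coalesce(F.col('a_id'), F.col('a_id_address'))) \
                 .withColumn('order_share', F.coalesce(F.col('order_share'), F.lit(0.0)))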
CodePudding user response:
You can use an intermediate orders dataframe, created from the df dataframe, that contains only the order-specific information, i.e. the columns customer_id, order_id and order_date. Then you first inner join the df_address dataframe with this orders dataframe to link each (customer_id, address_id) couple to its orders, then left join the result with the df dataframe to get the order_share per address, and finally replace null values in the order_share column with 0.0.
Here is the complete code:
from pyspark.sql import functions as F

# Orders dataframe that contains only orders-specific information
orders = df.select('customer_id', 'order_id', 'order_date').distinct()

result = (
    df_address
    .join(orders, ['customer_id'])  # link addresses with orders
    .join(df.drop('order_date'), ['customer_id', 'address_id', 'order_id'], 'left_outer')  # link orders/addresses with order shares
    .withColumn('order_share', F.when(F.col('order_share').isNotNull(), F.col('order_share')).otherwise(F.lit(0.0)))  # replace null in order_share with 0.0
    .orderBy('customer_id', 'order_id', 'address_id')  # optional, to reorder the dataframe
)
result.show()
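With the sample data from the question, this should produce one row per (order, address) pair, with order_share set to 0.0 for addresses that have no share for that order, matching the expected output above. Note that this uses the customer_id and address_id column names from the question's description; with the sample DataFrames shown above you would substitute c_id and a_id.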