I have a PySpark DataFrame as follows:
Customer_ID  Address_ID  Order_ID  Order_Date
Cust_1       Addr_1      1         31-Dec-20
Cust_1       Addr_1      2         23-Jan-21
Cust_1       Addr_1      3         06-Feb-21
Cust_1       Addr_2      4         13-Feb-21
Cust_1       Addr_2      5         20-Feb-21
Cust_1       Addr_3      6         18-Mar-21
Cust_1       Addr_3      7         23-Mar-21
Cust_2       Addr_4      8         31-Dec-20
Cust_2       Addr_4      9         23-Jan-21
Cust_2       Addr_4      10        06-Feb-21
Cust_2       Addr_4      11        13-Feb-21
Cust_2       Addr_4      12        20-Feb-21
Cust_2       Addr_5      13        18-Mar-21
Cust_2       Addr_5      14        23-Mar-21
The columns are, respectively: customer ID, address ID, order ID, and the date the order was placed. Order_ID is always unique.
For every order (every row), I need to calculate the order share for a (customer c1, address a1) pair. Denoting the order share by ord_share(c1, a1), it is defined by the formula below:

The total number of orders placed by c1 from a1 between (Order_Date - 90 days) and (Order_Date)
------------------------------------------------------------------------------------------------
The total number of orders placed by c1 from all addresses between (Order_Date - 90 days) and (Order_Date)

(The horizontal line denotes division.) 90 days here is the window size, and Order_Date is the date of the current row's order.
An example from the above table (I am leaving order_share as a fraction for ease of understanding):
For ORDER_ID 7:
The total number of orders by Cust_1 in the window is 7.
ord_share(Cust_1, Addr_1) = 3/7, ord_share(Cust_1, Addr_2) = 2/7, ord_share(Cust_1, Addr_3) = 2/7
For ORDER_ID 6:
The total number of orders by Cust_1 in the window is 6.
ord_share(Cust_1, Addr_1) = 3/6, ord_share(Cust_1, Addr_2) = 2/6, ord_share(Cust_1, Addr_3) = 1/6
For ORDER_ID 5:
The total number of orders by Cust_1 in the window is 5.
ord_share(Cust_1, Addr_1) = 3/5, ord_share(Cust_1, Addr_2) = 2/5, ord_share(Cust_1, Addr_3) = 0/5
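Just to sanity-check my understanding of the window, here is a tiny pure-Python computation of the ORDER_ID 7 shares (illustration only; the variable names are mine, and I need this in PySpark at scale):

from datetime import date, timedelta

# Cust_1's orders from the table above: (address, order_id, order_date)
orders = [
    ('Addr_1', 1, date(2020, 12, 31)),
    ('Addr_1', 2, date(2021, 1, 23)),
    ('Addr_1', 3, date(2021, 2, 6)),
    ('Addr_2', 4, date(2021, 2, 13)),
    ('Addr_2', 5, date(2021, 2, 20)),
    ('Addr_3', 6, date(2021, 3, 18)),
    ('Addr_3', 7, date(2021, 3, 23)),
]

ref = date(2021, 3, 23)  # Order_Date of ORDER_ID 7
# keep only orders in the 90-day window ending at ref
window = [o for o in orders if timedelta(0) <= ref - o[2] <= timedelta(days=90)]
total = len(window)  # 7
for addr in ('Addr_1', 'Addr_2', 'Addr_3'):
    n = sum(1 for o in window if o[0] == addr)
    print(addr, f'{n}/{total}')  # prints 3/7, 2/7, 2/7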
And so on: I need to store these values for all the rows. My output format should be something like the following (Is_original_address indicates whether the Address_ID is the address from which that order was actually placed):
Customer_ID  Address_ID  Order_ID  Order_Share  Is_original_address
Cust_1       Addr_1      7         3/7          0
Cust_1       Addr_2      7         2/7          0
Cust_1       Addr_3      7         2/7          1
Cust_1       Addr_1      6         3/6          0
Cust_1       Addr_2      6         2/6          0
Cust_1       Addr_3      6         1/6          1
Cust_1       Addr_1      5         3/5          0
Cust_1       Addr_2      5         2/5          1
Cust_1       Addr_3      5         0/5          0
... and so on, for all rows.
So basically, each row in the input expands to multiple rows in the output, one per address the customer has. Note: the rows in the initial dataframe are not sorted or grouped in any particular order; I just chose such an example to help with the explanation.
I am finding it very hard to approach this problem. I have thought about it a lot, but I can't see a way of joining/grouping the data to get this, since every row is in a sense unique. From what I can tell, I would have to clone the original dataframe and, for each row, do multiple group-bys or joins, but I am unsure how to even start the implementation.
Any help would be appreciated. Thanks!
Please do let me know if any other information is needed.
CodePudding user response:
As @Christophe commented, this uses window functions, but only to calculate the denominator:
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Sample data matching the question's table
data = [
    ('c1', 'a1',  1, '2020-12-31'),
    ('c1', 'a1',  2, '2021-01-23'),
    ('c1', 'a1',  3, '2021-02-06'),
    ('c1', 'a2',  4, '2021-02-13'),
    ('c1', 'a2',  5, '2021-02-20'),
    ('c1', 'a3',  6, '2021-03-18'),
    ('c1', 'a3',  7, '2021-03-23'),
    ('c2', 'a4',  8, '2020-12-31'),
    ('c2', 'a4',  9, '2021-01-23'),
    ('c2', 'a4', 10, '2021-02-06'),
    ('c2', 'a4', 11, '2021-02-13'),
    ('c2', 'a4', 12, '2021-02-20'),
    ('c2', 'a5', 13, '2021-03-18'),
    ('c2', 'a5', 14, '2021-03-23'),
]
df = spark.createDataFrame(data=data, schema=['c_id', 'a_id', 'order_id', 'order_date'])
df = df.select('c_id', 'a_id', 'order_id', F.to_date(F.col('order_date')).alias('date'))
df.createOrReplaceTempView('orders')
spark.sql("""
WITH address_combinations AS (
    SELECT o1.order_id, o2.c_id, o2.a_id
         , CASE WHEN o1.a_id = o2.a_id THEN 1 ELSE 0 END AS is_original_address
         -- numerator: o2's orders falling in the 90-day window ending at o1's date
         , COUNT(CASE WHEN DATEDIFF(o1.date, o2.date) BETWEEN 0 AND 90 THEN 1 END) AS num_orders
    FROM orders o1
    JOIN orders o2 ON o1.c_id = o2.c_id
    GROUP BY o1.order_id, o2.c_id, o2.a_id, is_original_address
)
SELECT c_id, a_id, order_id
     -- denominator: the customer's total in-window orders, via a window function
     , CONCAT(num_orders, '/', SUM(num_orders) OVER (PARTITION BY order_id)) AS order_share
     , is_original_address
FROM address_combinations
ORDER BY order_id, a_id
""").show(200)
+----+----+--------+-----------+-------------------+
|c_id|a_id|order_id|order_share|is_original_address|
+----+----+--------+-----------+-------------------+
| c1| a1| 1| 1/1| 1|
| c1| a2| 1| 0/1| 0|
| c1| a3| 1| 0/1| 0|
| c1| a1| 2| 2/2| 1|
| c1| a2| 2| 0/2| 0|
| c1| a3| 2| 0/2| 0|
| c1| a1| 3| 3/3| 1|
| c1| a2| 3| 0/3| 0|
| c1| a3| 3| 0/3| 0|
| c1| a1| 4| 3/4| 0|
| c1| a2| 4| 1/4| 1|
| c1| a3| 4| 0/4| 0|
| c1| a1| 5| 3/5| 0|
| c1| a2| 5| 2/5| 1|
| c1| a3| 5| 0/5| 0|
| c1| a1| 6| 3/6| 0|
| c1| a2| 6| 2/6| 0|
| c1| a3| 6| 1/6| 1|
| c1| a1| 7| 3/7| 0|
| c1| a2| 7| 2/7| 0|
| c1| a3| 7| 2/7| 1|
| c2| a4| 8| 1/1| 1|
| c2| a5| 8| 0/1| 0|
| c2| a4| 9| 2/2| 1|
| c2| a5| 9| 0/2| 0|
| c2| a4| 10| 3/3| 1|
| c2| a5| 10| 0/3| 0|
| c2| a4| 11| 4/4| 1|
| c2| a5| 11| 0/4| 0|
| c2| a4| 12| 5/5| 1|
| c2| a5| 12| 0/5| 0|
| c2| a4| 13| 5/6| 0|
| c2| a5| 13| 1/6| 1|
| c2| a4| 14| 5/7| 0|
| c2| a5| 14| 2/7| 1|
+----+----+--------+-----------+-------------------+
Quick explanation:
address_combinations self-joins the orders table to get, for each order, every (customer, address) combination for that customer. The join produces one row per pair of orders, so we GROUP BY and COUNT only the orders that fall within the 90-day window; this gives the numerator (num_orders).
The outer SELECT then computes the denominator with a window function (the SUM of num_orders across all addresses for the same order_id) and formats the result as desired ("x/y").
Hope this meets your requirements!
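If you prefer to stay in the DataFrame API, here is a sketch of the same logic (self-join, conditional count, then a window sum for the denominator). It reuses df and F from above; treat it as a starting point rather than a tested drop-in:

from pyspark.sql import Window

o1, o2 = df.alias('o1'), df.alias('o2')

combos = (
    o1.join(o2, F.col('o1.c_id') == F.col('o2.c_id'))
      .withColumn('is_original_address',
                  (F.col('o1.a_id') == F.col('o2.a_id')).cast('int'))
      # numerator condition: o2's order falls in the 90-day window ending at o1's date
      .withColumn('in_window',
                  F.datediff(F.col('o1.date'), F.col('o2.date')).between(0, 90))
      .groupBy(F.col('o1.order_id').alias('order_id'),
               F.col('o2.c_id').alias('c_id'),
               F.col('o2.a_id').alias('a_id'),
               'is_original_address')
      .agg(F.count(F.when(F.col('in_window'), 1)).alias('num_orders'))
)

# denominator: the customer's total in-window orders for each order_id
w = Window.partitionBy('order_id')
result = combos.withColumn(
    'order_share',
    F.concat_ws('/', F.col('num_orders').cast('string'),
                F.sum('num_orders').over(w).cast('string')))
result.orderBy('order_id', 'a_id').show(200)

This should produce the same rows as the SQL version above.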