How to perform complex processing over each row in PySpark Dataframe


I have a PySpark dataframe as follows:

Customer_ID   Address_ID    Order_ID    Order_Date
 Cust_1       Addr_1            1       31-Dec-20
 Cust_1       Addr_1            2       23-Jan-21
 Cust_1       Addr_1            3       06-Feb-21
 Cust_1       Addr_2            4       13-Feb-21
 Cust_1       Addr_2            5       20-Feb-21
 Cust_1       Addr_3            6       18-Mar-21
 Cust_1       Addr_3            7       23-Mar-21
 Cust_2       Addr_4            8       31-Dec-20
 Cust_2       Addr_4            9       23-Jan-21
 Cust_2       Addr_4            10      06-Feb-21
 Cust_2       Addr_4            11      13-Feb-21
 Cust_2       Addr_4            12      20-Feb-21
 Cust_2       Addr_5            13      18-Mar-21
 Cust_2       Addr_5            14      23-Mar-21

The columns are customer ID, address ID, order ID, and the date the order was placed, respectively.

The Order_ID is always unique

For every order (every row), I need to calculate the order share for each (customer c1, address a1) pair.

Denote the order share by ord_share(c1, a1), defined by the formula below:

(number of orders placed by c1 from a1 between (Order_Date - 90 days) and Order_Date)
-------------------------------------------------------------------------------------
(number of orders placed by c1 from all of c1's addresses between (Order_Date - 90 days) and Order_Date)

Note - the horizontal line in the formula denotes division.

90 days here is the window size.
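To pin down the definition, here is a tiny pure-Python sketch of the computation for a single order (illustrative only; the orders list of tuples is hypothetical, with dates as datetime.date objects):

from datetime import timedelta

def ord_share(orders, c1, a1, as_of):
    # orders: hypothetical list of (customer_id, address_id, order_id, order_date) tuples
    window_start = as_of - timedelta(days=90)
    # all orders placed by c1 (from any address) inside the 90-day window
    in_window = [o for o in orders
                 if o[0] == c1 and window_start <= o[3] <= as_of]
    numerator = sum(1 for o in in_window if o[1] == a1)  # orders from a1 only
    return numerator, len(in_window)  # (numerator, denominator)

For ORDER_ID 7 (placed 23-Mar-21), this returns (2, 7) for (Cust_1, Addr_3), i.e. the 2/7 worked out below.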
Some examples from the table above (I am leaving order_share as a fraction for ease of understanding):

For ORDER_ID 7:
The total number of orders by Cust_1 in the window is 7.
ord_share(Cust_1, Addr_1) = 3/7, ord_share(Cust_1, Addr_2) = 2/7, ord_share(Cust_1, Addr_3) = 2/7

For ORDER_ID 6:
The total number of orders by Cust_1 in the window is 6.
ord_share(Cust_1, Addr_1) = 3/6, ord_share(Cust_1, Addr_2) = 2/6, ord_share(Cust_1, Addr_3) = 1/6

For ORDER_ID 5:
The total number of orders by Cust_1 in the window is 5.
ord_share(Cust_1, Addr_1) = 3/5, ord_share(Cust_1, Addr_2) = 2/5, ord_share(Cust_1, Addr_3) = 0/5

And so on... I need to store these for all the rows. My output format should be something like the following

(Is_original_address - this column indicates whether the Address_ID is the original address from which the order was placed)

Customer_ID   Address_ID    Order_ID    Order_Share  Is_original_address

Cust_1         Addr_1            7       3/7           0
Cust_1         Addr_2            7       2/7           0
Cust_1         Addr_3            7       2/7           1
Cust_1         Addr_1            6       3/6           0
Cust_1         Addr_2            6       2/6           0
Cust_1         Addr_3            6       1/6           1
Cust_1         Addr_1            5       3/5           0
Cust_1         Addr_2            5       2/5           1
Cust_1         Addr_3            5       0/5           0 
.
.
.
For all rows

So basically, each row in the input expands to multiple rows in the output, one per address the customer has used (see the sketch just below).
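For intuition, the expansion step by itself is a join of each order against the customer's distinct addresses. A rough sketch, assuming a dataframe df with the columns above:

# Row expansion only (not the share calculation): one output row per
# (order, customer address) pair
addresses = df.select('Customer_ID', 'Address_ID').distinct()
expanded = (df.withColumnRenamed('Address_ID', 'Original_Address_ID')
              .join(addresses, on='Customer_ID'))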

Note - the rows in the initial dataframe are not sorted or grouped in any particular order; I just chose this example to make the explanation easier.

I am finding it very hard to approach this problem. I have thought about it a lot, but I cannot see a way of joining/grouping the data to do this, since every row is essentially unique, and I am not sure how to produce the output dataframe. From what I can tell, I would have to clone the original dataframe and, for each row, run multiple group-bys or joins, but I am unsure how to even start the implementation.

Any help would be appreciated. Thanks!

Please do let me know if any other information is needed.

CodePudding user response:

As @Christophe commented, this uses window functions, but only to calculate the denominator:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Sample data from the question, with dates as ISO strings
data = [
    ('c1', 'a1',  1, '2020-12-31'),
    ('c1', 'a1',  2, '2021-01-23'),
    ('c1', 'a1',  3, '2021-02-06'),
    ('c1', 'a2',  4, '2021-02-13'),
    ('c1', 'a2',  5, '2021-02-20'),
    ('c1', 'a3',  6, '2021-03-18'),
    ('c1', 'a3',  7, '2021-03-23'),
    ('c2', 'a4',  8, '2020-12-31'),
    ('c2', 'a4',  9, '2021-01-23'),
    ('c2', 'a4', 10, '2021-02-06'),
    ('c2', 'a4', 11, '2021-02-13'),
    ('c2', 'a4', 12, '2021-02-20'),
    ('c2', 'a5', 13, '2021-03-18'),
    ('c2', 'a5', 14, '2021-03-23'),
]
df = spark.createDataFrame(data=data, schema=['c_id', 'a_id', 'order_id', 'order_date'])
# Parse the date strings into a proper date column for DATEDIFF
df = df.select('c_id', 'a_id', 'order_id', F.to_date(F.col('order_date')).alias('date'))
df.createOrReplaceTempView('orders')

spark.sql("""
WITH address_combinations AS (
  -- Pair every order o1 with every order o2 of the same customer, then
  -- count, per (order, address), the o2 orders placed in the 90-day
  -- window ending on o1's order date
  SELECT o1.order_id, o2.c_id, o2.a_id
  , CASE WHEN o1.a_id = o2.a_id THEN 1 ELSE 0 END AS is_original_address
  , COUNT(CASE WHEN DATEDIFF(o1.date, o2.date) BETWEEN 0 AND 90 THEN 1 END) AS num_orders
  FROM orders o1
  JOIN orders o2 ON o1.c_id = o2.c_id
  GROUP BY o1.order_id, o2.c_id, o2.a_id, is_original_address
)
SELECT c_id, a_id, order_id
-- the window SUM over all of an order's rows is the denominator
, CONCAT(num_orders, '/', SUM(num_orders) OVER (PARTITION BY order_id)) AS order_share
, is_original_address
FROM address_combinations
ORDER BY order_id, a_id
""").show(200)
+----+----+--------+-----------+-------------------+
|c_id|a_id|order_id|order_share|is_original_address|
+----+----+--------+-----------+-------------------+
|  c1|  a1|       1|        1/1|                  1|
|  c1|  a2|       1|        0/1|                  0|
|  c1|  a3|       1|        0/1|                  0|
|  c1|  a1|       2|        2/2|                  1|
|  c1|  a2|       2|        0/2|                  0|
|  c1|  a3|       2|        0/2|                  0|
|  c1|  a1|       3|        3/3|                  1|
|  c1|  a2|       3|        0/3|                  0|
|  c1|  a3|       3|        0/3|                  0|
|  c1|  a1|       4|        3/4|                  0|
|  c1|  a2|       4|        1/4|                  1|
|  c1|  a3|       4|        0/4|                  0|
|  c1|  a1|       5|        3/5|                  0|
|  c1|  a2|       5|        2/5|                  1|
|  c1|  a3|       5|        0/5|                  0|
|  c1|  a1|       6|        3/6|                  0|
|  c1|  a2|       6|        2/6|                  0|
|  c1|  a3|       6|        1/6|                  1|
|  c1|  a1|       7|        3/7|                  0|
|  c1|  a2|       7|        2/7|                  0|
|  c1|  a3|       7|        2/7|                  1|
|  c2|  a4|       8|        1/1|                  1|
|  c2|  a5|       8|        0/1|                  0|
|  c2|  a4|       9|        2/2|                  1|
|  c2|  a5|       9|        0/2|                  0|
|  c2|  a4|      10|        3/3|                  1|
|  c2|  a5|      10|        0/3|                  0|
|  c2|  a4|      11|        4/4|                  1|
|  c2|  a5|      11|        0/4|                  0|
|  c2|  a4|      12|        5/5|                  1|
|  c2|  a5|      12|        0/5|                  0|
|  c2|  a4|      13|        5/6|                  0|
|  c2|  a5|      13|        1/6|                  1|
|  c2|  a4|      14|        5/7|                  0|
|  c2|  a5|      14|        2/7|                  1|
+----+----+--------+-----------+-------------------+

Quick explanation: address_combinations self-joins the orders table to pair every order with all of the same customer's orders. Several paired rows share the same (order_id, a_id), so we GROUP BY and COUNT only the paired orders that fall within the 90-day window; this gives the numerator for each address.
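To see where the duplicate pairs come from, you can inspect the raw self-join before grouping (a hypothetical inspection query):

# order 7 pairs with all 7 of c1's orders, and several pairs share the
# same o2.a_id, hence the GROUP BY + COUNT
spark.sql("""
SELECT o1.order_id, o2.a_id, o2.order_id AS paired_order, o2.date
FROM orders o1
JOIN orders o2 ON o1.c_id = o2.c_id
WHERE o1.order_id = 7
ORDER BY o2.order_id
""").show()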

The outer query then computes the denominator (a window SUM of the per-address counts across each order) and formats the share as desired ("x/y").
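If you prefer the DataFrame API over SQL, here is a rough, untested sketch of the same logic (self-join, conditional count, then a window sum for the denominator):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

o1, o2 = df.alias('o1'), df.alias('o2')

# Pair each order with every order of the same customer, then count the
# paired orders that fall inside the 90-day window, per (order, address)
combos = (
    o1.join(o2, F.col('o1.c_id') == F.col('o2.c_id'))
      .groupBy(
          F.col('o1.order_id').alias('order_id'),
          F.col('o2.c_id').alias('c_id'),
          F.col('o2.a_id').alias('a_id'),
          (F.col('o1.a_id') == F.col('o2.a_id')).cast('int').alias('is_original_address'),
      )
      .agg(F.count(F.when(
          F.datediff(F.col('o1.date'), F.col('o2.date')).between(0, 90), True
      )).alias('num_orders'))
)

# Denominator: total in-window orders across all of the customer's addresses
w = Window.partitionBy('order_id')
result = combos.select(
    'c_id', 'a_id', 'order_id',
    F.concat_ws('/',
                F.col('num_orders').cast('string'),
                F.sum('num_orders').over(w).cast('string')).alias('order_share'),
    'is_original_address',
)
result.orderBy('order_id', 'a_id').show(200)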

Hope this meets your requirements!
