I have a dataframe with Customer_ID and Invoice_date and I want to convert each customer into either Active, New, Loss or Lapsed category. The data is present from July 2021 to June 2022 (12 months_) The criteria for each split is as:
- Active customer = Customer present once in (Apr, May, Jun 22) & once in (Jul 21 to Mar 22)
- New customer = Customer present just for (Apr, May, Jun 22) and no other month
- Lapsed customer = Customer present just for (Jan, Feb, Mar 22) and not for (Apr, May, Jun 22)
- Lost customer = Customer present from (Jul to Dec 21) and not for (Jan to Jun 22)
So far I have tried to create a function using the below code
max_date = F.max(more_cust.INVOICE_DATE)
two_months = F.date_sub(more_cust.INVOICE_DATE, 60)
three_months = F.date_sub(more_cust.INVOICE_DATE, 90)
six_months = F.date_sub(more_cust.INVOICE_DATE, 180)
one_year = F.date_sub(more_cust.INVOICE_DATE, 360)
def recency_bucket(df1):
customer = dict()
df1 = df1.sort("INVOICE_DATE", ascending=False)
var_date = df1.rdd.map(lambda x: x.INVOICE_DATE).collect()
cust_list = df1.rdd.map(lambda x: x.CUST_ID).collect()
customer = customer.withColumn("CUST_ID", df1.collect[0]["cust_list"])
I want the output to look like this:
CodePudding user response:
You can categorise your invoice date in quarters say 1(jul to sep 21), 2(oct to dec 21), 3(jan to march 22), 4(april to june 22).
Invoice data
cust_id invoice_date
c1 2021-07-05
c2 2022-02-01
c2 2022-05-10
c3 2022-02-01
c4 2022-04-10
Invoice data with quarter
df = df.withColumn("quarter", F.quarter("invoice_date")).withColumn("quarter", F.when((F.col("quarter") 2) > 4,
(F.col("quarter") 2) % 4).otherwise(F.col("quarter") 2))
------- ------------ -------
|cust_id|invoice_date|quarter|
------- ------------ -------
| c1| 2021-07-05| 1|
| c2| 2022-02-01| 3|
| c2| 2022-05-10| 4|
| c3| 2022-02-01| 3|
| c4| 2022-04-10| 4|
------- ------------ -------
Create pivot table and define rules based on bucket criteria and categorise customers
cust_quarter = df.groupBy("cust_id").pivot("quarter", [1,2,3,4]).count().fillna(0)
cust_quarter.show()
------- --- --- --- ---
|cust_id| 1| 2| 3| 4|
------- --- --- --- ---
| c1| 1| 0| 0| 0|
| c4| 0| 0| 0| 1|
| c3| 0| 0| 1| 0|
| c2| 0| 0| 1| 1|
------- --- --- --- ---
new = ((F.col("4") > 0) & (F.col("1") F.col("2") F.col("3") == 0))
active = ((F.col("4") > 0) & (F.col("1") F.col("2") F.col("3") > 0))
loss = ((F.col("1") F.col("2") > 0) & (F.col("3") F.col("4") == 0))
lapsed = ((F.col("3") > 0) & (F.col("1") F.col("2") F.col("4") == 0))
bucket_rules = F.when(new, "new").when(active, "acitve").when(loss, "loss").when(lapsed, "lapsed")
cust_quarter = cust_quarter.withColumn("bucket", bucket_rules)
cust_quarter.show()
------- --- --- --- --- ------
|cust_id| 1| 2| 3| 4|bucket|
------- --- --- --- --- ------
| c1| 1| 0| 0| 0| loss|
| c4| 0| 0| 0| 1| new|
| c3| 0| 0| 1| 0|lapsed|
| c2| 0| 0| 1| 1|acitve|
------- --- --- --- --- ------