Home > database >  Count number of orders by a cutomer with amount less than the present order value in the last two or
Count number of orders by a cutomer with amount less than the present order value in the last two or

Time:07-06

Consider that I have a table

Customer Day Amount
A 4 96
A 22 63
A 32 19
A 50 27
A 57 99
A 72 93
B 69 97
B 82 22
B 87 64
C 22 60
C 30 22
C 48 74
C 49 68
C 55 11
C 85 79

I need to calculate the number of orders for every customer whose value is less than the order value of the present order among his past two orders, i.e

Customer Day Amount Count
A 4 96 0
A 22 63 0
A 32 19 0
A 50 27 1
A 57 99 2
A 72 93 1
B 69 97 0
B 82 22 0
B 87 64 1
C 22 60 0
C 30 22 0
C 48 74 2
C 49 68 1
C 55 11 0
C 85 79 2

CodePudding user response:

Since you are only PRECEEDING 2 ROWS , you can pre-compute the LAG values and utilise a CASE statement to generate the required logic.

Else if you want it to be more dynamic and flexible in nature you can utilise Shubham Sharma's answer

Data Preparation

s = StringIO("""
Customer    Day Amount
A   4   96
A   22  63
A   32  19
A   50  27
A   57  99
A   72  93
B   69  97
B   82  22
B   87  64
C   22  60
C   30  22
C   48  74
C   49  68
C   55  11
C   85  79
"""
)

df = pd.read_csv(s,delimiter='\t')

sparkDF = sql.createDataFrame(df)

sparkDF.show()

 -------- --- ------ 
|Customer|Day|Amount|
 -------- --- ------ 
|       A|  4|    96|
|       A| 22|    63|
|       A| 32|    19|
|       A| 50|    27|
|       A| 57|    99|
|       A| 72|    93|
|       B| 69|    97|
|       B| 82|    22|
|       B| 87|    64|
|       C| 22|    60|
|       C| 30|    22|
|       C| 48|    74|
|       C| 49|    68|
|       C| 55|    11|
|       C| 85|    79|
 -------- --- ------ 

SparkSQL

sparkDF.registerTempTable("TB1")

sql.sql("""
SELECT
    CUSTOMER,
    DAY,
    AMOUNT,
    CASE 
        WHEN AMOUNT > LAG_1 AND AMOUNT > LAG_2 THEN 2
        WHEN AMOUNT > LAG_1 OR AMOUNT > LAG_2 THEN 1
        ELSE 0
    END as Count
FROM (
    SELECT
        CUSTOMER,
        DAY,
        AMOUNT,
        LAG(AMOUNT,1) OVER(PARTITION BY CUSTOMER ORDER BY DAY) as lag_1,
        LAG(AMOUNT,2) OVER(PARTITION BY CUSTOMER ORDER BY DAY) as lag_2
    FROM TB1
)
ORDER BY 1,2
;
""").show()

 -------- --- ------ ----- 
|CUSTOMER|DAY|AMOUNT|Count|
 -------- --- ------ ----- 
|       A|  4|    96|    0|
|       A| 22|    63|    0|
|       A| 32|    19|    0|
|       A| 50|    27|    1|
|       A| 57|    99|    2|
|       A| 72|    93|    1|
|       B| 69|    97|    0|
|       B| 82|    22|    0|
|       B| 87|    64|    1|
|       C| 22|    60|    0|
|       C| 30|    22|    0|
|       C| 48|    74|    2|
|       C| 49|    68|    1|
|       C| 55|    11|    0|
|       C| 85|    79|    2|
 -------- --- ------ ----- 

CodePudding user response:

We can do this with when otherwise and lag

from pyspark.sql import functions as F
from pyspark.sql import Window
schema="Customer string,day int,Amount int"

data=[('A',4,96),('A',22,63),('A',32,19),('A',50,27),('A',57,99),('A',72,93),('B',69,97),('B',82,22),('B',87,64),('C',22,60),('C',30,22),('C',48,74),('C',49,68),('C',55,11),('C',85,79)]
dql=spark.createDataFrame(data,schema)
dql.withColumn("count",when((col('Amount')>F.lag("Amount",2).over(Window.partitionBy("Customer").orderBy("day"))) & (col('Amount')>F.lag("Amount",1).over(Window.partitionBy("Customer").orderBy("day"))) ,2).when((col('Amount')>F.lag("Amount",2).over(Window.partitionBy("Customer").orderBy("day"))) | (col('Amount')>F.lag("Amount",1).over(Window.partitionBy("Customer").orderBy("day"))) ,1).otherwise("0")).show()

#output
 -------- --- ------ ----- 
|Customer|day|Amount|count|
 -------- --- ------ ----- 
|       A|  4|    96|    0|
|       A| 22|    63|    0|
|       A| 32|    19|    0|
|       A| 50|    27|    1|
|       A| 57|    99|    2|
|       A| 72|    93|    1|
|       B| 69|    97|    0|
|       B| 82|    22|    0|
|       B| 87|    64|    1|
|       C| 22|    60|    0|
|       C| 30|    22|    0|
|       C| 48|    74|    2|
|       C| 49|    68|    1|
|       C| 55|    11|    0|
|       C| 85|    79|    2|
 -------- --- ------ ----- 

CodePudding user response:

Let us use collect_list over Window to collect the amounts corresponding to past two orders, then aggregate to count the values in list which are less than Amount in current row

w = Window.partitionBy('Customer').orderBy('Day').rowsBetween(-2, -1)

df.withColumn(
    'Count',
    F.aggregate(
        F.collect_list('Amount').over(w), F.lit(0),
        lambda acc, x: acc   (x < F.col('Amount')).cast('int')
    )
)

 -------- --- ------ ----- 
|Customer|Day|Amount|Count|
 -------- --- ------ ----- 
|       A|  4|    96|    0|
|       A| 22|    63|    0|
|       A| 32|    19|    0|
|       A| 50|    27|    1|
|       A| 57|    99|    2|
|       A| 72|    93|    1|
|       B| 69|    97|    0|
|       B| 82|    22|    0|
|       B| 87|    64|    1|
|       C| 22|    60|    0|
|       C| 30|    22|    0|
|       C| 48|    74|    2|
|       C| 49|    68|    1|
|       C| 55|    11|    0|
|       C| 85|    79|    2|
 -------- --- ------ ----- 
  • Related