Consider the following table:
Customer | Day | Amount |
---|---|---|
A | 4 | 96 |
A | 22 | 63 |
A | 32 | 19 |
A | 50 | 27 |
A | 57 | 99 |
A | 72 | 93 |
B | 69 | 97 |
B | 82 | 22 |
B | 87 | 64 |
C | 22 | 60 |
C | 30 | 22 |
C | 48 | 74 |
C | 49 | 68 |
C | 55 | 11 |
C | 85 | 79 |
For every order, I need to count how many of that customer's previous two orders have an Amount less than the current order's Amount. For example, for customer A on day 57 (Amount 99), the two preceding amounts are 27 and 19, both less than 99, so its count is 2. The expected result:
Customer | Day | Amount | Count |
---|---|---|---|
A | 4 | 96 | 0 |
A | 22 | 63 | 0 |
A | 32 | 19 | 0 |
A | 50 | 27 | 1 |
A | 57 | 99 | 2 |
A | 72 | 93 | 1 |
B | 69 | 97 | 0 |
B | 82 | 22 | 0 |
B | 87 | 64 | 1 |
C | 22 | 60 | 0 |
C | 30 | 22 | 0 |
C | 48 | 74 | 2 |
C | 49 | 68 | 1 |
C | 55 | 11 | 0 |
C | 85 | 79 | 2 |
Answer:
Since you only need the 2 PRECEDING rows, you can pre-compute the LAG values and use a CASE expression to implement the required logic. If you want something more dynamic and flexible, use Shubham Sharma's answer below.
Data Preparation
from io import StringIO
import pandas as pd
from pyspark.sql import SparkSession
sql = SparkSession.builder.getOrCreate()
s = StringIO("""
Customer Day Amount
A 4 96
A 22 63
A 32 19
A 50 27
A 57 99
A 72 93
B 69 97
B 82 22
B 87 64
C 22 60
C 30 22
C 48 74
C 49 68
C 55 11
C 85 79
""")
# the sample data is whitespace-separated, so split on any run of whitespace
df = pd.read_csv(s, sep=r'\s+')
sparkDF = sql.createDataFrame(df)
sparkDF.show()
+--------+---+------+
|Customer|Day|Amount|
+--------+---+------+
| A| 4| 96|
| A| 22| 63|
| A| 32| 19|
| A| 50| 27|
| A| 57| 99|
| A| 72| 93|
| B| 69| 97|
| B| 82| 22|
| B| 87| 64|
| C| 22| 60|
| C| 30| 22|
| C| 48| 74|
| C| 49| 68|
| C| 55| 11|
| C| 85| 79|
+--------+---+------+
SparkSQL
sparkDF.createOrReplaceTempView("TB1")  # registerTempTable is deprecated
sql.sql("""
SELECT
    CUSTOMER,
    DAY,
    AMOUNT,
    CASE
        WHEN AMOUNT > LAG_1 AND AMOUNT > LAG_2 THEN 2
        WHEN AMOUNT > LAG_1 OR AMOUNT > LAG_2 THEN 1
        ELSE 0
    END AS Count
FROM (
    SELECT
        CUSTOMER,
        DAY,
        AMOUNT,
        LAG(AMOUNT, 1) OVER (PARTITION BY CUSTOMER ORDER BY DAY) AS LAG_1,
        LAG(AMOUNT, 2) OVER (PARTITION BY CUSTOMER ORDER BY DAY) AS LAG_2
    FROM TB1
)
ORDER BY 1, 2
""").show()
+--------+---+------+-----+
|CUSTOMER|DAY|AMOUNT|Count|
+--------+---+------+-----+
| A| 4| 96| 0|
| A| 22| 63| 0|
| A| 32| 19| 0|
| A| 50| 27| 1|
| A| 57| 99| 2|
| A| 72| 93| 1|
| B| 69| 97| 0|
| B| 82| 22| 0|
| B| 87| 64| 1|
| C| 22| 60| 0|
| C| 30| 22| 0|
| C| 48| 74| 2|
| C| 49| 68| 1|
| C| 55| 11| 0|
| C| 85| 79| 2|
+--------+---+------+-----+
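If you want the SQL route itself to be dynamic (no extra CASE branch per additional lagged row), the frame-based idea from Shubham Sharma's answer below can also be expressed with Spark SQL's higher-order aggregate function. A rough, untested sketch, assuming Spark 2.4+ where aggregate and collect_list over a window frame are available:
sql.sql("""
SELECT
    CUSTOMER,
    DAY,
    AMOUNT,
    aggregate(
        -- amounts of the two orders preceding the current row
        collect_list(AMOUNT) OVER (
            PARTITION BY CUSTOMER ORDER BY DAY
            ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING),
        0,
        -- count collected amounts smaller than the current row's AMOUNT
        (acc, x) -> acc + CAST(x < AMOUNT AS INT)
    ) AS Count
FROM TB1
ORDER BY 1, 2
""").show()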
Answer:
We can do this with when/otherwise and lag:
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when
schema = "Customer string, day int, Amount int"
data = [('A',4,96),('A',22,63),('A',32,19),('A',50,27),('A',57,99),('A',72,93),('B',69,97),('B',82,22),('B',87,64),('C',22,60),('C',30,22),('C',48,74),('C',49,68),('C',55,11),('C',85,79)]
dql = spark.createDataFrame(data, schema)
# define the window once instead of repeating it inside every lag() call
w = Window.partitionBy("Customer").orderBy("day")
lag1, lag2 = F.lag("Amount", 1).over(w), F.lag("Amount", 2).over(w)
dql.withColumn("count",
    when((col("Amount") > lag1) & (col("Amount") > lag2), 2)
    .when((col("Amount") > lag1) | (col("Amount") > lag2), 1)
    .otherwise(0)).show()  # int 0, not the string "0", keeps the column numeric
#output
+--------+---+------+-----+
|Customer|day|Amount|count|
+--------+---+------+-----+
| A| 4| 96| 0|
| A| 22| 63| 0|
| A| 32| 19| 0|
| A| 50| 27| 1|
| A| 57| 99| 2|
| A| 72| 93| 1|
| B| 69| 97| 0|
| B| 82| 22| 0|
| B| 87| 64| 1|
| C| 22| 60| 0|
| C| 30| 22| 0|
| C| 48| 74| 2|
| C| 49| 68| 1|
| C| 55| 11| 0|
| C| 85| 79| 2|
+--------+---+------+-----+
Answer:
Let us use collect_list over a Window to gather the amounts of the past two orders, then aggregate to count the values in the list that are less than the current row's Amount:
from pyspark.sql import Window
from pyspark.sql import functions as F
# df is the Spark DataFrame with the Customer/Day/Amount columns shown above;
# the frame covers only the two rows preceding the current one
w = Window.partitionBy('Customer').orderBy('Day').rowsBetween(-2, -1)
df.withColumn(
    'Count',
    F.aggregate(
        F.collect_list('Amount').over(w), F.lit(0),
        lambda acc, x: acc + (x < F.col('Amount')).cast('int')
    )
).show()
+--------+---+------+-----+
|Customer|Day|Amount|Count|
+--------+---+------+-----+
| A| 4| 96| 0|
| A| 22| 63| 0|
| A| 32| 19| 0|
| A| 50| 27| 1|
| A| 57| 99| 2|
| A| 72| 93| 1|
| B| 69| 97| 0|
| B| 82| 22| 0|
| B| 87| 64| 1|
| C| 22| 60| 0|
| C| 30| 22| 0|
| C| 48| 74| 2|
| C| 49| 68| 1|
| C| 55| 11| 0|
| C| 85| 79| 2|
+--------+---+------+-----+
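One caveat: F.aggregate was only added in PySpark 3.1. On Spark 2.4-3.0 the same higher-order function is reachable through SQL via F.expr, so a minimal sketch of an equivalent (with past as a hypothetical name for the intermediate column) would be:
from pyspark.sql import Window
from pyspark.sql import functions as F
w = Window.partitionBy('Customer').orderBy('Day').rowsBetween(-2, -1)
# collect the previous two amounts, fold over them in SQL, then drop the helper;
# inside the lambda, Amount resolves to the current row's value
(df.withColumn('past', F.collect_list('Amount').over(w))
   .withColumn('Count', F.expr("aggregate(past, 0, (acc, x) -> acc + CAST(x < Amount AS INT))"))
   .drop('past')
   .show())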