I am using an EMR notebook with PySpark >= 3.1.
I have 4 columns:
- ID_CLIENT: a unique index for the client
- IDX_TRX: a unique index for the transaction. In reality this is an alphanumeric column and the index does not indicate any order; the numeric index in this example is just for ease of explanation.
- dt: datetime, date of the transaction
- AMOUNT: amount of the transaction
I want to calculate the following column 'AVERAGE_TRX', the running average of each client's previous transaction amounts:
+---------+-------+----------+------+-----------+
|ID_CLIENT|IDX_TRX|        dt|AMOUNT|AVERAGE_TRX|
+---------+-------+----------+------+-----------+
|        A|     01|2018-06-14|    10|       NULL| # 1st trx, there are no previous records
|        B|     01|2018-06-14|     5|       NULL| # 1st trx, there are no previous records
|        A|     02|2018-06-15|    20|         10| # 10 / 1
|        A|     03|2018-06-15|    30|         15| # (10 + 20) / 2
|        B|     02|2018-06-16|    10|          5| # 5 / 1
|        A|     04|2018-06-16|    20|         20| # (10 + 20 + 30) / 3
|        A|     05|2018-06-17|     5|         20| # (10 + 20 + 30 + 20) / 4
|        B|     03|2018-06-17|    10|        7.5| # (5 + 10) / 2
|        A|     06|2018-06-18|    15|         17| # (10 + 20 + 30 + 20 + 5) / 5
|        B|     04|2018-06-18|    10|  8.3333334| # (5 + 10 + 10) / 3
+---------+-------+----------+------+-----------+
How can I calculate the last column?
Thank you in advance.
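For reference, a minimal sketch to rebuild the sample data above (the SparkSession variable spark is assumed to already exist, as it does in an EMR notebook; the column names match the table):

from pyspark.sql import functions as F

# Sample rows matching the table above (AMOUNT only; AVERAGE_TRX is the column to compute)
data = [
    ('A', '01', '2018-06-14', 10), ('B', '01', '2018-06-14', 5),
    ('A', '02', '2018-06-15', 20), ('A', '03', '2018-06-15', 30),
    ('B', '02', '2018-06-16', 10), ('A', '04', '2018-06-16', 20),
    ('A', '05', '2018-06-17', 5),  ('B', '03', '2018-06-17', 10),
    ('A', '06', '2018-06-18', 15), ('B', '04', '2018-06-18', 10),
]
df = spark.createDataFrame(data, ['ID_CLIENT', 'IDX_TRX', 'dt', 'AMOUNT'])
df = df.withColumn('dt', F.to_date('dt'))  # cast dt from string to a proper date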
CodePudding user response:
Try this. It will change the order of the records, but each row should end up with the desired value.
from pyspark.sql import Window
from pyspark.sql.functions import avg

# Average over all rows of the same client strictly before the current row.
w = (Window.partitionBy('ID_CLIENT')
     .orderBy('IDX_TRX')
     .rowsBetween(Window.unboundedPreceding, -1))  # -1 = cumulative calculation up to the previous row

df = df.withColumn('AVERAGE_TRX', avg('AMOUNT').over(w))
df.show()
+---------+-------+----------+------+-----------+
|ID_CLIENT|IDX_TRX|        dt|AMOUNT|AVERAGE_TRX|
+---------+-------+----------+------+-----------+
|        A|     01|2018-06-14|    10|       null|
|        A|     02|2018-06-15|    20|       10.0|
|        A|     03|2018-06-15|    30|       15.0|
|        A|     04|2018-06-16|    20|       20.0|
|        A|     05|2018-06-17|     5|       20.0|
|        A|     06|2018-06-18|    15|       17.0|
|        B|     01|2018-06-14|     5|       null|
|        B|     02|2018-06-16|    10|        5.0|
|        B|     03|2018-06-17|    10|        7.5|
|        B|     04|2018-06-18|    10|  8.3333334|
+---------+-------+----------+------+-----------+
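Since you mention that the real IDX_TRX is alphanumeric and carries no order, a variant ordered by the transaction date may be closer to what you need. This is only a sketch under that assumption; note that rows sharing the same dt for a client are tied, so their relative order (and therefore their running average) is not deterministic unless you add a tie-breaking column to orderBy.

from pyspark.sql import Window
from pyspark.sql.functions import avg

# Same cumulative average, but ordered by transaction date instead of IDX_TRX.
w_dt = (Window.partitionBy('ID_CLIENT')
        .orderBy('dt')
        .rowsBetween(Window.unboundedPreceding, -1))

df = df.withColumn('AVERAGE_TRX', avg('AMOUNT').over(w_dt))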