I'm a beginner to PySpark and I'm trying to calculate the year-over-year percentage change in product count, grouping by product.
I've got this data frame:
   prod_desc  year  prod_count
0  product_a  2019          53
1  product_b  2019          44
2  product_c  2019          36
3  product_a  2020          52
4  product_b  2020          43
5  product_c  2020          42
Required output:
   prod_desc  year  prod_count  Percentage change
0  product_a  2019          53                NaN
1  product_b  2019          44                NaN
2  product_c  2019          36                NaN
3  product_a  2020          52              -1.88
4  product_b  2020          43              -2.27
5  product_c  2020          42              16.60
I am able to do this with the pandas logic below, but I need to achieve the same result in PySpark:
ds['percentage change'] = ds.sort_values('year').groupby(['prod_desc']).agg({'prod_count':'pct_change'})
Any help would be much appreciated.
CodePudding user response:
Here's an example using a Window with a lag to keep track of the previous value and to calculate the percentage change:
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
data = [
    {"prod_desc": "product_a", "year": 2019, "prod_count": 53},
    {"prod_desc": "product_b", "year": 2019, "prod_count": 44},
    {"prod_desc": "product_c", "year": 2019, "prod_count": 36},
    {"prod_desc": "product_a", "year": 2020, "prod_count": 52},
    {"prod_desc": "product_b", "year": 2020, "prod_count": 43},
    {"prod_desc": "product_c", "year": 2020, "prod_count": 42},
]
df = spark.createDataFrame(data)
# Partition by product and order by year so lag() returns the previous year's count
window = Window.partitionBy("prod_desc").orderBy("year")
df = df.withColumn("prev_value", F.lag(df.prod_count).over(window))
df = (
    df.withColumn(
        "Percentage change",
        # stays null for the first year of each product (no previous value to compare to)
        F.when(F.isnull(df.prod_count - df.prev_value), None).otherwise(
            (df.prod_count - df.prev_value) * 100 / df.prev_value
        ),
    )
    .drop("prev_value")
    .orderBy("year", "prod_desc")
)
Result:
+----------+---------+----+-------------------+
|prod_count|prod_desc|year|Percentage change  |
+----------+---------+----+-------------------+
|53        |product_a|2019|null               |
|44        |product_b|2019|null               |
|36        |product_c|2019|null               |
|52        |product_a|2020|-1.8867924528301887|
|43        |product_b|2020|-2.272727272727273 |
|42        |product_c|2020|16.666666666666668 |
+----------+---------+----+-------------------+
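If you want the value rounded to two decimals like the required output, one small variation on the snippet above (not part of the original code) is to wrap the expression in F.round before dropping prev_value:
df = df.withColumn(
    "Percentage change",
    # round to 2 decimals, e.g. -1.8867... becomes -1.89
    F.round((df.prod_count - df.prev_value) * 100 / df.prev_value, 2),
).drop("prev_value")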
CodePudding user response:
Your DF:
df = spark.createDataFrame(
    [
        (0, 'product_a', 2019, 53),
        (1, 'product_b', 2019, 44),
        (2, 'product_c', 2019, 36),
        (3, 'product_a', 2020, 52),
        (4, 'product_b', 2020, 43),
        (5, 'product_c', 2020, 42),
    ],
    ['id', 'prod_desc', 'year', 'prod_count']
)
You can use a window with the lag function:
from pyspark.sql.window import Window
import pyspark.sql.functions as F
W = Window.partitionBy("prod_desc").orderBy('year')
df.withColumn('prod_count_y-1', F.lag(df['prod_count']).over(W))\
  .withColumn('var %', F.round((F.col('prod_count') / F.col('prod_count_y-1') - 1) * 100, 2))\
  .show()
+---+---------+----+----------+--------------+-----+
| id|prod_desc|year|prod_count|prod_count_y-1|var %|
+---+---------+----+----------+--------------+-----+
|  0|product_a|2019|        53|          null| null|
|  3|product_a|2020|        52|            53|-1.89|
|  1|product_b|2019|        44|          null| null|
|  4|product_b|2020|        43|            44|-2.27|
|  2|product_c|2019|        36|          null| null|
|  5|product_c|2020|        42|            36|16.67|
+---+---------+----+----------+--------------+-----+
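Note that (prod_count / prod_count_y-1 - 1) * 100 is algebraically the same as the (current - previous) * 100 / previous used in the first answer, i.e. pandas pct_change multiplied by 100. If you only want the raw fraction, exactly as pandas pct_change returns it, a minimal sketch reusing the same window W from above would be:
df.withColumn('pct_change', F.col('prod_count') / F.lag('prod_count').over(W) - 1).show()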