How to get percent change year over year by group with PySpark


I'm a beginner to PySpark and I'm trying to calculate the year-over-year percentage change in product count, grouped by product.

I've got this data frame:

    prod_desc  year  prod_count
0  product_a  2019          53
1  product_b  2019          44
2  product_c  2019          36
3  product_a  2020          52
4  product_b  2020          43
5  product_c  2020          42

Required output:

     prod_desc  year  prod_count  Percentage change
0  product_a  2019          53                NaN
1  product_b  2019          44                NaN
2  product_c  2019          36                NaN
3  product_a  2020          52              -1.89
4  product_b  2020          43              -2.27
5  product_c  2020          42              16.67

I'm able to do this with the pandas logic below, and need to achieve the same in PySpark:

ds['percentage change'] = ds.sort_values('year').groupby('prod_desc')['prod_count'].pct_change() * 100

Any help would be much appreciated.

CodePudding user response:

Here's an example using a Window with lag() to keep track of the previous year's value and calculate the percentage change:

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
data = [
    {"prod_desc": "product_a", "year": 2019, "prod_count": 53},
    {"prod_desc": "product_b", "year": 2019, "prod_count": 44},
    {"prod_desc": "product_c", "year": 2019, "prod_count": 36},
    {"prod_desc": "product_a", "year": 2020, "prod_count": 52},
    {"prod_desc": "product_b", "year": 2020, "prod_count": 43},
    {"prod_desc": "product_c", "year": 2020, "prod_count": 42},
]

df = spark.createDataFrame(data)
# Window per product, ordered by year, so lag() picks up the previous year's count
window = Window.partitionBy("prod_desc").orderBy("year")
df = df.withColumn("prev_value", F.lag(df.prod_count).over(window))
df = (
    df.withColumn(
        "Percentage change",
        F.when(F.isnull(df.prod_count - df.prev_value), None).otherwise(
            (df.prod_count - df.prev_value) * 100 / df.prev_value
        ),
    )
    .drop("prev_value")
    .orderBy("year", "prod_desc")
)
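
The snippet builds the frame but doesn't print it; the table below is what you would see by displaying it with truncation disabled, e.g. with the standard df.show call:

df.show(truncate=False)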

Result:

+----------+---------+----+-------------------+
|prod_count|prod_desc|year|Percentage change  |
+----------+---------+----+-------------------+
|53        |product_a|2019|null               |
|44        |product_b|2019|null               |
|36        |product_c|2019|null               |
|52        |product_a|2020|-1.8867924528301887|
|43        |product_b|2020|-2.272727272727273 |
|42        |product_c|2020|16.666666666666668 |
+----------+---------+----+-------------------+
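
If you want the two-decimal figures from the required output, one option is to round the computed column with F.round. This is a minimal sketch reusing the df built above; the df_rounded name is just illustrative:

# Sketch: round the year-over-year change to two decimals
df_rounded = df.withColumn("Percentage change", F.round(F.col("Percentage change"), 2))
df_rounded.show(truncate=False)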

CodePudding user response:

Your DF:

df = spark.createDataFrame(
  [
     (0, 'product_a', 2019, 53)
    ,(1, 'product_b', 2019, 44)
    ,(2, 'product_c', 2019, 36)
    ,(3, 'product_a', 2020, 52)
    ,(4, 'product_b', 2020, 43)
    ,(5, 'product_c', 2020, 42)
  ], ['id', 'prod_desc', 'year', 'prod_count']
)

You may use a window function with lag:

from pyspark.sql.window import Window
import pyspark.sql.functions as F

W = Window.partitionBy("prod_desc").orderBy('year')

df.withColumn('prod_count_y-1',F.lag(df['prod_count']).over(W))\
  .withColumn('var %', F.round((F.col('prod_count')/F.col('prod_count_y-1') -1)*100,2))\
  .show()

+---+---------+----+----------+--------------+-----+
| id|prod_desc|year|prod_count|prod_count_y-1|var %|
+---+---------+----+----------+--------------+-----+
|  0|product_a|2019|        53|          null| null|
|  3|product_a|2020|        52|            53|-1.89|
|  1|product_b|2019|        44|          null| null|
|  4|product_b|2020|        43|            44|-2.27|
|  2|product_c|2019|        36|          null| null|
|  5|product_c|2020|        42|            36|16.67|
+---+---------+----+----------+--------------+-----+
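
To match the question's layout (no helper column, rows ordered by year), a possible follow-up based on the same window, assuming the df and W defined above are still in scope:

# Sketch: drop the intermediate column and order like the requested output
result = (
    df.withColumn('prod_count_y-1', F.lag(df['prod_count']).over(W))
      .withColumn('Percentage change',
                  F.round((F.col('prod_count')/F.col('prod_count_y-1') - 1)*100, 2))
      .drop('prod_count_y-1')
      .orderBy('year', 'prod_desc')
)
result.show()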