Pyspark Window function: Counting number of categorical variables and calculating percentages


I have a dataframe in the format below. There are different IDs, and each ID has several products, each with a product name, a type and a total quantity.

ID  Prod Name   Type    Total Qty
1   ABC             A   200
1   DEF             B   350
1   GEH             B   120
1   JIK             C   100
1   LMO             A   40
2   ABC             A   10
2   DEF             A   20
2   GEH             C   30
2   JIK             C   40
2   LMO             A   50

So I am trying to get, for each ID, the percentage of the total quantity that falls under type A, B and C, each in a separate column. As a first step, I tried a window function, but it gave me the count of each type across the whole dataframe rather than per ID.

df.withColumn("count_cat", F.count("Type").over(Window.partitionBy("Type")))

But I need something like this

ID  total Products  Total Qty   % of A  % of B  % of C
1   5                 810        0.29    0.58    0.12

CodePudding user response:

Approach 1: Group By Aggregation

Based on your expected output, aggregations over a GROUP BY ID would be sufficient.

You may achieve this using the following, assuming your initial dataset is stored in a dataframe input_df.
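For reference, a minimal sketch for building such an input_df from the sample data. The underscored column names Prod_Name and Total_Qty are an assumption to keep the SQL below simple; rename accordingly if your columns contain spaces as shown in the question.

from pyspark.sql import SparkSession

sparkSession = SparkSession.builder.getOrCreate()

# sample data from the question, with underscores in the column names
input_df = sparkSession.createDataFrame(
    [
        (1, "ABC", "A", 200), (1, "DEF", "B", 350), (1, "GEH", "B", 120),
        (1, "JIK", "C", 100), (1, "LMO", "A", 40),
        (2, "ABC", "A", 10),  (2, "DEF", "A", 20),  (2, "GEH", "C", 30),
        (2, "JIK", "C", 40),  (2, "LMO", "A", 50),
    ],
    ["ID", "Prod_Name", "Type", "Total_Qty"],
)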

Using Spark SQL

  1. Ensure your dataframe is accessible by creating a temporary view
input_df.createOrReplaceTempView("input_df")
  2. Run the SQL below on your Spark session
output_df = sparkSession.sql("""
SELECT
     ID,
     COUNT(Prod_Name) as `total products`,
     SUM(Total_Qty) as `Total Qty`,
     SUM(
         CASE WHEN Type='A' THEN Total_Qty END
     ) / SUM(Total_Qty) as `% of A`,
     SUM(
         CASE WHEN Type='B' THEN Total_Qty END
     ) / SUM(Total_Qty) as `% of B`,
     SUM(
         CASE WHEN Type='C' THEN Total_Qty END
     ) / SUM(Total_Qty) as `% of C`
FROM
    input_df
GROUP BY
    ID
""").na.fill(0)

Using the PySpark API

from pyspark.sql import functions as F

output_df = (
    input_df.groupBy("ID")
            .agg(
                F.count("Prod_Name").alias("total products"),
                F.sum("Total_Qty").alias("Total Qty"),
                (F.sum(
                    F.when(
                        F.col("Type")=="A",F.col("Total_Qty")
                    ).otherwise(0)
                ) / F.sum("Total_Qty")).alias("% of A"),
                (F.sum(
                    F.when(
                        F.col("Type")=="B",F.col("Total_Qty")
                    ).otherwise(0)
                )/ F.sum("Total_Qty")).alias("% of B"),
                (F.sum(
                    F.when(
                        F.col("Type")=="C",F.col("Total_Qty")
                    ).otherwise(0)
                )/ F.sum("Total_Qty")).alias("% of C")
            )
)
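With the sample data, both variants should produce something along the lines of the following (values worked out by hand from the sample quantities and rounded here, not taken from an actual run):

ID  total products  Total Qty  % of A  % of B  % of C
1   5               810        0.296   0.580   0.123
2   5               150        0.533   0.0     0.467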

Approach 2: Using Window Functions

If you would rather add these as 5 additional columns to your dataset, you may use similar aggregations over a window, i.e. OVER (PARTITION BY ID) in SQL or Window.partitionBy("ID") in the PySpark API, as shown below.

Using Spark SQL

  1. Ensure your dataframe is accessible by creating a temporary view
input_df.createOrReplaceTempView("input_df")
  2. Run the SQL below on your Spark session
output_df = sparkSession.sql("""
SELECT
     *,
     COUNT(Prod_Name) OVER (PARTITION BY ID) as `total products`,
     SUM(Total_Qty) OVER (PARTITION BY ID) as `Total Qty`,
     SUM(
         CASE WHEN Type='A' THEN Total_Qty END
     ) OVER (PARTITION BY ID) / SUM(Total_Qty) OVER (PARTITION BY ID) as `% of A`,
     SUM(
         CASE WHEN Type='B' THEN Total_Qty END
     ) OVER (PARTITION BY ID)/ SUM(Total_Qty) OVER (PARTITION BY ID) as `% of B`,
     SUM(
         CASE WHEN Type='C' THEN Total_Qty END
     ) OVER (PARTITION BY ID) / SUM(Total_Qty) OVER (PARTITION BY ID) as `% of C`
FROM
    input_df
""").na.fill(0)

Using the PySpark API

from pyspark.sql import functions as F
from pyspark.sql import Window

agg_window = Window.partitionBy("ID")

output_df = (
    input_df.withColumn(
                "total products",
                F.count("Prod_Name").over(agg_window)
            )
            .withColumn(
                "Total Qty",
                F.sum("Total_Qty").over(agg_window)
            )
            .withColumn(
                "% of A",
                F.sum(
                    F.when(
                        F.col("Type")=="A",F.col("Total_Qty")
                    ).otherwise(0)
                ).over(agg_window) / F.sum("Total_Qty").over(agg_window)
            )
            .withColumn(
                "% of B",
                F.sum(
                    F.when(
                        F.col("Type")=="B",F.col("Total_Qty")
                    ).otherwise(0)
                ).over(agg_window) / F.sum("Total_Qty").over(agg_window)
            )
            .withColumn(
                "% of C",
                F.sum(
                    F.when(
                        F.col("Type")=="C",F.col("Total_Qty")
                    ).otherwise(0)
                ).over(agg_window) / F.sum("Total_Qty").over(agg_window)
            )
)
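Since this window-based variant keeps one row per product (the aggregates are simply repeated on every row), here is a small sketch, not part of the original answer, to collapse the result down to one row per ID afterwards:

summary_df = (
    output_df
        .select("ID", "total products", "Total Qty", "% of A", "% of B", "% of C")
        .dropDuplicates(["ID"])
)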

Let me know if this works for you.

CodePudding user response:

One approach (without repeating A, B, C, etc.) is using pivot. The idea is to group first, then pivot the type:

from pyspark.sql import functions as F
from pyspark.sql import Window as W

(df
    .groupBy('ID', 'Type')
    .agg(F.sum('Total Qty').alias('qty'))
    .withColumn('pct', F.col('qty') / F.sum('qty').over(W.partitionBy('ID')))
 
    .groupBy('ID')
    .pivot('Type')
    .agg(F.first('pct'))
    .show()
)

# Output
#  --- ------------------ ------------------ ------------------- 
# | ID|                 A|                 B|                  C|
#  --- ------------------ ------------------ ------------------- 
# |  1|0.2962962962962963|0.5802469135802469|0.12345679012345678|
# |  2|0.5333333333333333|              null| 0.4666666666666667|
#  --- ------------------ ------------------ ------------------- 
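To get closer to the expected output (total products, total quantity, and 0 instead of null), here is a sketch that is not part of the original answer: join the pivoted percentages back to per-ID totals, reusing F and W from the imports above and keeping the question's column names with spaces.

totals = df.groupBy('ID').agg(
    F.count('Prod Name').alias('total Products'),
    F.sum('Total Qty').alias('Total Qty'),
)

pct = (df
    .groupBy('ID', 'Type')
    .agg(F.sum('Total Qty').alias('qty'))
    .withColumn('pct', F.col('qty') / F.sum('qty').over(W.partitionBy('ID')))
    .groupBy('ID')
    .pivot('Type')
    .agg(F.first('pct'))
    .na.fill(0))  # fill the missing B percentage for ID 2 with 0

# the pivoted columns are named A, B, C; rename them with
# withColumnRenamed if you prefer headers such as '% of A'
totals.join(pct, on='ID').show()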