I have a dataframe in the format below. There are different IDs, with a product name and type associated with each product.
ID  Prod Name  Type  Total Qty
1   ABC        A     200
1   DEF        B     350
1   GEH        B     120
1   JIK        C     100
1   LMO        A     40
2   ABC        A     10
2   DEF        A     20
2   GEH        C     30
2   JIK        C     40
2   LMO        A     50
So I am trying to get the percentage of total A's, B's and C's for each ID in separate columns. As a first step, I tried a window function, but it gave me the count of "A" across the whole column:
df.withColumn("count_cat", F.count("Type").over(Window.partitionBy("Type")))
But I need something like this:
ID  total Products  Total Qty  % of A  % of B  % of C
1   5               810        0.29    0.58    0.12
CodePudding user response:
Approach 1: Group By Aggregation
Based on your expected output, aggregations with a GROUP BY on ID would be sufficient.
You may achieve this using the following, assuming your initial dataset is stored in a dataframe input_df.
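(For a runnable example, input_df can be created as below; note this answer assumes underscored column names, Prod_Name and Total_Qty, rather than the names with spaces shown in the question.)
from pyspark.sql import SparkSession

sparkSession = SparkSession.builder.getOrCreate()

input_df = sparkSession.createDataFrame(
    [
        (1, "ABC", "A", 200), (1, "DEF", "B", 350), (1, "GEH", "B", 120),
        (1, "JIK", "C", 100), (1, "LMO", "A", 40),
        (2, "ABC", "A", 10), (2, "DEF", "A", 20), (2, "GEH", "C", 30),
        (2, "JIK", "C", 40), (2, "LMO", "A", 50),
    ],
    ["ID", "Prod_Name", "Type", "Total_Qty"],
)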
Using Spark SQL
- Ensure your dataframe is accessible by creating a temporary view:
input_df.createOrReplaceTempView("input_df")
- Run the SQL below on your spark session:
output_df = sparkSession.sql("""
    SELECT
        ID,
        COUNT(Prod_Name) as `total products`,
        SUM(Total_Qty) as `Total Qty`,
        SUM(CASE WHEN Type = 'A' THEN Total_Qty END) / SUM(Total_Qty) as `% of A`,
        SUM(CASE WHEN Type = 'B' THEN Total_Qty END) / SUM(Total_Qty) as `% of B`,
        SUM(CASE WHEN Type = 'C' THEN Total_Qty END) / SUM(Total_Qty) as `% of C`
    FROM
        input_df
    GROUP BY
        ID
""").na.fill(0)
Using the pyspark API
from pyspark.sql import functions as F
output_df = (
    input_df.groupBy("ID")
    .agg(
        F.count("Prod_Name").alias("total products"),
        F.sum("Total_Qty").alias("Total Qty"),
        (
            F.sum(F.when(F.col("Type") == "A", F.col("Total_Qty")).otherwise(0))
            / F.sum("Total_Qty")
        ).alias("% of A"),
        (
            F.sum(F.when(F.col("Type") == "B", F.col("Total_Qty")).otherwise(0))
            / F.sum("Total_Qty")
        ).alias("% of B"),
        (
            F.sum(F.when(F.col("Type") == "C", F.col("Total_Qty")).otherwise(0))
            / F.sum("Total_Qty")
        ).alias("% of C"),
    )
)
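Since this version supplies .otherwise(0), no na.fill(0) step is needed. If you want the percentages rounded to two decimal places as in your expected output, you could additionally wrap each ratio in F.round, for example:
F.round(
    F.sum(F.when(F.col("Type") == "A", F.col("Total_Qty")).otherwise(0))
    / F.sum("Total_Qty"),
    2,
).alias("% of A")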
Approach 2: Using Windows
If you would like to add these as 5 additional columns to your dataset, you may use similar aggregations with a window: OVER (PARTITION BY ID) in SQL, or Window.partitionBy("ID") in the pyspark API, as shown below.
Using Spark SQL
- Ensure your dataframe is accessible by creating a temporary view:
input_df.createOrReplaceTempView("input_df")
- Run the SQL below on your spark session:
output_df = sparkSession.sql("""
    SELECT
        *,
        COUNT(Prod_Name) OVER (PARTITION BY ID) as `total products`,
        SUM(Total_Qty) OVER (PARTITION BY ID) as `Total Qty`,
        SUM(CASE WHEN Type = 'A' THEN Total_Qty END) OVER (PARTITION BY ID)
            / SUM(Total_Qty) OVER (PARTITION BY ID) as `% of A`,
        SUM(CASE WHEN Type = 'B' THEN Total_Qty END) OVER (PARTITION BY ID)
            / SUM(Total_Qty) OVER (PARTITION BY ID) as `% of B`,
        SUM(CASE WHEN Type = 'C' THEN Total_Qty END) OVER (PARTITION BY ID)
            / SUM(Total_Qty) OVER (PARTITION BY ID) as `% of C`
    FROM
        input_df
""").na.fill(0)
Using the pyspark API
from pyspark.sql import functions as F
from pyspark.sql import Window
agg_window = Window.partitionBy("ID")
output_df = (
    input_df
    .withColumn("total products", F.count("Prod_Name").over(agg_window))
    .withColumn("Total Qty", F.sum("Total_Qty").over(agg_window))
    .withColumn(
        "% of A",
        F.sum(F.when(F.col("Type") == "A", F.col("Total_Qty")).otherwise(0)).over(agg_window)
        / F.sum("Total_Qty").over(agg_window),
    )
    .withColumn(
        "% of B",
        F.sum(F.when(F.col("Type") == "B", F.col("Total_Qty")).otherwise(0)).over(agg_window)
        / F.sum("Total_Qty").over(agg_window),
    )
    .withColumn(
        "% of C",
        F.sum(F.when(F.col("Type") == "C", F.col("Total_Qty")).otherwise(0)).over(agg_window)
        / F.sum("Total_Qty").over(agg_window),
    )
)
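Note that this keeps one row per product (10 rows for the sample data), with the aggregates repeated on each row. If you later want to collapse it back to one row per ID, one option (a sketch, assuming the underscored column names used above) is to drop the product-level columns and deduplicate:
one_row_per_id = (
    output_df
    .drop("Prod_Name", "Type", "Total_Qty")  # keep only the ID-level columns
    .dropDuplicates(["ID"])
)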
Let me know if this works for you.
CodePudding user response:
One approach (without repeating A, B, C, etc.) is using pivot. The idea is to group first, then pivot the Type:
from pyspark.sql import functions as F
from pyspark.sql import Window as W
(df
    .groupBy('ID', 'Type')
    .agg(F.sum('Total Qty').alias('qty'))
    .withColumn('pct', F.col('qty') / F.sum('qty').over(W.partitionBy('ID')))
    .groupBy('ID')
    .pivot('Type')
    .agg(F.first('pct'))
    .show()
)
# Output
# +---+------------------+------------------+-------------------+
# | ID|                 A|                 B|                  C|
# +---+------------------+------------------+-------------------+
# |  1|0.2962962962962963|0.5802469135802469|0.12345679012345678|
# |  2|0.5333333333333333|              null| 0.4666666666666667|
# +---+------------------+------------------+-------------------+
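The pivot result has only the percentage columns, and null where an ID has no rows of a given type. One way (a sketch, reusing df, F and W from above) to also get the product count and total quantity, and to replace null with 0, is to join with a plain group-by:
summary = df.groupBy('ID').agg(
    F.count('Prod Name').alias('total Products'),
    F.sum('Total Qty').alias('Total Qty'),
)
pct = (df
    .groupBy('ID', 'Type')
    .agg(F.sum('Total Qty').alias('qty'))
    .withColumn('pct', F.col('qty') / F.sum('qty').over(W.partitionBy('ID')))
    .groupBy('ID')
    .pivot('Type')
    .agg(F.first('pct'))
)
summary.join(pct, 'ID').na.fill(0).show()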