Finding max, min, and avg and adding the results as new columns to a dataset in Spark

Time:07-25

I'm a newbie with Spark and have this exercise to solve with the following tables. I'm using Spark 3.3.0.

List of warehouse positions (positionId, warehouse, product, eventTime):

positionId warehouse product eventTime
1 W-1 P-1 1528463098
2 W-1 P-2 1528463100
3 W-2 P-3 1528463110
4 W-2 P-4 1528463111

List of amounts (positionId, amount, eventTime):

positionId amount eventTime
1 10.00 1528463098
1 10.20 1528463008
2 5.00 1528463100
3 4.90 1528463100
3 5.50 1528463111
3 5.00 1528463105
4 99.99 1528463111
4 99.57 1528463112

I need to find the max, min, and avg amounts for each warehouse and product and place the results in three new columns.

W-1, P-1, <max?(ex10.20)>, <min?(ex10.00)>, <avg?>

…..

W-2, P-4, <max?>, <min?>, <avg?>

So far I have joined the two tables on positionId and dropped the columns that I don't think I will need, such as positionId and eventTime from both tables.

val all = amounts.join(whp, whp("positionId") === amounts("positionId"), "inner").drop(whp("eventTime")).drop(amounts("eventTime")).drop(whp("positionId")).drop(amounts("positionId"))
all.show() // show() returns Unit, so call it separately to keep `all` as a DataFrame

amount warehouse product
10.00 W-1 P-1
10.20 W-1 P-1
5.00 W-1 P-2
4.90 W-2 P-3
5.50 W-2 P-3
5.00 W-2 P-3
99.99 W-2 P-4
99.57 W-2 P-4

What I have tested so far gets the max amount, but I don't know how to proceed to also get the min and the avg.

val columnMax = all.groupBy("warehouse", "product").max("amount")
columnMax.show()

warehouse product max(amount)
W-2 P-3 5.50
W-1 P-2 5.00
W-2 P-4 99.99
W-1 P-1 10.20

CodePudding user response:

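Your implementation is correct. The only thing you need is agg, which lets you apply several aggregate functions (here min, max, and avg) in a single groupBy. The example below is in PySpark; the Scala API has the same groupBy(...).agg(...) call, with min, max, and avg available from org.apache.spark.sql.functions.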

Data Preparation

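from io import StringIO

import pandas as pd
from pyspark.sql import SparkSession

# Assumes no session has been created yet; getOrCreate() reuses one if it exists
spark = SparkSession.builder.getOrCreate()

s1 = StringIO("""
positionId  warehouse   product eventTime
1   W-1 P-1 1528463098
2   W-1 P-2 1528463100
3   W-2 P-3 1528463110
4   W-2 P-4 1528463111
""")

s2 = StringIO("""
positionId  amount  eventTime
1   10.00   1528463098
1   10.20   1528463008
2   5.00    1528463100
3   4.90    1528463100
3   5.50    1528463111
3   5.00    1528463105
4   99.99   1528463111
4   99.57   1528463112
""")

# Columns are separated by runs of whitespace (tabs or spaces)
df1 = pd.read_csv(s1, sep=r'\s+')
df2 = pd.read_csv(s2, sep=r'\s+')

# Convert the pandas DataFrames to Spark DataFrames
warehouseDF = spark.createDataFrame(df1)
amountsDF = spark.createDataFrame(df2)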

Aggregation

from pyspark.sql import functions as F

# Join on positionId, keep only the columns needed,
# then compute min, max, and avg per (warehouse, product)
aggDF = (
    warehouseDF.join(amountsDF, on='positionId', how='inner')
    .select('warehouse', 'product', 'amount')
    .groupBy('warehouse', 'product')
    .agg(
        F.min('amount').alias('min_amount'),
        F.max('amount').alias('max_amount'),
        F.avg('amount').alias('avg_amount'),
    )
)

aggDF.show()

+---------+-------+----------+----------+-----------------+
|warehouse|product|min_amount|max_amount|       avg_amount|
+---------+-------+----------+----------+-----------------+
|      W-2|    P-3|       4.9|       5.5|5.133333333333334|
|      W-1|    P-2|       5.0|       5.0|              5.0|
|      W-2|    P-4|     99.57|     99.99|            99.78|
|      W-1|    P-1|      10.0|      10.2|             10.1|
+---------+-------+----------+----------+-----------------+
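
As a quick cross-check of the numbers in the table, the same inner join and per-group min, max, and avg can be sketched in plain Python using only the standard library. This is not Spark code. The names positions, amounts, and aggregate_amounts are made up for illustration, and the data is copied from the question.

```python
from collections import defaultdict

# positionId -> (warehouse, product), copied from the question's first table
positions = {
    1: ("W-1", "P-1"),
    2: ("W-1", "P-2"),
    3: ("W-2", "P-3"),
    4: ("W-2", "P-4"),
}

# (positionId, amount) pairs from the second table; eventTime is not needed here
amounts = [
    (1, 10.00), (1, 10.20), (2, 5.00), (3, 4.90),
    (3, 5.50), (3, 5.00), (4, 99.99), (4, 99.57),
]


def aggregate_amounts(positions, amounts):
    """Inner-join amounts to positions, then return (min, max, avg) per (warehouse, product)."""
    groups = defaultdict(list)
    for position_id, amount in amounts:
        if position_id in positions:  # inner join: skip amounts with no matching position
            groups[positions[position_id]].append(amount)
    return {
        key: (min(values), max(values), sum(values) / len(values))
        for key, values in groups.items()
    }


result = aggregate_amounts(positions, amounts)
for (warehouse, product), (lowest, highest, average) in sorted(result.items()):
    print(warehouse, product, lowest, highest, round(average, 4))
```

Each group collects its amounts once and derives all three statistics from that one list, which is the same idea as passing several aggregate functions to a single agg call rather than running one groupBy per statistic.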