I'm a newbie with Spark and have this exercise to solve with the following tables. I'm using Spark 3.3.0.
List of warehouse positions (positionId, warehouse, product, eventTime):
positionId | warehouse | product | eventTime |
---|---|---|---|
1 | W-1 | P-1 | 1528463098 |
2 | W-1 | P-2 | 1528463100 |
3 | W-2 | P-3 | 1528463110 |
4 | W-2 | P-4 | 1528463111 |
List of amounts (positionId, amount, eventTime):
positionId | amount | eventTime |
---|---|---|
1 | 10.00 | 1528463098 |
1 | 10.20 | 1528463008 |
2 | 5.00 | 1528463100 |
3 | 4.90 | 1528463100 |
3 | 5.50 | 1528463111 |
3 | 5.00 | 1528463105 |
4 | 99.99 | 1528463111 |
4 | 99.57 | 1528463112 |
I need to find the max, min and avg amounts for each warehouse and product, and place the results in three new columns:
W-1, P-1, <max? (e.g. 10.20)>, <min? (e.g. 10.00)>, <avg?>
...
W-2, P-4, <max?>, <min?>, <avg?>
So far I have joined the two tables on positionId and dropped the columns I don't think I'll need, such as positionId and eventTime from both tables:
```scala
val all = amounts.join(whp, whp("positionId") === amounts("positionId"), "inner").drop(whp("eventTime")).drop(amounts("eventTime")).drop(whp("positionId")).drop(amounts("positionId"))
all.show() // show() returns Unit, so call it separately to keep `all` a DataFrame
```
amount | warehouse | product |
---|---|---|
10.00 | W-1 | P-1 |
10.20 | W-1 | P-1 |
5.00 | W-1 | P-2 |
4.90 | W-2 | P-3 |
5.50 | W-2 | P-3 |
5.00 | W-2 | P-3 |
99.99 | W-2 | P-4 |
99.57 | W-2 | P-4 |
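To see exactly what the inner join plus the four `drop` calls produce, here is a plain-Python model using only the standard library. It is not Spark code, and the names `positions`, `amounts` and `inner_join_drop` are hypothetical. It keys the positions by `positionId`, the way a hash join would, and emits one `(amount, warehouse, product)` row per matching amount, which reproduces the joined table above (Spark itself does not guarantee row order).

```python
# Plain-Python model of the inner join on positionId (illustration only, not Spark)
positions = [
    # (positionId, warehouse, product, eventTime)
    (1, "W-1", "P-1", 1528463098),
    (2, "W-1", "P-2", 1528463100),
    (3, "W-2", "P-3", 1528463110),
    (4, "W-2", "P-4", 1528463111),
]
amounts = [
    # (positionId, amount, eventTime)
    (1, 10.00, 1528463098),
    (1, 10.20, 1528463008),
    (2, 5.00, 1528463100),
    (3, 4.90, 1528463100),
    (3, 5.50, 1528463111),
    (3, 5.00, 1528463105),
    (4, 99.99, 1528463111),
    (4, 99.57, 1528463112),
]


def inner_join_drop(position_rows, amount_rows):
    """Join on positionId and keep only (amount, warehouse, product)."""
    # positionId is unique in the positions table, so a dict lookup is enough
    by_id = {pid: (warehouse, product) for pid, warehouse, product, _ in position_rows}
    joined = []
    for pid, amount, _ in amount_rows:
        # Inner join: amounts without a matching position are discarded
        if pid in by_id:
            warehouse, product = by_id[pid]
            joined.append((amount, warehouse, product))
    return joined


rows = inner_join_drop(positions, amounts)
for row in rows:
    print(row)
```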
What I have tried so far gets the max amount, but I don't know how to proceed to also get the min and the avg:
```scala
val columnMax = all.groupBy("warehouse", "product").max("amount")
columnMax.show()
```
warehouse | product | max(amount) |
---|---|---|
W-2 | P-3 | 5.50 |
W-1 | P-2 | 5.00 |
W-2 | P-4 | 99.99 |
W-1 | P-1 | 10.20 |
Answer:
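Your approach is correct; the only thing you need is `agg`, which lets you apply several aggregate functions (here `min`, `max` and `avg`) to the grouped data at once. The example below is in PySpark, but the Scala DataFrame API offers the same `agg` method and `min`, `max` and `avg` functions.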
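As a rough mental model of what a single `groupBy(...).agg(min, max, avg)` computes, the plain-Python sketch below (standard library only, not Spark; `joined` and `aggregate_amounts` are hypothetical names) keeps a running min, max, sum and count for each (warehouse, product) group in one pass over the joined rows, then derives the average as sum / count. This is the result you would otherwise have to assemble from three separate grouped queries like the max-only one in the question.

```python
# Joined rows from the question: (amount, warehouse, product)
joined = [
    (10.00, "W-1", "P-1"),
    (10.20, "W-1", "P-1"),
    (5.00, "W-1", "P-2"),
    (4.90, "W-2", "P-3"),
    (5.50, "W-2", "P-3"),
    (5.00, "W-2", "P-3"),
    (99.99, "W-2", "P-4"),
    (99.57, "W-2", "P-4"),
]


def aggregate_amounts(rows):
    """Return {(warehouse, product): (min, max, avg)} computed in one pass."""
    # Per-group running state: [min, max, sum, count]
    state = {}
    for amount, warehouse, product in rows:
        key = (warehouse, product)
        if key not in state:
            state[key] = [amount, amount, 0.0, 0]
        s = state[key]
        s[0] = min(s[0], amount)
        s[1] = max(s[1], amount)
        s[2] += amount
        s[3] += 1
    # The average is derived from sum and count once every row has been seen
    return {key: (lo, hi, total / n) for key, (lo, hi, total, n) in state.items()}


result = aggregate_amounts(joined)
for key, (lo, hi, avg) in sorted(result.items()):
    print(*key, lo, hi, avg)
```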
Data Preparation
```python
from io import StringIO

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

s1 = StringIO("""
positionId warehouse product eventTime
1 W-1 P-1 1528463098
2 W-1 P-2 1528463100
3 W-2 P-3 1528463110
4 W-2 P-4 1528463111
""")
s2 = StringIO("""
positionId amount eventTime
1 10.00 1528463098
1 10.20 1528463008
2 5.00 1528463100
3 4.90 1528463100
3 5.50 1528463111
3 5.00 1528463105
4 99.99 1528463111
4 99.57 1528463112
""")

# The sample rows are whitespace-separated, so split on any run of whitespace
df1 = pd.read_csv(s1, sep=r'\s+')
df2 = pd.read_csv(s2, sep=r'\s+')

warehouseDF = spark.createDataFrame(df1)
amountsDF = spark.createDataFrame(df2)
```
Aggregation
```python
aggDF = (
    warehouseDF.join(amountsDF, warehouseDF['positionId'] == amountsDF['positionId'])
    .select(warehouseDF['warehouse'], warehouseDF['product'], amountsDF['amount'])
    .groupBy('warehouse', 'product')
    # agg() applies all three aggregate functions to each (warehouse, product) group
    .agg(
        F.min(F.col('amount')).alias('min_amount'),
        F.max(F.col('amount')).alias('max_amount'),
        F.avg(F.col('amount')).alias('avg_amount'),
    )
)
aggDF.show()
```
```
+---------+-------+----------+----------+-----------------+
|warehouse|product|min_amount|max_amount|       avg_amount|
+---------+-------+----------+----------+-----------------+
|      W-2|    P-3|       4.9|       5.5|5.133333333333334|
|      W-1|    P-2|       5.0|       5.0|              5.0|
|      W-2|    P-4|     99.57|     99.99|            99.78|
|      W-1|    P-1|      10.0|      10.2|             10.1|
+---------+-------+----------+----------+-----------------+
```
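The long `avg_amount` for W-2/P-3 is simply the double-precision result of (4.90 + 5.50 + 5.00) / 3. If only two decimals are wanted, one option in PySpark is to wrap the aggregate as `F.round(F.avg(F.col('amount')), 2)`. The plain-Python arithmetic below (not Spark; `groups` is a hypothetical hand-built mapping of the joined amounts) shows the values that rounding should give. Spark's `round` uses HALF_UP, while Python's built-in `round` resolves exact ties to even, but none of these averages lands on a tie.

```python
# Per-group amounts taken from the joined table in the question
groups = {
    ("W-1", "P-1"): [10.00, 10.20],
    ("W-1", "P-2"): [5.00],
    ("W-2", "P-3"): [4.90, 5.50, 5.00],
    ("W-2", "P-4"): [99.99, 99.57],
}

# Unrounded average per group, then the same value rounded to 2 decimals
averages = {key: sum(vals) / len(vals) for key, vals in groups.items()}
rounded = {key: round(avg, 2) for key, avg in averages.items()}

for key in sorted(groups):
    print(*key, averages[key], rounded[key])
```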