I want to perform some aggregations on the table using PySpark. My table is named COMPLEX_DATA and is stored in Snowflake. The COORDINATES column's datatype is VARIANT. The nested arrays are not of fixed sizes: I could have 1,000 nested arrays, and there are other cases where there are none. My table is shown below:
| ID | COORDINATES                          |
|----|--------------------------------------|
| 1  | [[0, 15], [1, 50], [2, 5], [5, 650]] |
| 2  | [[0, 20], [1, 17]]                   |
| 3  | [[0, 10], [1, 15], [2, 11]]          |
The output should look like the following:
| Index | Min | Max | Mean  |
|-------|-----|-----|-------|
| 0     | 15  | 20  | 15    |
| 1     | 15  | 50  | 27.33 |
| 2     | 5   | 11  | 8     |
| 5     | 650 | 650 | 650   |
CodePudding user response:
Let's assume that when you import this table into Spark, the COORDINATES column will be of array type. Based on that, the code below uses explode and then a group by over Index:
from pyspark.sql import functions as F

data = [(1, [[0, 15], [1, 50], [2, 5], [5, 650]]),
        (2, [[0, 20], [1, 17]]),
        (3, [[0, 10], [1, 15], [2, 11]])]
df = spark.createDataFrame(data).toDF("ID", "COORDINATES")

# Explode each [index, value] pair into its own row, then aggregate per index
(df.withColumn("ecor", F.explode("COORDINATES"))
   .withColumn("Index", F.col("ecor")[0])
   .withColumn("ecor_val", F.col("ecor")[1])
   .groupBy("Index")
   .agg(F.min("ecor_val").alias("Min"),
        F.max("ecor_val").alias("Max"),
        F.mean("ecor_val").alias("Mean"))
   .show())
#output
+-----+---+---+------------------+
|Index|Min|Max|              Mean|
+-----+---+---+------------------+
|    0| 10| 20|              15.0|
|    1| 15| 50|27.333333333333332|
|    5|650|650|             650.0|
|    2|  5| 11|               8.0|
+-----+---+---+------------------+
Also, from the data you have provided, the Min for Index 0 should be 10, not 15, since for ID = 3 we have [0, 10] in COORDINATES.
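One more note on the Snowflake side: the Spark connector typically maps a VARIANT column to a string of JSON rather than a native array, so COORDINATES may need to be parsed before the explode above. A minimal sketch, assuming the column does arrive as a JSON string and that sfOptions is a placeholder for your connection options:
from pyspark.sql import functions as F

# Read COMPLEX_DATA through the Snowflake Spark connector
# (sfOptions is a placeholder for your connection options)
raw = (spark.read
       .format("net.snowflake.spark.snowflake")
       .options(**sfOptions)
       .option("dbtable", "COMPLEX_DATA")
       .load())

# Parse the VARIANT JSON text into a nested integer array before exploding
df = raw.withColumn("COORDINATES", F.from_json("COORDINATES", "array<array<int>>"))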
CodePudding user response:
You can achieve this with a bit of pre-processing using array functions and higher-order functions.
Data Preparation
from pyspark.sql import functions as F
from pyspark.sql.window import Window

input_list = [
    (1, [[0, 15], [1, 50], [2, 5], [5, 650]]),
    (2, [[0, 20], [1, 17]]),
    (3, [[0, 10], [1, 15], [2, 11]]),
]

sparkDF = spark.createDataFrame(input_list, ['ID', 'COORDINATES'])
sparkDF.show(truncate=False)
+---+------------------------------------+
|ID |COORDINATES                         |
+---+------------------------------------+
|1  |[[0, 15], [1, 50], [2, 5], [5, 650]]|
|2  |[[0, 20], [1, 17]]                  |
|3  |[[0, 10], [1, 15], [2, 11]]         |
+---+------------------------------------+
Extract & Explode Index
sparkDF = (sparkDF
    .withColumn('INDEX', F.transform(F.col('COORDINATES'), lambda x: x.getItem(0)))
    .withColumn('VALUES', F.transform(F.col('COORDINATES'), lambda x: x.getItem(1)))
    .withColumn('INDEX', F.explode(F.col('INDEX'))))
sparkDF.show(truncate=False)
+---+------------------------------------+-----+----------------+
|ID |COORDINATES                         |INDEX|VALUES          |
+---+------------------------------------+-----+----------------+
|1  |[[0, 15], [1, 50], [2, 5], [5, 650]]|0    |[15, 50, 5, 650]|
|1  |[[0, 15], [1, 50], [2, 5], [5, 650]]|1    |[15, 50, 5, 650]|
|1  |[[0, 15], [1, 50], [2, 5], [5, 650]]|2    |[15, 50, 5, 650]|
|1  |[[0, 15], [1, 50], [2, 5], [5, 650]]|5    |[15, 50, 5, 650]|
|2  |[[0, 20], [1, 17]]                  |0    |[20, 17]        |
|2  |[[0, 20], [1, 17]]                  |1    |[20, 17]        |
|3  |[[0, 10], [1, 15], [2, 11]]         |0    |[10, 15, 11]    |
|3  |[[0, 10], [1, 15], [2, 11]]         |1    |[10, 15, 11]    |
|3  |[[0, 10], [1, 15], [2, 11]]         |2    |[10, 15, 11]    |
+---+------------------------------------+-----+----------------+
Extract Corresponding Values
Since multiple values are spread across each exploded INDEX row, you need to extract only the value corresponding to its array index, using row_number:
# row_num (0-based) gives the position of each exploded row within its ID group
window = Window.partitionBy('ID').orderBy(F.col('ID'))

sparkDF = sparkDF.withColumn('row_num', F.row_number().over(window) - 1)\
                 .withColumn('VALUES', F.col('VALUES').getItem(F.col('row_num')))
sparkDF.show(truncate=False)
+---+------------------------------------+-----+------+-------+
|ID |COORDINATES                         |INDEX|VALUES|row_num|
+---+------------------------------------+-----+------+-------+
|1  |[[0, 15], [1, 50], [2, 5], [5, 650]]|0    |15    |0      |
|1  |[[0, 15], [1, 50], [2, 5], [5, 650]]|1    |50    |1      |
|1  |[[0, 15], [1, 50], [2, 5], [5, 650]]|2    |5     |2      |
|1  |[[0, 15], [1, 50], [2, 5], [5, 650]]|5    |650   |3      |
|3  |[[0, 10], [1, 15], [2, 11]]         |0    |10    |0      |
|3  |[[0, 10], [1, 15], [2, 11]]         |1    |15    |1      |
|3  |[[0, 10], [1, 15], [2, 11]]         |2    |11    |2      |
|2  |[[0, 20], [1, 17]]                  |0    |20    |0      |
|2  |[[0, 20], [1, 17]]                  |1    |17    |1      |
+---+------------------------------------+-----+------+-------+
Final Aggregations
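The aggregation code for this step is not shown in the original; a minimal sketch of what it would look like, grouping on INDEX over the sparkDF built above:
finalDF = sparkDF.groupBy('INDEX').agg(
    F.min('VALUES').alias('Min'),
    F.max('VALUES').alias('Max'),
    F.mean('VALUES').alias('Mean'),
)

finalDF.orderBy('INDEX').show(truncate=False)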
+-----+---+---+------------------+
|INDEX|Min|Max|Mean              |
+-----+---+---+------------------+
|0    |10 |20 |15.0              |
|1    |15 |50 |27.333333333333332|
|2    |5  |11 |8.0               |
|5    |650|650|650.0             |
+-----+---+---+------------------+
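A side note: the row_number window orders by ID inside an ID partition, so it relies on the exploded rows keeping their original array order, which Spark does not strictly guarantee. A more defensive sketch of the same step uses posexplode on the INDEX array (this assumes you stop the earlier chain after the two transform calls, i.e. before INDEX is exploded):
# posexplode yields each element of INDEX together with its position in the array,
# so the matching VALUES entry can be looked up without a window
sparkDF = (sparkDF
    .select('ID', 'VALUES', F.posexplode('INDEX').alias('pos', 'INDEX'))
    .withColumn('VALUES', F.col('VALUES').getItem(F.col('pos'))))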
CodePudding user response:
from pyspark.sql import functions as F

# Each element of COORDINATES is an [index, value] pair; explode to one pair per row.
# The exploded column is named 'col' by default.
df = df.select(F.explode('COORDINATES'))

df = df.groupBy(F.col('col')[0].alias('Index')) \
       .agg(F.min(F.col('col')[1]).alias('Min'),
            F.max(F.col('col')[1]).alias('Max'),
            F.avg(F.col('col')[1]).alias('Mean'))
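If you also want Mean rounded to two decimals, as in the expected output above, F.round can be applied on top (a small optional addition):
df = df.withColumn('Mean', F.round(F.col('Mean'), 2))
df.orderBy('Index').show()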