Home > front end >  Statistics from nested array using PySpark
Statistics from nested array using PySpark

Time:07-05

I want to perform some aggregations on the table using PySpark. My table name is COMPLEX_DATA and is stored in Snowflake. The COORDINATES column's datatype is VARIANT. The nested arrays are not of fixed sizes. I could have 1,000 nested arrays, and there are other cases where there are none. My table is shown below:

| ID |         COORDINATES                  |
|----|--------------------------------------|
| 1  |[[0, 15], [1, 50], [2, 5]], [5, 650]] |
| 2  |[[0, 20], [1, 17]]                    |
| 3  |[[0, 10], [1, 15], [2, 11]]           |

The output should look like the following:

| Index| Min | Max | Mean |
|------|-----|-----|------|
| 0    |  15 |  20 | 15   |
| 1    |  15 |  50 | 27.33| 
| 2    |  5  |  11 | 8    |
| 5    |  650|  650| 650  |

CodePudding user response:

Lets assume that when you import this table in spark corrdinates column will be of Array type so based on it below code can be used where I have used explode and then group by over Index

from pyspark.sql import functions as F
deta=[(1,[[0, 15], [1, 50], [2, 5], [5, 650]]),(2,[[0, 20], [1, 17]]),(3,[[0, 10], [1, 15], [2, 11]])]
dql1=spark.createDataFrame(deta).toDF("ID","COORDINATES")
dql1.withColumn("ecor",F.explode("coordinates")).withColumn("Index",F.col("ecor")[0]).withColumn("ecor_val",F.col("ecor")[1]).drop("id","coordinates","ecor").groupBy("Index").agg(F.min("ecor_val").alias("Min"),F.max("ecor_val").alias("Max"),F.mean("ecor_val").alias("Mean")).show()

#output
 ----- --- --- ------------------ 
|Index|Min|Max|              Mean|
 ----- --- --- ------------------ 
|    0| 10| 20|              15.0|
|    1| 15| 50|27.333333333333332|
|    5|650|650|             650.0|
|    2|  5| 11|               8.0|
 ----- --- --- ------------------ 

Also from datqa you have provided min of 0 should be 10 not 15 as for ID =3 we have [0, 10] in coordinates

CodePudding user response:

You can achieve this with a bit of pre-processing using array functions & Higher Order Functions

Data Preparation

input_list = [
    (1,[[0, 15], [1, 50], [2, 5], [5, 650]])
    ,(2,[[0, 20], [1, 17]] )
    ,(3,[[0, 10], [1, 15], [2, 11]])
]

sparkDF = sql.createDataFrame(input_list,['ID','COORDINATES'])


sparkDF.show(truncate=False)

 --- ------------------------------------ 
|ID |COORDINATES                         |
 --- ------------------------------------ 
|1  |[[0, 15], [1, 50], [2, 5], [5, 650]]|
|2  |[[0, 20], [1, 17]]                  |
|3  |[[0, 10], [1, 15], [2, 11]]         |
 --- ------------------------------------ 

Extract & Explode Index

sparkDF = sparkDF.withColumn('INDEX',F.transform(F.col('COORDINATES'),lambda x:x.getItem(0)))\
                 .withColumn('VALUES',F.transform(F.col('COORDINATES'),lambda x:x.getItem(1)))\
                 .withColumn('INDEX',F.explode(F.col('INDEX')))\

sparkDF.show(truncate=False)          

 --- ------------------------------------ ----- ---------------- 
|ID |COORDINATES                         |INDEX|VALUES          |
 --- ------------------------------------ ----- ---------------- 
|1  |[[0, 15], [1, 50], [2, 5], [5, 650]]|0    |[15, 50, 5, 650]|
|1  |[[0, 15], [1, 50], [2, 5], [5, 650]]|1    |[15, 50, 5, 650]|
|1  |[[0, 15], [1, 50], [2, 5], [5, 650]]|2    |[15, 50, 5, 650]|
|1  |[[0, 15], [1, 50], [2, 5], [5, 650]]|5    |[15, 50, 5, 650]|
|2  |[[0, 20], [1, 17]]                  |0    |[20, 17]        |
|2  |[[0, 20], [1, 17]]                  |1    |[20, 17]        |
|3  |[[0, 10], [1, 15], [2, 11]]         |0    |[10, 15, 11]    |
|3  |[[0, 10], [1, 15], [2, 11]]         |1    |[10, 15, 11]    |
|3  |[[0, 10], [1, 15], [2, 11]]         |2    |[10, 15, 11]    |
 --- ------------------------------------ ----- ---------------- 

Extract Corresponding Values

Since multiple values are spread across INDEX , you need to extract only the value corresponding to its array-index , using row_number

window = Window.partitionBy('ID').orderBy(F.col('ID'))
    
sparkDF = sparkDF.withColumn('row_num',F.row_number().over(window) - 1)\
                 .withColumn('VALUES',F.col('VALUES').getItem(F.col('row_num')))

sparkDF.show(truncate=False)          

 --- ------------------------------------ ----- ------ ------- 
|ID |COORDINATES                         |INDEX|VALUES|row_num|
 --- ------------------------------------ ----- ------ ------- 
|1  |[[0, 15], [1, 50], [2, 5], [5, 650]]|0    |15    |0      |
|1  |[[0, 15], [1, 50], [2, 5], [5, 650]]|1    |50    |1      |
|1  |[[0, 15], [1, 50], [2, 5], [5, 650]]|2    |5     |2      |
|1  |[[0, 15], [1, 50], [2, 5], [5, 650]]|5    |650   |3      |
|3  |[[0, 10], [1, 15], [2, 11]]         |0    |10    |0      |
|3  |[[0, 10], [1, 15], [2, 11]]         |1    |15    |1      |
|3  |[[0, 10], [1, 15], [2, 11]]         |2    |11    |2      |
|2  |[[0, 20], [1, 17]]                  |0    |20    |0      |
|2  |[[0, 20], [1, 17]]                  |1    |17    |1      |
 --- ------------------------------------ ----- ------ ------- 

Final Aggreagtions

 ----- --- --- ------------------ 
|INDEX|Min|Max|Mean              |
 ----- --- --- ------------------ 
|0    |10 |20 |15.0              |
|1    |15 |50 |27.333333333333332|
|2    |5  |11 |8.0               |
|5    |650|650|650.0             |
 ----- --- --- ------------------ 

CodePudding user response:

from pyspark.sql import functions as F

df = df.select(F.explode('COORDINATES'))
df = df.groupBy(F.col('col')[0].alias('Index')) \
       .agg(F.min(F.col('col')[1]).alias('Min'),
            F.max(F.col('col')[1]).alias('Max'),
            F.avg(F.col('col')[1]).alias('Mean'))
  • Related