PySpark groupBy for all columns with unpivot


I have 101 columns from a pipe-delimited file and I'm looking to get value counts for all columns by unpivoting the data.

Sample data:

+----------------+------------+------------+------------+------------+------------+------------+
|           rm_ky| flag_010961| flag_011622| flag_009670| flag_009708| flag_009890| flag_009893|
+----------------+------------+------------+------------+------------+------------+------------+
|    193012020044|           0|           0|           0|           0|           0|           0|
|    115012030044|           0|           0|           1|           1|           1|           1|
|    140012220044|           0|           0|           0|           0|           0|           0|
|    189012240044|           0|           0|           0|           0|           0|           0|
|    151012350044|           0|           0|           0|           0|           0|           0|
+----------------+------------+------------+------------+------------+------------+------------+

I have tried getting the counts column by column, like:

df.groupBy("flag_011622").count().show()

+------------+--------+
| flag_011622|   count|
+------------+--------+
|           1|  192289|
|           0|69861980|
+------------+--------+

Instead, I'm looking for something like the output below. Any suggestions for handling this without looping over each column one at a time?

+-----------+-----+---------+
|  flag_name|value|   counts|
+-----------+-----+---------+
|flag_011622|    1|   192289|
|flag_011622|    0| 69861980|
|flag_009670|    1|120011800|
|flag_009670|    0|   240507|
|flag_009708|    1|119049838|
|flag_009708|    0|  1202469|
+-----------+-----+---------+

CodePudding user response:

You could use the stack function, which unpivots a DataFrame: stack(n, expr1, ..., exprk) separates the k expressions, taken as (label, value) pairs, into n rows. For example, stack(2, 'a', col_a, 'b', col_b) produces one row per pair, with the label in the first output column and the column's value in the second.

Using your sample as df:

from pyspark.sql import functions as F

df = df.select(
    "rm_ky",
    F.expr(
        """stack(5,
                'flag_010961', flag_010961,
                'flag_009670', flag_009670,
                'flag_009708', flag_009708,
                'flag_009890', flag_009890,
                'flag_009893', flag_009893
                ) AS (flag_name, value)"""
    ),
)

gives:

+------------+-----------+-----+
|rm_ky       |flag_name  |value|
+------------+-----------+-----+
|193012020044|flag_010961|0    |
|193012020044|flag_009670|0    |
|193012020044|flag_009708|0    |
|193012020044|flag_009890|0    |
|193012020044|flag_009893|0    |
|115012030044|flag_010961|0    |
|115012030044|flag_009670|1    |
|115012030044|flag_009708|1    |
|115012030044|flag_009890|1    |
|115012030044|flag_009893|1    |
|140012220044|flag_010961|0    |
|140012220044|flag_009670|0    |
|140012220044|flag_009708|0    |
|140012220044|flag_009890|0    |
|140012220044|flag_009893|0    |
|189012240044|flag_010961|0    |
|189012240044|flag_009670|0    |
|189012240044|flag_009708|0    |
|189012240044|flag_009890|0    |
|189012240044|flag_009893|0    |
|151012350044|flag_010961|0    |
|151012350044|flag_009670|0    |
|151012350044|flag_009708|0    |
|151012350044|flag_009890|0    |
|151012350044|flag_009893|0    |
+------------+-----------+-----+

Which you can then group and order:

df = (
    df.groupBy("flag_name", "value")
    .agg(F.count("*").alias("counts"))
    .orderBy("flag_name", "value")
)

to get:

+-----------+-----+------+
|flag_name  |value|counts|
+-----------+-----+------+
|flag_009670|0    |4     |
|flag_009670|1    |1     |
|flag_009708|0    |4     |
|flag_009708|1    |1     |
|flag_009890|0    |4     |
|flag_009890|1    |1     |
|flag_009893|0    |4     |
|flag_009893|1    |1     |
|flag_010961|0    |5     |
+-----------+-----+------+
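With 101 columns you probably don't want to write the stack pairs by hand. A minimal sketch that builds the same expression programmatically, starting again from the original df; it assumes every column except rm_ky is a flag column, and the names flag_cols, pairs and unpivoted are just illustrative:

from pyspark.sql import functions as F

# Assumes rm_ky is the only non-flag column.
flag_cols = [c for c in df.columns if c != "rm_ky"]
# Each column contributes a ('name', value) pair to stack().
pairs = ", ".join("'{0}', {0}".format(c) for c in flag_cols)

unpivoted = df.select(
    "rm_ky",
    F.expr("stack({}, {}) AS (flag_name, value)".format(len(flag_cols), pairs)),
)
counts = (
    unpivoted.groupBy("flag_name", "value")
    .agg(F.count("*").alias("counts"))
    .orderBy("flag_name", "value")
)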

CodePudding user response:

Example:

data = [ ("193012020044",0, 0, 0, 0, 0, 1)
        ,("115012030044",0, 0, 1, 1, 1, 1)
        ,("140012220044",0, 0, 0, 0, 0, 0)
        ,("189012240044",0, 1, 0, 0, 0, 0)
        ,("151012350044",0, 0, 0, 1, 1, 0)]
columns= ["rm_ky","flag_010961","flag_011622","flag_009670","flag_009708","flag_009890","flag_009893"]
df = spark.createDataFrame(data = data, schema = columns)

df.show()

+------------+-----------+-----------+-----------+-----------+-----------+-----------+
|       rm_ky|flag_010961|flag_011622|flag_009670|flag_009708|flag_009890|flag_009893|
+------------+-----------+-----------+-----------+-----------+-----------+-----------+
|193012020044|          0|          0|          0|          0|          0|          1|
|115012030044|          0|          0|          1|          1|          1|          1|
|140012220044|          0|          0|          0|          0|          0|          0|
|189012240044|          0|          1|          0|          0|          0|          0|
|151012350044|          0|          0|          0|          1|          1|          0|
+------------+-----------+-----------+-----------+-----------+-----------+-----------+

Creating an expression to unpivot:

x = ""
cnt = 0
for col in df.columns:
  if col != 'rm_ky':
    cnt += 1
    x += "'" + str(col) + "', " + str(col) + ", "

x = x[:-2]  # drop the trailing ", "
xpr = """stack({}, {}) as (Type,Value)""".format(cnt, x)
print(xpr)

>> stack(6, 'flag_010961', flag_010961, 'flag_011622', flag_011622, 'flag_009670', flag_009670, 'flag_009708', flag_009708, 'flag_009890', flag_009890, 'flag_009893', flag_009893) as (Type,Value)

Then, using expr and pivot:

from pyspark.sql import functions as F

df\
  .drop('rm_ky')\
  .select(F.expr(xpr))\
  .groupBy('Type')\
  .pivot('Value')\
  .count()\
  .fillna(0)\
  .show()

+-----------+---+---+
|       Type|  0|  1|
+-----------+---+---+
|flag_009890|  3|  2|
|flag_009893|  3|  2|
|flag_011622|  4|  1|
|flag_010961|  5|  0|
|flag_009708|  3|  2|
|flag_009670|  4|  1|
+-----------+---+---+
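As a side note: if you are on Spark 3.4 or later, DataFrame.unpivot (also available as melt) builds the long format directly, so you don't need to assemble a stack() expression at all. A sketch against the same df, with the variable name unpivoted used only as an illustration:

# Spark 3.4+ only: unpivot replaces the hand-built stack() expression.
unpivoted = df.unpivot(
    ids=["rm_ky"],
    values=[c for c in df.columns if c != "rm_ky"],
    variableColumnName="Type",
    valueColumnName="Value",
)
unpivoted.groupBy("Type").pivot("Value").count().fillna(0).show()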

CodePudding user response:

I think this is what you are looking for:

>>> df2.show()
+------------+-----------+-----------+-----------+-----------+-----------+-----------+
|       rm_ky|flag_010961|flag_011622|flag_009670|flag_009708|flag_009890|flag_009893|
+------------+-----------+-----------+-----------+-----------+-----------+-----------+
|193012020044|          0|          0|          0|          0|          0|          0|
|115012030044|          0|          0|          1|          1|          1|          1|
|140012220044|          0|          0|          0|          0|          0|          0|
|189012240044|          0|          0|          0|          0|          0|          0|
|151012350044|          0|          0|          0|          0|          0|          0|
+------------+-----------+-----------+-----------+-----------+-----------+-----------+
>>> from pyspark.sql.functions import expr
>>> unpivotExpr = "stack(6, 'flag_010961',flag_010961,'flag_011622',flag_011622,'flag_009670',flag_009670, 'flag_009708',flag_009708, 'flag_009890',flag_009890, 'flag_009893',flag_009893) as (flag,flag_val)"
>>> unPivotDF = df2.select("rm_ky", expr(unpivotExpr))
>>> unPivotDF.show()
+------------+-----------+--------+
|       rm_ky|       flag|flag_val|
+------------+-----------+--------+
|193012020044|flag_010961|       0|
|193012020044|flag_011622|       0|
|193012020044|flag_009670|       0|
|193012020044|flag_009708|       0|
|193012020044|flag_009890|       0|
|193012020044|flag_009893|       0|
|115012030044|flag_010961|       0|
|115012030044|flag_011622|       0|
|115012030044|flag_009670|       1|
|115012030044|flag_009708|       1|
|115012030044|flag_009890|       1|
|115012030044|flag_009893|       1|
|140012220044|flag_010961|       0|
|140012220044|flag_011622|       0|
|140012220044|flag_009670|       0|
|140012220044|flag_009708|       0|
|140012220044|flag_009890|       0|
|140012220044|flag_009893|       0|
|189012240044|flag_010961|       0|
|189012240044|flag_011622|       0|
+------------+-----------+--------+
only showing top 20 rows

>>> unPivotDF.groupBy("flag","flag_val").count().show()
+-----------+--------+-----+
|       flag|flag_val|count|
+-----------+--------+-----+
|flag_009670|       0|    4|
|flag_009708|       0|    4|
|flag_009893|       0|    4|
|flag_009890|       0|    4|
|flag_009670|       1|    1|
|flag_009893|       1|    1|
|flag_011622|       0|    5|
|flag_010961|       0|    5|
|flag_009890|       1|    1|
|flag_009708|       1|    1|
+-----------+--------+-----+

>>> unPivotDF.groupBy("rm_ky","flag","flag_val").count().show()
+------------+-----------+--------+-----+
|       rm_ky|       flag|flag_val|count|
+------------+-----------+--------+-----+
|151012350044|flag_009708|       0|    1|
|115012030044|flag_010961|       0|    1|
|140012220044|flag_009670|       0|    1|
|189012240044|flag_010961|       0|    1|
|151012350044|flag_009670|       0|    1|
|115012030044|flag_009890|       1|    1|
|151012350044|flag_009890|       0|    1|
|189012240044|flag_009890|       0|    1|
|193012020044|flag_011622|       0|    1|
|193012020044|flag_009670|       0|    1|
|115012030044|flag_009670|       1|    1|
|140012220044|flag_011622|       0|    1|
|151012350044|flag_009893|       0|    1|
|140012220044|flag_009893|       0|    1|
|189012240044|flag_011622|       0|    1|
|189012240044|flag_009893|       0|    1|
|115012030044|flag_009893|       1|    1|
|140012220044|flag_009708|       0|    1|
|189012240044|flag_009708|       0|    1|
|193012020044|flag_010961|       0|    1|
+------------+-----------+--------+-----+
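Since unPivotDF feeds more than one aggregation and the real data has tens of millions of rows, it may be worth caching the unpivoted frame so the stack() is not recomputed for each action; sorting the result also makes the 101 flags easier to scan. A sketch:

>>> unPivotDF.cache()
>>> unPivotDF.groupBy("flag","flag_val").count().orderBy("flag","flag_val").show()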