Group by and aggregate on a column with array in PySpark


I have the PySpark DataFrame below. Column_2 is of the complex data type array<map<string,bigint>>:

Column_1 Column_2                 Column_3
A        [{Mat=7},{Phy=8}]        ABC
A        [{Mat=7},{Phy=8}]        CDE
B        [{Mat=6},{Phy=7}]        ZZZ

I have to group by Column_1 and Column_2 and get the minimum of Column_3.

The problem is that when I try to group by Column_1 and Column_2, it gives me an error:

cannot be used as grouping expression because the data type is not an orderable data type

Is there a way to include this column in the group by, or to aggregate it in some way? The values in Column_2 will always be the same for a given key value in Column_1.

Expected output:

Column_1 Column_2                Column_3
A        [{Mat=7},{Phy=8}]       ABC
B        [{Mat=6},{Phy=7}]       ZZZ

Is it possible to do a collect_list of all the values in an aggregate function, then flatten it and remove duplicates?

CodePudding user response:

The values in column_2 will always be same for a key value in column_1

If so, then you can just take the first value in the group.

Test dataframe:

from pyspark.sql import functions as F

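# Recreate the sample data; Column_2 is built as an array of
# single-entry maps, yielding the type array<map<string,bigint>>.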
df = spark.createDataFrame(
    [('A', 'ABC', 7, 8),
     ('A', 'CDE', 7, 8),
     ('B', 'ZZZ', 6, 7)],
    ['Column_1', 'Column_3', 'm', 'p'])
df = df.select(
    'Column_1',
    F.array(F.create_map(F.lit('Mat'), 'm'), F.create_map(F.lit('Phy'), 'p')).alias('Column_2'),
    'Column_3'
)
df.show(truncate=False)
print(df.dtypes)
# +--------+------------------------+--------+
# |Column_1|Column_2                |Column_3|
# +--------+------------------------+--------+
# |A       |[{Mat -> 7}, {Phy -> 8}]|ABC     |
# |A       |[{Mat -> 7}, {Phy -> 8}]|CDE     |
# |B       |[{Mat -> 6}, {Phy -> 7}]|ZZZ     |
# +--------+------------------------+--------+

# [('Column_1', 'string'), ('Column_2', 'array<map<string,bigint>>'), ('Column_3', 'string')]

Aggregation:

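# first() is safe here because Column_2 is identical within each
# Column_1 group; without an explicit ordering, first() is otherwise
# non-deterministic.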
df2 = df.groupBy('Column_1').agg(
    F.first('Column_2').alias('Column_2'),
    F.min('Column_3').alias('Column_3')
)
df2.show(truncate=False)
# +--------+------------------------+--------+
# |Column_1|Column_2                |Column_3|
# +--------+------------------------+--------+
# |A       |[{Mat -> 7}, {Phy -> 8}]|ABC     |
# |B       |[{Mat -> 6}, {Phy -> 7}]|ZZZ     |
# +--------+------------------------+--------+
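As for the follow-up question: collect_list plus flatten would gather all the values, but deduplicating the result is likely to hit the same wall, because array_distinct also requires an orderable element type and maps are not orderable. If you would rather group on Column_2 itself instead of relying on first(), one workaround is to serialize the column to a JSON string, group on that string, and parse it back afterwards. A minimal sketch, assuming Spark 2.4+ (where to_json/from_json handle arrays of maps); the intermediate column name c2_json is just for illustration:

df3 = (
    df
    # serialize the non-orderable array<map<...>> column to a plain string
    .withColumn('c2_json', F.to_json('Column_2'))
    # strings are orderable, so they are valid grouping expressions
    .groupBy('Column_1', 'c2_json')
    .agg(F.min('Column_3').alias('Column_3'))
    # restore the original complex type from the JSON string
    .withColumn('Column_2', F.from_json('c2_json', 'array<map<string,bigint>>'))
    .select('Column_1', 'Column_2', 'Column_3')
)
df3.show(truncate=False)

This should produce the same result as df2 above, while actually grouping on Column_2 rather than assuming it is constant per key.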