Pyspark - Aggregation on nested items with multiple conditions

Time:03-23

I'm using PySpark and have a dataframe with the following schema:

 root
     |-- BOOK_ID: integer (nullable = false)
     |-- Chapters: array (nullable = true) 
     |    |-- element: struct (containsNull = true)
     |    |    |-- NAME: string (nullable = true)
     |    |    |-- NUMBER_PAGES: integer (nullable = true)

How can we add a new column named short_chapters that holds, for each book, the sum of NUMBER_PAGES over the chapters whose NAME is shorter than 10 characters?

Note: Chapters is an array column. Is there a way to iterate over it without flattening the dataframe?

CodePudding user response:

You need to define a user-defined function (UDF for short) that filters the chapters. The UDF's return type is the same as the type of the Chapters column, which we read from the schema.

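from pyspark.sql.functions import lit, udf

def filter_short_chapters(chapters, thresh):
    # Keep only the chapters whose NAME has fewer than thresh characters
    return list(filter(lambda chapter: len(chapter.NAME) < thresh, chapters))

# Reuse the array<struct> type of Chapters as the UDF return type
udf_fn = udf(filter_short_chapters, df.schema['Chapters'].dataType)
# A plain Python literal must be wrapped in lit() to be passed as a column
df = df.withColumn('short_chapters', udf_fn(df.Chapters, lit(10)))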
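The UDF above returns the filtered chapters rather than their page total. Here is a minimal sketch of a variant that returns the sum directly. The names sum_short_chapter_pages and add_short_chapters are hypothetical, and treating a null NAME as "not short" and a null NUMBER_PAGES as 0 is an assumption, not something the question specifies.

```python
def sum_short_chapter_pages(chapters, thresh):
    """Sum NUMBER_PAGES over chapters whose NAME has fewer than thresh characters."""
    if chapters is None:  # the Chapters array itself is nullable
        return None
    return sum(
        chapter.NUMBER_PAGES or 0  # assumption: a null page count contributes 0
        for chapter in chapters
        # assumption: null elements and null names are skipped
        if chapter is not None and chapter.NAME is not None and len(chapter.NAME) < thresh
    )


def add_short_chapters(df, thresh=10):
    """Hypothetical helper: attach the per-book total as an integer column."""
    # Imported here so the plain-Python logic above can be tried without Spark.
    from pyspark.sql.functions import lit, udf
    from pyspark.sql.types import IntegerType

    udf_fn = udf(sum_short_chapter_pages, IntegerType())
    return df.withColumn("short_chapters", udf_fn(df["Chapters"], lit(thresh)))
```

With this variant, df = add_short_chapters(df) would add short_chapters as an integer column instead of an array of structs.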

CodePudding user response:

You can compute short_chapters with higher-order functions (available in pyspark.sql.functions since Spark 3.1). First use filter to keep the chapters whose NAME has fewer than 10 characters, then use aggregate to sum NUMBER_PAGES over the chapters that remain.

from pyspark.sql import functions as F
from pyspark.sql import Row

df = spark.createDataFrame([("1", [Row(NAME="xs", NUMBER_PAGES=1),
                                   Row(NAME="s", NUMBER_PAGES=5),
                                   Row(NAME="Really Long Name", NUMBER_PAGES=100),
                                   Row(NAME="Really Long Name", NUMBER_PAGES=150), ],), ],
                           'struct<BOOK_ID:string,Chapters:array<struct<NAME:string,NUMBER_PAGES:int>>>')

df.printSchema()

"""
root
 |-- BOOK_ID: string (nullable = true)
 |-- Chapters: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- NAME: string (nullable = true)
 |    |    |-- NUMBER_PAGES: integer (nullable = true)
"""

# Filter for short chapters
short_chapters = F.filter("Chapters", lambda x: F.length(x["NAME"]) < 10)

# Sum number of pages for short chapters
pages_in_short_chapter = F.aggregate(short_chapters, F.lit(0), lambda acc, x: acc + x["NUMBER_PAGES"])

df.withColumn("short_chapters", pages_in_short_chapter).show(truncate=False)

"""
+-------+-------------------------------------------------------------------+--------------+
|BOOK_ID|Chapters                                                           |short_chapters|
+-------+-------------------------------------------------------------------+--------------+
|1      |[{xs, 1}, {s, 5}, {Really Long Name, 100}, {Really Long Name, 150}]|6             |
+-------+-------------------------------------------------------------------+--------------+
"""