I'm using PySpark and I have a dataframe with the following schema:
root
|-- BOOK_ID: integer (nullable = false)
|-- Chapters: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- NAME: string (nullable = true)
| | |-- NUMBER_PAGES: integer (nullable = true)
How can we add a new column named short_chapters that calculates, for each book, the sum of NUMBER_PAGES over the chapters whose NAME has length < 10?
Note: Chapters is a list. Is there a way to iterate over it without flattening the dataframe?
CodePudding user response:
You need to define a user-defined function (UDF for short) that filters the chapters. The return type of the UDF is the same as the type of the Chapters column, which we take from the schema.
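from pyspark.sql.functions import lit, udf
def filter_short_chapters(chapters, thresh):
    # Keep only the chapters whose NAME is shorter than thresh characters
    return [c for c in (chapters or []) if c.NAME is not None and len(c.NAME) < thresh]
# The UDF returns the same array-of-struct type as the Chapters column
udf_fn = udf(filter_short_chapters, df.schema['Chapters'].dataType)
# A literal argument must be wrapped in lit() so it is passed as a column
df = df.withColumn('short_chapters', udf_fn(df.Chapters, lit(10)))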
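The UDF above returns the filtered chapters rather than a page total. If the sum itself is wanted, the per-row logic could look like the sketch below. It is plain Python so it runs without a Spark session; `Chapter` is a hypothetical stand-in for the Row objects a UDF receives, and `sum_short_chapter_pages` is an illustrative name that does not come from the answer. Treating a missing page count as 0 is also an assumption.

```python
from collections import namedtuple

# Hypothetical stand-in for the Row objects a UDF receives for each array element.
Chapter = namedtuple("Chapter", ["NAME", "NUMBER_PAGES"])

def sum_short_chapter_pages(chapters, thresh=10):
    """Sum NUMBER_PAGES over chapters whose NAME is shorter than thresh characters."""
    if chapters is None:  # the Chapters column is nullable
        return None
    return sum(
        c.NUMBER_PAGES or 0  # assumption: a missing page count counts as 0
        for c in chapters
        if c is not None and c.NAME is not None and len(c.NAME) < thresh
    )

chapters = [Chapter("xs", 1), Chapter("s", 5), Chapter("Really Long Name", 100)]
print(sum_short_chapter_pages(chapters))  # 6
```

To apply it to the dataframe, the function would presumably be wrapped with udf(sum_short_chapter_pages, IntegerType()) and called on df.Chapters, with IntegerType imported from pyspark.sql.types.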
CodePudding user response:
You can compute short_chapters using higher order functions. Find all chapters whose name has length < 10 using filter. Then add the NUMBER_PAGES for the chapters identified using aggregate.
from pyspark.sql import functions as F
from pyspark.sql import Row
df = spark.createDataFrame([("1", [Row(NAME="xs", NUMBER_PAGES=1),
Row(NAME="s", NUMBER_PAGES=5),
Row(NAME="Really Long Name", NUMBER_PAGES=100),
Row(NAME="Really Long Name", NUMBER_PAGES=150), ],), ],
'struct<BOOK_ID:string,Chapters:array<struct<NAME:string,NUMBER_PAGES:int>>>')
df.printSchema()
"""
root
|-- BOOK_ID: string (nullable = true)
|-- Chapters: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- NAME: string (nullable = true)
| | |-- NUMBER_PAGES: integer (nullable = true)
"""
# Filter for short chapters
short_chapters = F.filter("Chapters", lambda x: F.length(x["NAME"]) < 10)
# Sum number of pages for short chapters
pages_in_short_chapter = F.aggregate(short_chapters, F.lit(0), lambda acc, x: acc + x["NUMBER_PAGES"])
df.withColumn("short_chapters", pages_in_short_chapter).show(truncate=False)
"""
+-------+-------------------------------------------------------------------+--------------+
|BOOK_ID|Chapters                                                           |short_chapters|
+-------+-------------------------------------------------------------------+--------------+
|1      |[{xs, 1}, {s, 5}, {Really Long Name, 100}, {Really Long Name, 150}]|6             |
+-------+-------------------------------------------------------------------+--------------+
"""
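To see why the result is 6, the same filter-then-aggregate steps can be traced in plain Python on the sample row. This only illustrates the semantics and is not how Spark evaluates the expression; the dictionaries are stand-ins for the chapter structs.

```python
from functools import reduce

chapters = [
    {"NAME": "xs", "NUMBER_PAGES": 1},
    {"NAME": "s", "NUMBER_PAGES": 5},
    {"NAME": "Really Long Name", "NUMBER_PAGES": 100},
    {"NAME": "Really Long Name", "NUMBER_PAGES": 150},
]

# Step 1: keep chapters whose name is shorter than 10 characters (the filter step).
short = [c for c in chapters if len(c["NAME"]) < 10]

# Step 2: fold the page counts into a running total starting from 0 (the aggregate step).
total = reduce(lambda acc, c: acc + c["NUMBER_PAGES"], short, 0)

print([c["NAME"] for c in short])  # ['xs', 's']
print(total)                       # 6
```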