Spark SQL (Scala) - How to get the maximum/minimum value of an array of objects

Time:08-26

Let's say I have Authors who write Books (a column of Seq[Book]), and each Book has Chapters (a column of Seq[Chapter]). I want a table of Authors, one row per author, with a column giving the lowest chapter page count in that author's oldest book. For example, suppose the data looks like this:

case class Chapter(chapterTitle: String, pages: Int)
case class Book(title: String, releaseTimestamp: BigInt, chapters: Seq[Chapter])
case class Author(id: Int, name: String, nationality: String, books: Seq[Book])

val chapter1 = Chapter(chapterTitle="A", pages=23)
val chapter2 = Chapter(chapterTitle="B", pages=31)
val chapter3 = Chapter(chapterTitle="C", pages=51)
val chapter4 = Chapter(chapterTitle="D", pages=178)
val chapter5 = Chapter(chapterTitle="E", pages=12)
val chapter6 = Chapter(chapterTitle="F", pages=23)
val chapter7 = Chapter(chapterTitle="G", pages=4)
val chapter8 = Chapter(chapterTitle="H", pages=46)
val chapter9 = Chapter(chapterTitle="I", pages=30)

val book1 = Book(title="Harry Potter", releaseTimestamp=1023131, chapters=Seq(chapter1, chapter2))
val book2 = Book(title="Fantastic Beasts", releaseTimestamp=1514322, chapters=Seq(chapter3))
val book3 = Book(title="Mistborn", releaseTimestamp=172322, chapters=Seq(chapter4, chapter5))
val book4 = Book(title="The Way of Kings", releaseTimestamp=651231, chapters=Seq(chapter6, chapter7))
val book5 = Book(title="A Game of Thrones", releaseTimestamp=812312, chapters=Seq(chapter8, chapter9))

val author1 = Author(id=1, name="J K Rowling", nationality="UK", books=Seq(book1, book2))
val author2 = Author(id=2, name="Brandon Sanderson", nationality="US", books=Seq(book3, book4))
val author3 = Author(id=3, name="George R R Martin", nationality="US", books=Seq(book5))

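// toDF() needs `import spark.implicits._` (assuming a SparkSession named `spark`, as in spark-shell)
val table = Seq(author1, author2, author3)
val authorsDF = table.toDF()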

authorsDF would then look like:

id  name               nationality  books
1   J K Rowling        UK           books array here...
2   Brandon Sanderson  US           books array here...
3   George R R Martin  US           books array here...

Desired outcome: I want to query authorsDF so that it shows the minimum chapter page count of each author's oldest book, something like:

id  name               nationality  minChapterPage
1   J K Rowling        UK           23
2   Brandon Sanderson  US           12
3   George R R Martin  US           30

This is because Rowling's oldest book is Harry Potter, with a minimum chapter length of 23; Brandon Sanderson's oldest book is Mistborn, with a minimum chapter length of 12; and George R R Martin's only book, A Game of Thrones, has a minimum chapter length of 30.


Proposed solution:

Here's the solution I've thought of so far that I don't know how to implement. There might be something simpler, too.

First, explode the books column as follows:

authorsDF
  .select(col("id"), col("name"), col("nationality"), explode(col("books")) as "book")

id  name               nationality  book.title         book.releaseTimestamp  book.chapters
1   J K Rowling        UK           Harry Potter       1023131                chapters array here...
1   J K Rowling        UK           Fantastic Beasts   1514322                chapters array here...
2   Brandon Sanderson  US           Mistborn           172322                 chapters array here...
2   Brandon Sanderson  US           The Way of Kings   651231                 chapters array here...
3   George R R Martin  US           A Game of Thrones  812312                 chapters array here...

Then, somehow aggregate these rows by author, minimizing by book.releaseTimestamp (I don't know how to do this):

authorsDF
  .select(col("id"), col("name"), col("nationality"), explode(col("books")) as "book")
  .mysteryStep(minimize by releaseTimestamp)

id  name               nationality  book.title         book.releaseTimestamp  book.chapters
1   J K Rowling        UK           Harry Potter       1023131                chapters array here...
2   Brandon Sanderson  US           Mistborn           172322                 chapters array here...
3   George R R Martin  US           A Game of Thrones  812312                 chapters array here...

Then explode on chapters:

authorsDF
  .select(col("id"), col("name"), col("nationality"), explode(col("books")) as "book")
  .mysteryStep(minimize by releaseTimestamp)
  .select(col("id"), col("name"), col("nationality"), explode(col("book.chapters")) as "chapter")

id  name               nationality  chapter.chapterTitle  chapter.pages
1   J K Rowling        UK           A                     23
1   J K Rowling        UK           B                     31
2   Brandon Sanderson  US           D                     178
2   Brandon Sanderson  US           E                     12
3   George R R Martin  US           H                     46
3   George R R Martin  US           I                     30

Then, I'd need to aggregate & minimize by chapter.pages again. I don't know how to do this either.

authorsDF
  .select(col("id"), col("name"), col("nationality"), explode(col("books")) as "book")
  .mysteryStep(minimize by releaseTimestamp)
  .select(col("id"), col("name"), col("nationality"), explode(col("book.chapters")) as "chapter")
  .mysteryStep(minimize by pages)

id  name               nationality  chapter.chapterTitle  chapter.pages
1   J K Rowling        UK           A                     23
2   Brandon Sanderson  US           E                     12
3   George R R Martin  US           I                     30

Finally, getting the desired outcome is just a matter of selecting the columns I want.

How should I do those mystery steps? Alternatively, is there a different way entirely?

Answer:

You can explode down to chapters, rank the rows within each author by release timestamp and then by page count using a window function, and keep only the first row per author:

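import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// Within each author, rank rows by oldest book first, then by fewest pages
val w = Window.partitionBy("id").orderBy("releaseTimestamp", "pages")
(
  authorsDF
  // one row per (author, book)
  .withColumn("books", explode(col("books")))
  // one row per (author, book, chapter), keeping the book's release timestamp
  .select(col("*"), col("books.releaseTimestamp").as("releaseTimestamp"), explode(col("books.chapters")).as("chapter"))
  // flatten the chapter struct into chapterTitle and pages columns
  .select(col("*"), col("chapter.*"))
  // keep the top-ranked row per author, then drop the helper columns
  .withColumn("rn", row_number().over(w)).where("rn=1").drop("books", "releaseTimestamp", "chapter", "rn")
).show(false)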
+---+-----------------+-----------+------------+-----+
|id |name             |nationality|chapterTitle|pages|
+---+-----------------+-----------+------------+-----+
|1  |J K Rowling      |UK         |A           |23   |
|3  |George R R Martin|US         |I           |30   |
|2  |Brandon Sanderson|US         |E           |12   |
+---+-----------------+-----------+------------+-----+
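
As a different way entirely, here is a minimal sketch that skips explode and the window and works on the arrays directly. It assumes Spark 3.0 or later, where transform is available in the Scala API. The names MinChapterPage, withMinChapterPage and the intermediate field minPages are made up for this illustration. It relies on structs being compared field by field, so array_min over (releaseTimestamp, minPages) structs picks the oldest book's struct:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{array_min, col, struct, transform}

// Same shapes as the case classes in the question
case class Chapter(chapterTitle: String, pages: Int)
case class Book(title: String, releaseTimestamp: BigInt, chapters: Seq[Chapter])
case class Author(id: Int, name: String, nationality: String, books: Seq[Book])

object MinChapterPage {
  // Hypothetical helper: adds minChapterPage for each author's oldest book
  def withMinChapterPage(authors: DataFrame): DataFrame = {
    // For each book, build a struct (releaseTimestamp, smallest chapter page count)
    val perBook = transform(col("books"), b =>
      struct(
        b.getField("releaseTimestamp").as("releaseTimestamp"),
        array_min(transform(b.getField("chapters"), c => c.getField("pages"))).as("minPages")))

    // releaseTimestamp is the first struct field, so array_min selects the oldest book
    authors.select(
      col("id"), col("name"), col("nationality"),
      array_min(perBook).getField("minPages").as("minChapterPage"))
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session, for illustration only
    val spark = SparkSession.builder().master("local[*]").appName("min-chapter-page").getOrCreate()
    import spark.implicits._

    val authorsDF = Seq(
      Author(1, "J K Rowling", "UK", Seq(
        Book("Harry Potter", 1023131, Seq(Chapter("A", 23), Chapter("B", 31))),
        Book("Fantastic Beasts", 1514322, Seq(Chapter("C", 51))))),
      Author(2, "Brandon Sanderson", "US", Seq(
        Book("Mistborn", 172322, Seq(Chapter("D", 178), Chapter("E", 12))),
        Book("The Way of Kings", 651231, Seq(Chapter("F", 23), Chapter("G", 4))))),
      Author(3, "George R R Martin", "US", Seq(
        Book("A Game of Thrones", 812312, Seq(Chapter("H", 46), Chapter("I", 30)))))
    ).toDF()

    withMinChapterPage(authorsDF).orderBy("id").show(false)
    spark.stop()
  }
}
```

On the sample data this should give 23, 12 and 30 for the three authors. An author with an empty books array would get null, and ties on releaseTimestamp are broken by the smaller page count, the same as with the window ordering above. Since everything happens inside a single select, this should also avoid the shuffle that the window needs.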