Let's say I have Authors who write Books (a column of Seq[Book]), and each Book has Chapters (a column of Seq[Chapter]). I want a table of Authors, one row per author, with a column giving the lowest chapter page count in their oldest book. As an example, suppose the data looks like this:
case class Chapter(chapterTitle: String, pages: Int)
case class Book(title: String, releaseTimestamp: BigInt, chapters: Seq[Chapter])
case class Author(id: Int, name: String, nationality: String, books: Seq[Book])
val chapter1 = Chapter(chapterTitle="A", pages=23)
val chapter2 = Chapter(chapterTitle="B", pages=31)
val chapter3 = Chapter(chapterTitle="C", pages=51)
val chapter4 = Chapter(chapterTitle="D", pages=178)
val chapter5 = Chapter(chapterTitle="E", pages=12)
val chapter6 = Chapter(chapterTitle="F", pages=23)
val chapter7 = Chapter(chapterTitle="G", pages=4)
val chapter8 = Chapter(chapterTitle="H", pages=46)
val chapter9 = Chapter(chapterTitle="I", pages=30)
val book1 = Book(title="Harry Potter", releaseTimestamp=1023131, chapters=Seq(chapter1, chapter2))
val book2 = Book(title="Fantastic Beasts", releaseTimestamp=1514322, chapters=Seq(chapter3))
val book3 = Book(title="Mistborn", releaseTimestamp=172322, chapters=Seq(chapter4, chapter5))
val book4 = Book(title="The Way of Kings", releaseTimestamp=651231, chapters=Seq(chapter6, chapter7))
val book5 = Book(title="A Game of Thrones", releaseTimestamp=812312, chapters=Seq(chapter8, chapter9))
val author1 = Author(id=1, name="J K Rowling", nationality="UK", books=Seq(book1, book2))
val author2 = Author(id=2, name="Brandon Sanderson", nationality="US", books=Seq(book3, book4))
val author3 = Author(id=3, name="George R R Martin", nationality="US", books=Seq(book5))
val table = Seq(author1, author2, author3)
val authorsDF = table.toDF()
authorsDF would then look like:
id | name | nationality | books |
---|---|---|---|
1 | J K Rowling | UK | books array here... |
2 | Brandon Sanderson | US | books array here... |
3 | George R R Martin | US | books array here... |
Desired outcome:
I want to be able to query the table authorsDF so that it shows the minimum chapter page count, something like:
id | name | nationality | minChapterPage |
---|---|---|---|
1 | J K Rowling | UK | 23 |
2 | Brandon Sanderson | US | 12 |
3 | George R R Martin | US | 30 |
This is because Rowling's oldest book is Harry Potter, with a minimum chapter length of 23; Brandon Sanderson's oldest book is Mistborn, with a minimum chapter length of 12; and GRRM's only book here is A Game of Thrones, with a minimum chapter length of 30.
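To pin down the rule itself, here is a minimal plain-Scala sketch (collections only, no Spark) of what I mean for a single author. The object name DesiredRule and the helper minChapterPage are made up for illustration, and the case classes simply repeat the ones above so the snippet stands alone.

```scala
// Same shapes as the case classes in the example data.
case class Chapter(chapterTitle: String, pages: Int)
case class Book(title: String, releaseTimestamp: BigInt, chapters: Seq[Chapter])
case class Author(id: Int, name: String, nationality: String, books: Seq[Book])

object DesiredRule {
  // Hypothetical helper: smallest page count among the chapters of the
  // author's oldest book. Returns None if the author has no books or the
  // oldest book has no chapters.
  def minChapterPage(author: Author): Option[Int] =
    for {
      oldest <- author.books.sortBy(_.releaseTimestamp).headOption
      pages  <- oldest.chapters.map(_.pages).sorted.headOption
    } yield pages
}
```

What I'm after is the DataFrame equivalent of this, applied to every row of authorsDF.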
Proposed solution:
Here's the solution I've thought of so far that I don't know how to implement. There might be something simpler, too.
First, explode the books column as follows:
authorsDF
  .select(col("id"), col("name"), col("nationality"), explode(col("books")).as("book"))
id | name | nationality | book.title | book.releaseTimestamp | book.chapters |
---|---|---|---|---|---|
1 | J K Rowling | UK | Harry Potter | 1023131 | chapters array here... |
1 | J K Rowling | UK | Fantastic Beasts | 1514322 | chapters array here... |
2 | Brandon Sanderson | US | Mistborn | 172322 | chapters array here... |
2 | Brandon Sanderson | US | The Way of Kings | 651231 | chapters array here... |
3 | George R R Martin | US | A Game of Thrones | 812312 | chapters array here... |
Then, somehow aggregate these rows by author, keeping only the row with the smallest book.releaseTimestamp (I don't know how to do this):
authorsDF
  .select(col("id"), col("name"), col("nationality"), explode(col("books")).as("book"))
.mysteryStep(minimize by releaseTimestamp)
id | name | nationality | book.title | book.releaseTimestamp | book.chapters |
---|---|---|---|---|---|
1 | J K Rowling | UK | Harry Potter | 1023131 | chapters array here... |
2 | Brandon Sanderson | US | Mistborn | 172322 | chapters array here... |
3 | George R R Martin | US | A Game of Thrones | 812312 | chapters array here... |
Then explode on chapters:
authorsDF
  .select(col("id"), col("name"), col("nationality"), explode(col("books")).as("book"))
  .mysteryStep(minimize by releaseTimestamp)
  .select(col("id"), col("name"), col("nationality"), explode(col("book.chapters")).as("chapter"))
id | name | nationality | chapter.chapterTitle | chapter.pages |
---|---|---|---|---|
1 | J K Rowling | UK | A | 23 |
1 | J K Rowling | UK | B | 31 |
2 | Brandon Sanderson | US | D | 178 |
2 | Brandon Sanderson | US | E | 12 |
3 | George R R Martin | US | H | 46 |
3 | George R R Martin | US | I | 30 |
Then I'd need to aggregate by author again, this time keeping the row with the smallest chapter.pages. I don't know how to do this either.
authorsDF
  .select(col("id"), col("name"), col("nationality"), explode(col("books")).as("book"))
  .mysteryStep(minimize by releaseTimestamp)
  .select(col("id"), col("name"), col("nationality"), explode(col("book.chapters")).as("chapter"))
.mysteryStep(minimize by pages)
id | name | nationality | chapter.chapterTitle | chapter.pages |
---|---|---|---|---|
1 | J K Rowling | UK | A | 23 |
2 | Brandon Sanderson | US | E | 12 |
3 | George R R Martin | US | I | 30 |
Finally, it's a simple step to get the desired outcome by selecting the columns we want.
How should I do those mystery steps? Alternatively, is there a different way entirely?
Answer:
You can explode down to chapters, rank the rows within each author by release timestamp and then by page count using a window, and keep only the first row per author:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy("id").orderBy("releaseTimestamp", "pages")
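(
authorsDF
  // one row per (author, book)
  .withColumn("books", explode(col("books")))
  // one row per (author, book, chapter), carrying the book's release timestamp along
  .select(col("*"), col("books.releaseTimestamp").as("releaseTimestamp"), explode(col("books.chapters")).as("chapter"))
  // flatten the chapter struct into chapterTitle and pages
  .select(col("*"), col("chapter.*"))
  // rank within each author: oldest book first, then fewest pages
  .withColumn("rn", row_number().over(w))
  .where("rn = 1")
  .drop("books", "releaseTimestamp", "chapter", "rn")
).show(false)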
+---+-----------------+-----------+------------+-----+
|id |name             |nationality|chapterTitle|pages|
+---+-----------------+-----------+------------+-----+
|1  |J K Rowling      |UK         |A           |23   |
|3  |George R R Martin|US         |I           |30   |
|2  |Brandon Sanderson|US         |E           |12   |
+---+-----------------+-----------+------------+-----+
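If you only need the minChapterPage column from the question, a variant without a window might look like the sketch below. It is not the only way to do it, and it rests on a few assumptions: Spark 2.4 or later (for array_min), the authorsDF schema from the question, and the made-up names OldestBookMinChapter and minChapterPages. It relies on Spark comparing structs field by field, so taking min over a (releaseTimestamp, minPages) struct picks the oldest book first and breaks ties on the smaller page count.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array_min, col, explode, min, struct}

object OldestBookMinChapter {
  // Hypothetical helper; expects columns id, name, nationality and
  // books: array<struct<title, releaseTimestamp, chapters: array<struct<chapterTitle, pages>>>>.
  def minChapterPages(authorsDF: DataFrame): DataFrame =
    authorsDF
      // One row per (author, book). Authors with an empty books array drop out here.
      .select(col("id"), col("name"), col("nationality"), explode(col("books")).as("book"))
      .groupBy("id", "name", "nationality")
      .agg(
        // For each book, pair its timestamp with its smallest chapter page count,
        // then keep the pair with the smallest timestamp.
        min(struct(
          col("book.releaseTimestamp").as("releaseTimestamp"),
          array_min(col("book.chapters.pages")).as("minPages")
        )).as("oldest")
      )
      .select(col("id"), col("name"), col("nationality"), col("oldest.minPages").as("minChapterPage"))
}
```

Calling OldestBookMinChapter.minChapterPages(authorsDF).show(false) on the sample data should give 23, 12 and 30 for authors 1, 2 and 3, in whatever row order Spark returns them.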