Aggregate row objects in Spark

Time:09-22

I have a table like the one below:

| id | item                                 |
| -- | ------------------------------------ |
| 1  | {order_id: 1, item_id: 1, price: 10} |
| 2  | {order_id: 1, item_id: 2, price: 11} |
| 3  | {order_id: 2, item_id: 3, price: 12} |
| 4  | {order_id: 2, item_id: 4, price: 13} |

I need to aggregate the rows in the table into the following:

| order_id | order                                                                    |
| -------- | ------------------------------------------------------------------------ |
| 1        | {order_id: 1, items: [{item_id: 1, price: 10}, {item_id: 2, price: 11}]} |
| 2        | {order_id: 2, items: [{item_id: 3, price: 12}, {item_id: 4, price: 13}]} |

Initially I thought a UDAF could do the trick, but when I implement an aggregator UDAF, I'm not sure what to return from the merge method: if the order IDs differ, the two buffers can't be merged.

CodePudding user response:

From Spark 1.6 onward, you don't need a UDAF; you can aggregate row objects using the built-in SQL function collect_list.

If your table schema is as follows:

root
 |-- id: integer (nullable = false)
 |-- item: struct (nullable = true)
 |    |-- order_id: integer (nullable = true)
 |    |-- item_id: integer (nullable = true)
 |    |-- price: double (nullable = true)

After loading your table into a dataframe, your code should be (in Scala):

import org.apache.spark.sql.functions.{collect_list, struct}

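dataframe
  // group rows by the nested order_id; the grouping column is named "order_id"
  .groupBy("item.order_id")
  // collect each group's (item_id, price) pairs into an array of structs
  .agg(collect_list(struct("item.item_id", "item.price")).as("items"))
  // nest order_id and items in a single "order" struct
  .withColumn("order", struct("order_id", "items"))
  .drop("items")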
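To try this end to end, here is a minimal sketch that rebuilds the question's sample table and runs the same aggregation. The case classes `SourceItem` and `SourceRow`, the object name `AggregateOrders`, and the local `SparkSession` settings are assumptions made for illustration; they are not part of the original question. The order of elements inside the collected array is not guaranteed.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{collect_list, struct}

// Hypothetical case classes mirroring the schema shown above.
case class SourceItem(order_id: Int, item_id: Int, price: Double)
case class SourceRow(id: Int, item: SourceItem)

object AggregateOrders {
  // Groups rows by item.order_id and nests the collected items in an "order" struct.
  def aggregate(df: DataFrame): DataFrame =
    df.groupBy("item.order_id")
      .agg(collect_list(struct("item.item_id", "item.price")).as("items"))
      .withColumn("order", struct("order_id", "items"))
      .drop("items")

  def main(args: Array[String]): Unit = {
    // Assumption: a local session, for experimentation only.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("aggregate-orders")
      .getOrCreate()
    import spark.implicits._

    // Sample data taken from the table in the question.
    val df = Seq(
      SourceRow(1, SourceItem(1, 1, 10.0)),
      SourceRow(2, SourceItem(1, 2, 11.0)),
      SourceRow(3, SourceItem(2, 3, 12.0)),
      SourceRow(4, SourceItem(2, 4, 13.0))
    ).toDF()

    val result = aggregate(df)
    result.printSchema()
    result.show(truncate = false)

    spark.stop()
  }
}
```

The result should have one row per order_id, with an order column holding the order_id and an array of (item_id, price) structs.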

CodePudding user response:

Assuming the following model:

case class Order(order_id: Int, items: Seq[Item])
case class Item(item_id: Int, price: Double)
case class Line(item: Item)

Use groupBy to group rows by item.order_id and then collect the items:

import sparkSession.implicits._

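// group the untyped rows, then view each group as (key: Int, values: Line)
df.groupBy($"item.order_id")
  .as[Int, Line]
  .mapGroups { case (order_id, lines) =>
    // materialize the group's iterator into a strict List before building the Order
    (order_id, Order(order_id, lines.map(line => Item(line.item.item_id, line.item.price)).toList))
  }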
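If calling `.as[K, T]` on a grouped DataFrame is not available in your Spark version (I believe it was added in Spark 3.0), a similar result can be sketched with the typed `groupByKey` API on a `Dataset`. The input classes `RawItem` and `RawLine` and the object name `TypedOrders` are hypothetical names introduced here so that the key `order_id` is visible to the typed lambda; `Item` and `Order` follow the model above. This variant returns the `Order` objects directly rather than `(order_id, Order)` pairs.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical input model: unlike Item above, RawItem keeps order_id
// so that it can serve as the grouping key.
case class RawItem(order_id: Int, item_id: Int, price: Double)
case class RawLine(id: Int, item: RawItem)

// Output model, as in the answer above.
case class Item(item_id: Int, price: Double)
case class Order(order_id: Int, items: Seq[Item])

object TypedOrders {
  // Groups lines by order_id and builds one Order per group.
  def toOrders(lines: Dataset[RawLine], spark: SparkSession): Dataset[Order] = {
    import spark.implicits._
    lines
      .groupByKey(_.item.order_id)
      .mapGroups { (orderId, group) =>
        // The group is an Iterator; materialize it into a strict List.
        Order(orderId, group.map(l => Item(l.item.item_id, l.item.price)).toList)
      }
  }
}
```

Starting from the untyped table, `df.as[RawLine]` (with `spark.implicits._` in scope) should give the `Dataset[RawLine]` this function expects. Note that `mapGroups` loads each group through an iterator on a single task, so very large orders may be better served by the `collect_list` approach.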