I have a table like the one below:
| id | item |
| -- | ------------------------------------ |
| 1 | {order_id: 1, item_id: 1, price: 10} |
| 2 | {order_id: 1, item_id: 2, price: 11} |
| 3 | {order_id: 2, item_id: 3, price: 12} |
| 4 | {order_id: 2, item_id: 4, price: 13} |
I need to aggregate the rows in the table into the following:
| order_id | order |
| -------- | ------------------------------------------------------------------------ |
| 1 | {order_id: 1, items: [{item_id: 1, price: 10}, {item_id: 2, price: 11}]} |
| 2 | {order_id: 2, items: [{item_id: 3, price: 12}, {item_id: 4, price: 13}]} |
Initially I thought a UDAF could do the trick, but when I implemented an Aggregator, I wasn't sure what to return from the `merge` method: if the order ids differ, the two buffers can't be meaningfully merged.
CodePudding user response:
Since Spark 1.6 you don't need a UDAF: you can aggregate row objects with the built-in SQL function `collect_list`.
If your table schema is as follows (the output of `printSchema()`):
```
root
 |-- id: integer (nullable = false)
 |-- item: struct (nullable = true)
 |    |-- order_id: integer (nullable = true)
 |    |-- item_id: integer (nullable = true)
 |    |-- price: double (nullable = true)
```
After loading your table into a DataFrame, your code should be (in Scala):
```scala
import org.apache.spark.sql.functions.{collect_list, struct}

dataframe
  .groupBy("item.order_id")                                             // one group per order
  .agg(collect_list(struct("item.item_id", "item.price")).as("items")) // gather line items into an array of structs
  .withColumn("order", struct("order_id", "items"))                    // wrap order_id and items into a single struct
  .drop("items")
```
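For reference, here is a minimal, self-contained sketch of the same approach; the sample data, the case class and object names, and the `spark` session setup are assumptions added for illustration:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, struct}

// Mirrors the question's schema; these class names are made up for the demo.
case class RawItem(order_id: Int, item_id: Int, price: Double)
case class TableRow(id: Int, item: RawItem)

object CollectListDemo extends App {
  val spark = SparkSession.builder().master("local[*]").appName("collect-list-demo").getOrCreate()
  import spark.implicits._

  // Recreate the table from the question: one row per order line.
  val dataframe = Seq(
    TableRow(1, RawItem(1, 1, 10.0)),
    TableRow(2, RawItem(1, 2, 11.0)),
    TableRow(3, RawItem(2, 3, 12.0)),
    TableRow(4, RawItem(2, 4, 13.0))
  ).toDF()

  dataframe
    .groupBy("item.order_id")
    .agg(collect_list(struct("item.item_id", "item.price")).as("items"))
    .withColumn("order", struct("order_id", "items"))
    .drop("items")
    .show(truncate = false)
  // Prints something like:
  // |order_id|order                      |
  // |1       |{1, [{1, 10.0}, {2, 11.0}]}|
  // |2       |{2, [{3, 12.0}, {4, 13.0}]}|
}
```

One caveat: `collect_list` does not guarantee the order of the collected elements, so if item order matters you should sort the array afterwards (e.g. with `sort_array`).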
CodePudding user response:
Assuming the following model:
```scala
// Note: Item deliberately omits order_id. Encoders resolve struct fields by
// name, so the extra order_id field inside the item struct is simply ignored
// when rows are decoded as Line values.
case class Order(order_id: Int, items: Seq[Item])
case class Item(item_id: Int, price: Double)
case class Line(item: Item)
```
Use `groupBy` to group rows by `item.order_id`, then collect the items with `mapGroups`:
```scala
import sparkSession.implicits._

df.groupBy($"item.order_id")
  .as[Int, Line]             // typed view of the groups: key = order_id, values = Line rows
  .mapGroups { case (order_id, lines) =>
    // Rebuild one Order per group from its line items.
    (order_id, Order(order_id, lines.toSeq.map(line => Item(line.item.item_id, line.item.price))))
  }
  .toDF("order_id", "order") // name the output columns to match the target table
```
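If `RelationalGroupedDataset.as[K, V]` isn't available in your Spark version, the same typed aggregation can be sketched with `Dataset.groupByKey`. This variant assumes an input-side model (hypothetical names `FullItem`/`FullLine`) that keeps `order_id`, since the typed API can only key on fields present in the case classes:

```scala
// Hypothetical input-side model that keeps order_id so the typed API can key on it.
case class FullItem(order_id: Int, item_id: Int, price: Double)
case class FullLine(item: FullItem)

import sparkSession.implicits._

df.as[FullLine]                 // extra top-level columns (e.g. id) are ignored
  .groupByKey(_.item.order_id)  // typed grouping on the nested key
  .mapGroups((order_id, lines) =>
    (order_id, Order(order_id, lines.map(l => Item(l.item.item_id, l.item.price)).toSeq)))
  .toDF("order_id", "order")
```

The trade-off between the two answers is the usual one: the `collect_list` version stays in the untyped DataFrame API and lets Catalyst optimize the whole plan, while the `mapGroups` version gives you compile-time types at the cost of deserializing every row.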