I have two DataFrames:
val a = Map(1 -> 2, 3 -> 5).toSeq.toDF("key", "value")
val b = Map(1 -> 4, 2 -> 5).toSeq.toDF("key", "value")
I join them on the key column:
a.join(b, "key").show(false)
+---+-----+-----+
|key|value|value|
+---+-----+-----+
|1  |2    |4    |
+---+-----+-----+
I would instead like the result
+---+------+
|key|value |
+---+------+
|1  |[2, 4]|
+---+------+
I would like the value column to be an array that aggregates the values from both sides. Is there an idiomatic way to do this? A join on two pair RDDs does something similar by default: it returns each matching key together with both values.
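For comparison, here is a minimal sketch of the pair-RDD join the question refers to. It assumes a spark-shell session where sc (the SparkContext) is predefined, and the names rddA, rddB and rddJoined are only illustrative. Note that the RDD join pairs the values as a tuple rather than an array.

```scala
// Assumes spark-shell, where `sc` (SparkContext) is already defined.
val rddA = sc.parallelize(Seq(1 -> 2, 3 -> 5))
val rddB = sc.parallelize(Seq(1 -> 4, 2 -> 5))

// join on RDD[(K, V)] keeps only matching keys and pairs the values: RDD[(Int, (Int, Int))]
val rddJoined = rddA.join(rddB)
rddJoined.collect().foreach(println) // prints (1,(2,4))

// Turning the tuple into a sequence gives the "array of values" shape
val rddAsSeq = rddJoined.mapValues { case (v1, v2) => Seq(v1, v2) }
```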
Answer:
If you want a fully typed solution, you could use the Dataset API with joinWith:
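// Needed for toDF, as[...] and the encoder used by map (already imported in spark-shell)
import spark.implicits._

// Typed rows for the inputs and for the desired output.
// The source columns are Ints, so the fields are Ints as well.
case class MyKeyValuePair(key: Int, value: Int)
case class MyOutput(key: Int, values: Seq[Int])

val a = Map(1 -> 2, 3 -> 5).toSeq.toDF("key", "value").as[MyKeyValuePair]
val b = Map(1 -> 4, 2 -> 5).toSeq.toDF("key", "value").as[MyKeyValuePair]

// joinWith keeps each side as a whole object: Dataset[(MyKeyValuePair, MyKeyValuePair)]
val output = a.joinWith(b, a("key") === b("key"), "inner").map {
  case (pair1, pair2) => MyOutput(pair1.key, Seq(pair1.value, pair2.value))
}
output.show()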
+---+------+
|key|values|
+---+------+
|  1|[2, 4]|
+---+------+
This gives you fine-grained control over exactly what your output looks like, and the mapping step is type-checked at compile time (the join condition itself is still an untyped column expression). You just have to define some case classes to type your Datasets with.
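If you would rather stay with untyped DataFrames, a shorter alternative (not part of the original answer) is to join on the key and combine the two value columns with the built-in array function. This is a sketch assuming a spark-shell session; merged is an illustrative name. Referring to a("value") and b("value") tells Spark which of the two identically named columns comes from which side.

```scala
// Assumes spark-shell (or `import spark.implicits._` for toDF).
import org.apache.spark.sql.functions.{array, col}

val a = Map(1 -> 2, 3 -> 5).toSeq.toDF("key", "value")
val b = Map(1 -> 4, 2 -> 5).toSeq.toDF("key", "value")

// Join on "key" (one key column in the result), then pack both value columns into one array column
val merged = a.join(b, "key")
  .select(col("key"), array(a("value"), b("value")).as("value"))

merged.show(false)
```

This avoids the case classes, at the cost of losing compile-time checking of the result's shape.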
Hope this helps!