My input looks like this:
val windowSpec1 = Window.partitionBy($"member_id", $"plan_id", $"err_cd").orderBy($"member_id")
val windowSpec2 = Window.partitionBy($"member_id", $"plan_id").orderBy($"err_cd")
val enrollmentData = inputData.select($"member_id", $"plan_id", $"err_cd")
  .withColumn("rk", row_number().over(windowSpec1))
  .withColumn("error_index", dense_rank().over(windowSpec2))
+---------+------------+------+---+-----------+
|member_id|     plan_id|err_cd| rk|error_index|
+---------+------------+------+---+-----------+
|    M0002|12345MH22220| EH044|  1|          1|
|    M0002|12345MH22220| EP049|  1|          2|
|    M0003|12345MH33330| EP051|  1|          1|
|    M0003|12345MH33330| EP053|  1|          2|
|    M0003|12345MH33330| EP054|  1|          3|
|    M0003|12345MH44440| EP054|  1|          1|
+---------+------------+------+---+-----------+
Required output:
The error_codes column in my output dataset is a Seq of strings. I need to collect the err_cd values of each member_id and plan_id into an array; I can change the Seq type if it is not suitable.
+---------+------------+-----------------+
|member_id|     plan_id|error_codes      |
+---------+------------+-----------------+
|    M0002|12345MH22220|EH044,EP049      |
|    M0003|12345MH33330|EP051,EP053,EP054|
|    M0003|12345MH44440|EP054            |
+---------+------------+-----------------+
Please let me know if you have any suggestions.
Answer:
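The functions collect_list, array_distinct and array_sort should help: collect_list gathers the values of each group into an array, array_distinct removes duplicates, and array_sort orders what is left. Note that array_distinct and array_sort require Spark 2.4 or later.
See the example below: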
import org.apache.spark.sql.functions.{array_distinct, array_sort, collect_list, collect_set}

case class WithArray(name: String, descr: String, values: Seq[Int])
case class RawData(name: String, descr: String, value: Int)

// BaseSpec is my own ScalaTest base class; it provides `spark` and the matchers.
class ArrayTrials extends BaseSpec {
  describe("let's play with arrays") {
    it("here we go") {
      import spark.implicits._

      // One row per value; "three" contains the duplicate value 20.
      val data = Seq(
        RawData("one", "the first one", 1),
        RawData("two", "the second one", 1),
        RawData("two", "the second one", 2),
        RawData("three", "the third one", 10),
        RawData("three", "the third one", 20),
        RawData("three", "the third one", 20),
        RawData("three", "the third one", 30)).toDS

      val expected = Seq(WithArray("one", "the first one", Seq(1)),
        WithArray("two", "the second one", Seq(1, 2)),
        WithArray("three", "the third one", Seq(10, 20, 30)))

      // Collect the values per group, drop duplicates, then sort ascending.
      val results = data.groupBy($"name", $"descr")
        .agg(array_sort(array_distinct(collect_list($"value"))).as("values"))
        .as[WithArray]

      results.collect should contain theSameElementsAs expected
    }
  }
}
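Applied to the columns in the question, the same idea might look like the sketch below. It is only an illustration under a few assumptions: the object name ErrorCodesExample, the helper errorCodesPerPlan and the local SparkSession are hypothetical and not from the original post, and collect_set is swapped in for array_distinct(collect_list(...)) since it also drops duplicates. The optional concat_ws step turns the array into the comma-separated string shown in the required output.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{array_sort, col, collect_set, concat_ws}

// Hypothetical wrapper object, used only for this illustration.
object ErrorCodesExample {

  // One row per (member_id, plan_id) with a sorted, de-duplicated array of err_cd.
  def errorCodesPerPlan(input: DataFrame): DataFrame =
    input
      .groupBy(col("member_id"), col("plan_id"))
      .agg(array_sort(collect_set(col("err_cd"))).as("error_codes"))

  def main(args: Array[String]): Unit = {
    // Local session for demonstration only (an assumption, not from the question).
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("error-codes-example")
      .getOrCreate()
    import spark.implicits._

    // Rows copied from the sample input in the question.
    val inputData = Seq(
      ("M0002", "12345MH22220", "EH044"),
      ("M0002", "12345MH22220", "EP049"),
      ("M0003", "12345MH33330", "EP051"),
      ("M0003", "12345MH33330", "EP053"),
      ("M0003", "12345MH33330", "EP054"),
      ("M0003", "12345MH44440", "EP054")
    ).toDF("member_id", "plan_id", "err_cd")

    val errorCodes = errorCodesPerPlan(inputData)

    // Optional: join the array into a single comma-separated string column.
    val asString = errorCodes
      .withColumn("error_codes", concat_ws(",", col("error_codes")))

    errorCodes.orderBy(col("member_id"), col("plan_id")).show(truncate = false)
    asString.orderBy(col("member_id"), col("plan_id")).show(truncate = false)

    spark.stop()
  }
}
```

With this approach the rk and error_index window columns are not needed for the aggregation itself. If the array is read back into a case class, a field of type Seq[String] should map onto the array column.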