Home > Software engineering >  How to Iterate Dataset column of dense rank to create Array of another column in Scala?
How to Iterate Dataset column of dense rank to create Array of another column in Scala?

Time:07-29

My Input looks like below:

val windowSpec1 = Window.partitionBy($"member_id",$"plan_id",$"err_cd").orderBy($"member_id")
    
val windowSpec2 = Window.partitionBy($"member_id",$"plan_id").orderBy($"err_cd")
    
    
val enrollmentData = inputData.select($"member_id", $"plan_id", $"err_cd")
     .withColumn("rk", row_number().over(windowSpec1))
     .withColumn("error_index", dense_rank().over(windowSpec2))

 --------- ------------------ ------ --- ----------- 
|member_id|           plan_id|err_cd| rk|error_index|
 --------- ------------------ ------ --- ----------- 
|    M0002|      12345MH22220| EH044|  1|          1|
|    M0002|      12345MH22220| EP049|  1|          2|
|    M0003|      12345MH33330| EP051|  1|          1|
|    M0003|      12345MH33330| EP053|  1|          2|
|    M0003|      12345MH33330| EP054|  1|          3|
|    M0003|      12345MH44440| EP054|  1|          1|
 --------- ------------------ ------ --- ----------- 

Required output:

My error_codes column in the output dataset is a Seq of strings. I need to make an array, can change Seq if not suited.

 --------- ------------------ ----------------- 
|member_id|           plan_id|error_codes      |
 --------- ------------------ ----------------- 
|    M0002|      12345MH22220|EH044,EP049      |
|    M0003|      12345MH33330|EP051,EP053,EP054|
|    M0003|      12345MH44440|EP054            |
 --------- ------------------ ----------------- 

Please let me know if you have any suggestions.

CodePudding user response:

The functions collect_list, array_distinct and array_sort should help.

See the example below

import org.apache.spark.sql.functions.{array_distinct, array_sort, collect_list, collect_set}

case class WithArray(name: String, descr: String, values: Seq[Int])

case class RawData(name: String, descr: String, value:Int)

class ArrayTrials extends BaseSpec {
  describe("let's play with arrays") {
    it("here we go") {
    import spark.implicits._
      
    val data = Seq(
      RawData("one", "the first one", 1),
      RawData("two", "the second one", 1),
      RawData("two", "the second one", 2),
      RawData("three", "the third one", 10),
      RawData("three", "the third one", 20),
      RawData("three", "the third one", 20),
      RawData("three", "the third one", 30)).toDS
        
    val expected = Seq(WithArray("one", "the first one", Seq(1)),
      WithArray("two", "the second one", Seq(1, 2)),
      WithArray("three", "the third one", Seq(10, 20, 30)))
        
    val results = data.groupBy($"name", $"descr")
      .agg(array_sort(array_distinct(collect_list($"value"))).as("values"))
      .as[WithArray]
        
    results.collect should contain theSameElementsAs expected
    }
  }
}
  • Related