Spark Scala: How to replace null with values from an array or another dataframe


I have a dataframe as below

+-----+------+
|empID|deptid|
+-----+------+
|  163|  null|
|  843|  null|
+-----+------+

And I have an Array of Long, range = [20,21], which could also be converted to another dataframe:

+--------+
|  deptid|
+--------+
|      20|
|      21|
+--------+

I want to replace those null values and get the output as

+-----+------+
|empID|deptid|
+-----+------+
|  163|    20|
|  843|    21|
+-----+------+

I tried

emp.na.fill(range, Array("deptid"))

But na.fill expects its first parameter to be a single scalar value (String, Boolean, Long, Double, etc.), not an array.
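
For reference, filling with a single scalar does work, but it writes the same value into every null, which isn't what's wanted here; a minimal sketch, assuming emp is the dataframe above:

// fill(value: Long, cols: Array[String]) puts one scalar into every null,
// so both rows end up with the same deptid:
emp.na.fill(20L, Array("deptid")).show()
// +-----+------+
// |empID|deptid|
// +-----+------+
// |  163|    20|
// |  843|    20|
// +-----+------+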

I tried to join the two dataframes to get the output, but none of the join types gives me the solution.

Everything I try gives me

+-----+------+
|empID|deptid|
+-----+------+
|  163|  null|
|  843|  null|
| null|    20|
| null|    21|
+-----+------+

Any idea?

Edit: It does not matter which empID gets which value from the range array.

And yes, I have made sure the array length matches the number of nulls.

CodePudding user response:

So I do have a solution for this, but it's so convoluted and hacky that I still want someone to give a better answer.

Basically: join the two dataframes, collect each column into a list (collect_list drops the nulls), zip the two lists together, and then explode the zipped pairs back into rows.

import org.apache.spark.sql.functions._
import spark.implicits._   // spark: the active SparkSession

newRecords.as("L")
  // deptid is null on the left, so the condition never evaluates to true and
  // the full outer join simply stacks the rows of both dataframes
  .join(range.as("R"), newRecords("deptid") =!= range("deptid"), "full")
  // collapse everything into a single row; collect_list drops the nulls
  .groupBy()
  .agg(collect_list($"R.deptid").as("D"), collect_list($"empID").as("E"))
  // pair the two lists element-wise, then explode the pairs back into rows
  .withColumn("zip", arrays_zip(col("D"), col("E")))
  .drop("D", "E")
  .select(explode($"zip").as("zip1"))
  .withColumn("empid", $"zip1".getItem("E"))
  .withColumn("deptid", $"zip1".getItem("D"))
  .drop($"zip1")
  .show()

+-----+------+
|empid|deptid|
+-----+------+
|  163|    20|
|  843|    21|
+-----+------+

CodePudding user response:

You can do this with or without converting the array into a dataframe. It might be worth trying both to see which is more efficient (usually UDFs will be slower).

If you don't convert the array into a dataframe, you can use a UDF. You'd generate row numbers for the initial dataframe and use them as an index into the array:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._   // spark: the active SparkSession

// use Option[Int] so Spark can derive a nullable int column
val testDF = Seq((163, None: Option[Int]), (843, None)).toDF("empID", "deptID")
val range = List(20, 21)

// look up the i-th element of the array
val myUDF = udf((i: Int) => range(i))

testDF
  // row_number starts at 1, so subtract 1 to get a zero-based array index
  .withColumn("rn", row_number().over(Window.orderBy("empID")) - 1)
  .withColumn("deptID", myUDF(col("rn")))
  .select("empID", "deptID").show

Output:

+-----+------+
|empID|deptID|
+-----+------+
|  163|    20|
|  843|    21|
+-----+------+

If you do convert the array into a dataframe, you can generate row numbers for both dataframes and use them in your join condition:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._   // spark: the active SparkSession

val testDF = Seq((163, None: Option[Int]), (843, None)).toDF("empID", "deptID")
val rangeDF = Seq(20, 21).toDF("deptID")

// give both dataframes matching row numbers, then join on them
testDF.withColumn("rn", row_number().over(Window.orderBy("empID")))
  .join(rangeDF.withColumn("rn", row_number().over(Window.orderBy("deptID"))), Seq("rn"))
  .select(testDF.col("empID"), rangeDF.col("deptID")).show

Output:

+-----+------+
|empID|deptID|
+-----+------+
|  163|    20|
|  843|    21|
+-----+------+

Note that both of these solutions use a Window function without a partition defined. That could be bad if you have a lot of data, since it all ends up getting shuffled to a single partition.
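
If that's a concern, one alternative (just a sketch, not part of the answer above) is to number the rows with the RDD zipWithIndex API, which assigns indices without sorting everything into one partition. The withRowIndex helper below is hypothetical, and the sketch assumes the same testDF and rangeDF as the join-based solution:

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical helper: append an "rn" column using RDD zipWithIndex,
// which numbers rows per partition without a global sort/shuffle.
def withRowIndex(df: DataFrame): DataFrame = {
  val schema = StructType(df.schema.fields :+ StructField("rn", LongType, nullable = false))
  val indexed = df.rdd.zipWithIndex.map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
  df.sparkSession.createDataFrame(indexed, schema)
}

// drop the all-null deptID first so the joined column names don't clash
withRowIndex(testDF.select("empID"))
  .join(withRowIndex(rangeDF), Seq("rn"))
  .select("empID", "deptID")
  .show

Row order from zipWithIndex isn't tied to empID order, but per the question's edit it doesn't matter which empID gets which value.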
