I have a dataframe as below
+-----+------+
|empID|deptid|
+-----+------+
|  163|  null|
|  843|  null|
+-----+------+
I also have an Array of Long values, range = [20, 21],
which can also be converted to another dataframe:
+------+
|deptid|
+------+
|    20|
|    21|
+------+
I want to replace those null values and get the output as
+-----+------+
|empID|deptid|
+-----+------+
|  163|    20|
|  843|    21|
+-----+------+
I tried
emp.na.fill(range, Array("deptid"))
But na.fill needs the first parameter to be a String, Boolean, Long, Double, etc., not an array.
I tried joining the two dataframes to get the output, but none of the joins gives me the result I want.
Everything I try gives me
+-----+------+
|empID|deptid|
+-----+------+
|  163|  null|
|  843|  null|
| null|     5|
| null|     6|
+-----+------+
Any idea?
Edit: It does not matter which empID gets which value from the range array.
And yes I have made sure the array length matches the number of nulls.
CodePudding user response:
So I do have a solution for this, but it's so convoluted and hacky that I still want someone to give a better answer.
Basically: full-join the two dataframes, collect each column into an array (collect_list drops the null values), zip the two arrays, and then explode the zipped array back into rows.
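import org.apache.spark.sql.functions.{arrays_zip, col, collect_list, explode}
import spark.implicits._ // for $"..."; assumes a SparkSession named spark
// Here newRecords is the employee dataframe and range is the dataframe of deptid values
val p = newRecords.as("L")
  .join(range.as("R"), newRecords("deptid") =!= range("deptid"), "full")
  .groupBy()
  // collect_list skips nulls, so each array holds only the real values
  .agg(collect_list($"R.deptid").as("D"), collect_list($"empID").as("E"))
  .withColumn("zip", arrays_zip(col("D"), col("E"))).drop("D", "E")
  .select(explode($"zip").as("zip1"))
  .withColumn("empid", $"zip1".getItem("E"))
  .withColumn("deptid", $"zip1".getItem("D"))
  .drop($"zip1")
p.show()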
+-----+------+
|empid|deptid|
+-----+------+
|  163|    20|
|  843|    21|
+-----+------+
CodePudding user response:
You can do it with or without converting the array into a dataframe. It might be worth trying both to see which is more efficient (UDFs are usually slower).
If you don't convert the array into a dataframe, you can use a udf. You'd generate row numbers for the initial dataframe and use it as an index into the array:
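import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number, udf}
import spark.implicits._ // for toDF; assumes a SparkSession named spark
val testDF = Seq((163, null), (843, null)).toDF("empID", "deptID")
val range = List(20, 21)
// Look up the replacement value by zero-based row index
val myUDF = udf((i: Int) => range(i))
// row_number starts at 1, so subtract 1 to get a valid index into range
testDF.withColumn("rn", row_number().over(Window.orderBy("empID")) - 1)
  .withColumn("deptID", myUDF(col("rn")))
  .select("empID", "deptID").show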
Output:
+-----+------+
|empID|deptID|
+-----+------+
|  163|    20|
|  843|    21|
+-----+------+
If you do convert the array into a dataframe, you can generate row numbers for both the dataframes and use that in your join condition:
val testDF = Seq((163, null), (843, null)).toDF("empID", "deptID")
val rangeDF = Seq(20, 21).toDF("deptID")
testDF.withColumn("rn", row_number().over(Window.orderBy("empID")))
  .join(rangeDF.withColumn("rn", row_number().over(Window.orderBy("deptID"))), Seq("rn"))
  .select(testDF.col("empID"), rangeDF.col("deptID")).show
Output:
+-----+------+
|empID|deptID|
+-----+------+
|  163|    20|
|  843|    21|
+-----+------+
Note that both of these solutions use a Window function without a partition defined. This can be bad if you have a lot of data, because all of it ends up being shuffled to a single partition (and so a single node).
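If the single-partition window is a concern, one possible alternative is to number the rows with RDD.zipWithIndex instead of row_number. The following is only a sketch under some assumptions: the object name FillNullsSketch and the helper fillDeptIds are hypothetical, empID is assumed to be an integer column in first position, and the array is assumed to be small enough to ship to the executors and to have exactly as many values as there are rows.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}

object FillNullsSketch {
  // Hypothetical helper: pairs the i-th row of emp with values(i).
  // Assumes empID is an integer in column 0 and values.length == emp.count().
  def fillDeptIds(spark: SparkSession, emp: DataFrame, values: Array[Long]): DataFrame = {
    val schema = StructType(Seq(
      StructField("empID", IntegerType, nullable = false),
      StructField("deptid", LongType, nullable = false)))
    // zipWithIndex numbers rows 0, 1, 2, ... across partitions without a global sort
    val rows = emp.rdd.zipWithIndex().map { case (row, idx) =>
      Row(row.getInt(0), values(idx.toInt))
    }
    spark.createDataFrame(rows, schema)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("fill-nulls-sketch")
      .getOrCreate()
    import spark.implicits._
    // Option.empty[Long] gives a nullable Long column holding nulls
    val emp = Seq((163, Option.empty[Long]), (843, Option.empty[Long])).toDF("empID", "deptid")
    fillDeptIds(spark, emp, Array(20L, 21L)).show()
    spark.stop()
  }
}
```

Since the question says it does not matter which empID gets which value, the row order that zipWithIndex happens to see should be acceptable. If the dataframe also contained rows with a non-null deptid, you would presumably filter the null rows out first, fill them this way, and union the result back.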