(spark scala) How to remove nulls in all columns of a dataframe and substitute with default values


I am getting a dataframe which, when printed, looks as follows. The column is of type Array[String], and at times the database contains arrays of nulls.

+----------+
|newAddress|
+----------+
|      null|
|      null|
|      null|
|      null|
|      null|
|      null|
|      [,,]|
|      [,,]|
|      [,,]|
|      [,,]|
|      null|
|      [,,]|
|      [,,]|
|      [,,]|
|      [,,]|
|      [,,]|
|      [,,]|
|      [,,]|
|      [,,]|
+----------+

So I want to write a UDF that scans all columns of the dataframe and, if a column's data type is an array (of any type), scans through the array and removes the nulls. If this can be built generically, without having to pass in the column names etc., that would be great.

Any thoughts?

CodePudding user response:

DataFrame has a dtypes method, which returns each column name along with its data type as an Array[(String, String)].

You can map over this array, applying a different expression to each column based on its data type, and then pass the mapped list to the select method:

import org.apache.spark.sql.functions.{col, filter}
import spark.implicits._

val df = Seq((1, List[Integer](1, 2, null))).toDF
+---+------------+
| _1|          _2|
+---+------------+
|  1|[1, 2, null]|
+---+------------+

df.dtypes
// Array[(String, String)] = Array((_1,IntegerType), (_2,ArrayType(IntegerType,true)))

val cols =
  df.dtypes.map{
    // array columns: drop the null elements, keep the original column name
    case (c, t) if t.startsWith("ArrayType") => filter(col(c), x => x.isNotNull).as(c)
    // all other columns pass through unchanged
    case (c, _) => col(c)
  }

df.select(cols:_*).show
+---+------+
| _1|    _2|
+---+------+
|  1|[1, 2]|
+---+------+
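
As a quick sanity check against the question's own data, here is the same approach applied to a single Array[String] column. The column name newAddress comes from the question; the sample rows are a minimal sketch. Note that filter leaves a null array as null; substituting a default for a null array is sketched at the end of the next answer.

val addresses = Seq(
  (1, null.asInstanceOf[Seq[String]]), // a null array, as in the question
  (2, Seq[String](null, null, null))   // an array of nulls, printed as [,,]
).toDF("id", "newAddress")

val cleaned = addresses.dtypes.map {
  case (c, t) if t.startsWith("ArrayType") => filter(col(c), _.isNotNull).as(c)
  case (c, _) => col(c)
}

addresses.select(cleaned: _*).show
// +---+----------+
// | id|newAddress|
// +---+----------+
// |  1|      null|
// |  2|        []|
// +---+----------+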

CodePudding user response:

You can iterate over the schema of the dataframe and use Spark SQL built-in functions to filter the array columns:

import org.apache.spark.sql.functions.{filter, col}

// sample data
import spark.implicits._
val data = Seq((5,List[Integer](1,2,null), List[String](null, null, "a"))).toDF

// get the array columns out of the schema and filter out the null values
val dataWithoutNulls = data.schema
  .filter(field => field.dataType.typeName == "array")
  .map(_.name)
  .foldLeft(data) { (df, colName) =>
    df.withColumn(colName, filter(col(colName), c => c.isNotNull))
  }
dataWithoutNulls.show()

// +---+------+---+
// | _1|    _2| _3|
// +---+------+---+
// |  5|[1, 2]|[a]|
// +---+------+---+
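
Neither answer substitutes a default when the whole array is null, which the question title also asks for. A minimal sketch, assuming an empty array is an acceptable default: the schema already carries each column's DataType, so you can coalesce the filtered array with an empty array cast to that exact type.

import org.apache.spark.sql.functions.{array, coalesce}

val withDefaults = data.schema
  .filter(_.dataType.typeName == "array")
  .foldLeft(data) { (df, field) =>
    df.withColumn(
      field.name,
      coalesce(
        filter(col(field.name), _.isNotNull), // drop nulls inside the array
        array().cast(field.dataType)          // default: empty array of the same type
      )
    )
  }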