How do I discard columns whose values are all NULL in Apache Spark?


I have a DataFrame in Apache Spark SQL, and I want to drop every column in which all values are NULL.

An example:

+-------+----+------+------+
|   name| age|height|weight|
+-------+----+------+------+
|   JUAN|  23|  null|  null|
| JAVIER|null|  null|  null|
|  PABLO|  22|  null|  null|
| SERGIO|null|  null|  null|
|RODRIGO|null|  null|  null|
+-------+----+------+------+

I want a new DataFrame without the columns whose values are all NULL, so the output would be:

+-------+----+
|   name| age|
+-------+----+
|   JUAN|  23|
| JAVIER|null|
|  PABLO|  22|
| SERGIO|null|
|RODRIGO|null|
+-------+----+

How can I do this in Apache Spark SQL? I'm working in Java.

CodePudding user response:

There is no built-in function for this. You can first run a job that counts the non-null values in every column. Scala demo:

import org.apache.spark.sql.functions._

// run one job that counts the non-null values for every column
val statistics = df.columns.map(c => expr(s"count(IF($c is not null, 1, null))").as(c + "_notnullcount"))
val statisticsResult = df.select(statistics: _*).collectAsList().get(0)

// keep only the columns whose notnullcount is greater than 0
val newCols = df.columns.filter(c => statisticsResult.getAs[Long](c + "_notnullcount") > 0).map(col)
println(newCols.mkString(","))
df.select(newCols: _*).show
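
Since the question asks for Java, here is a minimal sketch of the same counting approach with the Java Dataset API. It assumes you already have a Dataset<Row> named df; the variable names are illustrative, not from the original answer:

import static org.apache.spark.sql.functions.*;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Row;

// run one job that counts the non-null values for every column
List<Column> counts = new ArrayList<>();
for (String c : df.columns()) {
    counts.add(count(when(col(c).isNotNull(), 1)).alias(c));
}
Row stats = df.select(counts.toArray(new Column[0])).first();

// keep only the columns whose non-null count is greater than zero
List<Column> keep = new ArrayList<>();
for (String c : df.columns()) {
    if (stats.<Long>getAs(c) > 0) {
        keep.add(col(c));
    }
}
df.select(keep.toArray(new Column[0])).show();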

CodePudding user response:

You can build a null-check condition with min and max over each column, then transpose the resulting single-row map to keep only the columns that still contain data, as shown below (PySpark demo).

The idea is that a column's min and max are both NULL only when every value in the column is NULL; the additional min == max branch is a further safeguard for the all-NaN columns produced by the pandas conversion in this demo.

Data Preparation

from io import StringIO

import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

sql = SparkSession.builder.getOrCreate()

s = StringIO("""
name,age,height,weight
JUAN,23,,
JAVIER,,,
PABLO,22,,
SERGIO,,,
RODRIGO,,,
""")

# read the CSV with pandas and convert it to a Spark DataFrame
df = pd.read_csv(s, delimiter=',')

sparkDF = sql.createDataFrame(df)

sparkDF.show()

+-------+----+------+------+
|   name| age|height|weight|
+-------+----+------+------+
|   JUAN|23.0|   NaN|   NaN|
| JAVIER| NaN|   NaN|   NaN|
|  PABLO|22.0|   NaN|   NaN|
| SERGIO| NaN|   NaN|   NaN|
|RODRIGO| NaN|   NaN|   NaN|
+-------+----+------+------+

Null Check Condition - Min & Max

# flag each column: true when its min and max are both NULL,
# or when min equals max (which catches the all-NaN columns here)
null_map = sparkDF.select(*[
    (
        (F.min(F.col(c)).isNull() & F.max(F.col(c)).isNull())
        | (F.min(F.col(c)) == F.max(F.col(c)))
    ).alias(c)
    for c in sparkDF.columns
]).toPandas().T.reset_index().rename(columns={0: 'null_flag'})

sql.createDataFrame(null_map).show()

+------+---------+
| index|null_flag|
+------+---------+
|  name|    false|
|   age|    false|
|height|     true|
|weight|     true|
+------+---------+

Filtering Non Null Columns

# keep only the columns whose null_flag is false
non_null_columns = null_map[null_map['null_flag'] == False]['index'].values

sparkDF.select(*non_null_columns).show()

+-------+----+
|   name| age|
+-------+----+
|   JUAN|23.0|
| JAVIER| NaN|
|  PABLO|22.0|
| SERGIO| NaN|
|RODRIGO| NaN|
+-------+----+
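
For reference, here is a minimal sketch of the same min/max idea in Java, the language the question asks about. It assumes a Dataset<Row> named df whose empty columns contain real NULLs; the extra min == max branch from the PySpark demo is left out because it is only needed when the pandas conversion stores the missing values as NaN instead of NULL:

import static org.apache.spark.sql.functions.*;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Row;

// flag each column: true when both its min and max are NULL,
// i.e. the column has no non-null values at all
List<Column> flags = new ArrayList<>();
for (String c : df.columns()) {
    flags.add(min(col(c)).isNull().and(max(col(c)).isNull()).alias(c));
}
Row nullFlags = df.select(flags.toArray(new Column[0])).first();

// keep only the columns whose flag is false
List<Column> keep = new ArrayList<>();
for (String c : df.columns()) {
    if (!nullFlags.<Boolean>getAs(c)) {
        keep.add(col(c));
    }
}
df.select(keep.toArray(new Column[0])).show();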