I have a DataFrame in Apache Spark SQL, and I want to drop every column in which all values are NULL.
An example:
+-------+----+------+------+
|   name| age|height|weight|
+-------+----+------+------+
|   JUAN|  23|  null|  null|
| JAVIER|null|  null|  null|
|  PABLO|  22|  null|  null|
| SERGIO|null|  null|  null|
|RODRIGO|null|  null|  null|
+-------+----+------+------+
I want a new DataFrame without the columns whose values are all NULL, so the output would be:
+-------+----+
|   name| age|
+-------+----+
|   JUAN|  23|
| JAVIER|null|
|  PABLO|  22|
| SERGIO|null|
|RODRIGO|null|
+-------+----+
How can I do this in Apache Spark SQL? I'm working in Java.
CodePudding user response:
There is no built-in function to do this. You can count the non-null values for all columns first. Scala demo:
// run a job to count the not null values for all columns
import org.apache.spark.sql.functions._
val statistics = df.columns.map(c => expr(s"count(IF($c is not null, 1, null))").as(c + "_notnullcount"))
val statisticsResult = df.select(statistics: _*).collectAsList().get(0)
// keep only the columns that notnullcount > 0
val newCols = df.columns.filter(c => statisticsResult.getAs[Long](c + "_notnullcount") > 0).map(col)
println(newCols.mkString(","))
df.select(newCols: _*).show
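Since the question asks for Java, here is a rough equivalent of the same idea as a helper method (a sketch only; the method name is made up, and df is assumed to be the Dataset<Row> from the question):
import static org.apache.spark.sql.functions.*;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// drops every column of df whose values are all null
static Dataset<Row> dropAllNullColumns(Dataset<Row> df) {
    // one job that counts the non-null values per column (count() ignores nulls)
    List<Column> counts = new ArrayList<>();
    for (String c : df.columns()) {
        counts.add(count(col(c)).as(c + "_notnullcount"));
    }
    Row stats = df.select(counts.toArray(new Column[0])).first();

    // keep only the columns whose non-null count is greater than zero
    List<Column> keep = new ArrayList<>();
    for (String c : df.columns()) {
        if (stats.<Long>getAs(c + "_notnullcount") > 0) {
            keep.add(col(c));
        }
    }
    return df.select(keep.toArray(new Column[0]));
}
On the example data, dropAllNullColumns(df).show() should print only the name and age columns.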
CodePudding user response:
You can build a null-check condition with min and max against each column, then transpose the map built at the column level to keep only the non-null columns, as shown below.
The approach is to check whether the min and max of a column are both null; the additional min == max comparison is a further null-safe check that also covers columns where the pandas round-trip turned the missing values into NaN instead of null.
Data Preparation
from io import StringIO

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

sql = SparkSession.builder.getOrCreate()  # `sql` is the SparkSession used below

s = StringIO("""
name,age,height,weight
JUAN,23,,
JAVIER,,,
PABLO,22,,
SERGIO,,,
RODRIGO,,,
""")
df = pd.read_csv(s,delimiter=',')
sparkDF = sql.createDataFrame(df)
sparkDF.show()
+-------+----+------+------+
|   name| age|height|weight|
+-------+----+------+------+
|   JUAN|23.0|   NaN|   NaN|
| JAVIER| NaN|   NaN|   NaN|
|  PABLO|22.0|   NaN|   NaN|
| SERGIO| NaN|   NaN|   NaN|
|RODRIGO| NaN|   NaN|   NaN|
+-------+----+------+------+
Null Check Condition - Min & Max
null_map = sparkDF.select(*[(
(
F.min(F.col(c)).isNull()
& F.max(F.col(c)).isNull()
)
|( F.min(F.col(c)) == F.max(F.col(c)) )
).alias(c)
for c in sparkDF.columns
]
).toPandas().T.reset_index().rename(columns={0:'null_flag'})
sql.createDataFrame(null_map).show()
+------+---------+
| index|null_flag|
+------+---------+
|  name|    false|
|   age|    false|
|height|     true|
|weight|     true|
+------+---------+
Filtering Non-Null Columns
non_null_columns = null_map[null_map['null_flag'] == False]['index'].values
sparkDF.select(*non_null_columns).show()
+-------+----+
|   name| age|
+-------+----+
|   JUAN|23.0|
| JAVIER| NaN|
|  PABLO|22.0|
| SERGIO| NaN|
|RODRIGO| NaN|
+-------+----+
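For a Java version of this second approach, a minimal sketch under the same assumptions (df is your Dataset<Row>, and the helper name is hypothetical) could look like:
import static org.apache.spark.sql.functions.*;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// flags each column: true when min and max are both null, or when the
// min/max comparison collapses (e.g. an all-NaN column); false otherwise
static Dataset<Row> dropFlaggedColumns(Dataset<Row> df) {
    List<Column> checks = new ArrayList<>();
    for (String c : df.columns()) {
        Column mn = min(col(c));
        Column mx = max(col(c));
        checks.add(mn.isNull().and(mx.isNull()).or(mn.equalTo(mx)).as(c));
    }
    Row flags = df.select(checks.toArray(new Column[0])).first();

    // keep only the columns that were not flagged
    List<Column> keep = new ArrayList<>();
    for (String c : df.columns()) {
        if (!flags.<Boolean>getAs(c)) {
            keep.add(col(c));
        }
    }
    return df.select(keep.toArray(new Column[0]));
}
Note that, like the PySpark version above, the min == max branch would also flag a column whose values are all identical, so only use this variant if that trade-off is acceptable.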