I have a DataFrame that will get filtered (not null) on multiple columns, and those columns come from a config file.
Let's say the config file has filterCols: COLUMN_1,COLUMN_2,...,COLUMN_10
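For reference, I build the column list from the config roughly like this (simplified; the file name and format here are just an example):
# Simplified illustration: read filterCols from a plain key: value config file (hypothetical app.conf)
colList = []
with open("app.conf") as f:
    for line in f:
        if line.startswith("filterCols:"):
            colList = [c.strip() for c in line.split(":", 1)[1].split(",")]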
In my code I can hardcode the column names like:
df = dataDF.filter(~col("COLUMN_1").isNull() &
                   ~col("COLUMN_2").isNull() &
                   ...
                   ~col("COLUMN_10").isNull())
But columns can be removed from or added to this list as required, and with the approach above I would have to change and redeploy the code every time.
Is there a way to loop over these columns and filter on them?
I tried something like this:
colList = ['COLUMN_1', 'COLUMN_2', ..., 'COLUMN_10']
df = dataDF
for name in colList:
    df = df.filter(~col(name).isNull())
But df is showing zero records.
CodePudding user response:
You could generate a query string based on your columns and use SparkSQL.
Example:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = [
    {"a": 1, "b": 2, "c": 3},
    {"a": 1, "b": 2, "c": 3},
    {"a": 1, "b": 2, "c": 3, "d": 4},
]
df = spark.createDataFrame(data)
columns = ["a", "b", "c", "d"]

df.createTempView("table")

# Build a "col IS NOT NULL" predicate for every column and join them with AND
df = spark.sql(
    "SELECT * FROM table WHERE {}".format(
        " AND ".join(x + " IS NOT NULL" for x in columns)
    )
)
df.show(truncate=False)
Result:
+---+---+---+---+
|a  |b  |c  |d  |
+---+---+---+---+
|1  |2  |3  |4  |
+---+---+---+---+
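For reference, the generated statement in this example is:
SELECT * FROM table WHERE a IS NOT NULL AND b IS NOT NULL AND c IS NOT NULL AND d IS NOT NULL
Only the last row survives, because d is NULL in the first two rows.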
CodePudding user response:
Use Python's functools.reduce to chain the multiple conditions into one filter expression:
from functools import reduce
import pyspark.sql.functions as F

# Combine the per-column "is not null" conditions into a single boolean expression with &
filter_expr = reduce(lambda a, b: a & b, [F.col(c).isNotNull() for c in colList])
df = df.filter(filter_expr)
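As an alternative sketch (assuming colList holds the column names from the config), DataFrame.dropna with how="any" drops every row that has a null in any of the listed columns, which is equivalent to the chained filter above:
# Drop rows where any of the configured columns is null
df = df.dropna(how="any", subset=colList)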