I currently have a Spark DataFrame with three columns, and I'm looking to add a fourth column called target based on whether those three columns contain null values. For example,
+--------+--------+--------+------+
|column_1|column_2|column_3|target|
+--------+--------+--------+------+
|       1|    null|    null|  FAIL|
|    null|    null|    null|  PASS|
|    null|       1|    null|  FAIL|
|    null|    null|    null|  PASS|
|    null|    null|    null|  PASS|
|    null|      12|    null|  FAIL|
|    null|    null|    null|  PASS|
|    null|    null|    null|  PASS|
|    null|    null|       2|  FAIL|
+--------+--------+--------+------+
If column_1, column_2, and column_3 are all null, I want the value in the target column to be PASS, otherwise FAIL. Initially, I thought a UDF or pandas UDF would do the trick, but from what I understand you should prefer built-in PySpark functions over a UDF, because UDFs can be computationally expensive.
Since I'm new to PySpark, one of the first methods I tried is df.withColumn("target", col("column_1").isNull()).show(n=10). It kind of works, but I'm not sure of the best way to check all three columns at once, and instead of the target column being true or false, I want the value to be PASS or FAIL. Are there any other PySpark functions or methods I should be aware of to get the resulting target column I'm looking for? Thanks!
CodePudding user response:
You can use coalesce for this. coalesce returns the first non-null value from multiple columns; if all of them are null, it returns null.
import pyspark.sql.functions as F

df = df.withColumn(
    'target',
    F.when(F.coalesce('column_1', 'column_2', 'column_3').isNull(), 'PASS').otherwise('FAIL')
)
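Here is a minimal, self-contained sketch of the same approach on data shaped like the question's; the SparkSession name spark and the sample rows are assumptions for illustration only.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# small sample frame in the question's schema (values made up for illustration)
df = spark.createDataFrame(
    [(1, None, None), (None, None, None), (None, 1, None), (None, None, 2)],
    'column_1 int, column_2 int, column_3 int',
)

df = df.withColumn(
    'target',
    F.when(F.coalesce('column_1', 'column_2', 'column_3').isNull(), 'PASS').otherwise('FAIL'),
)
df.show()
# only the row where all three columns are null gets PASS; every other row gets FAIL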
CodePudding user response:
You can utilise Higher Order Functions for this -
Data Preparation
from io import StringIO

import pandas as pd
from pyspark.sql import SparkSession, functions as F

sql = SparkSession.builder.getOrCreate()

# build the sample data in pandas first; the empty fields are read as NaN
s = StringIO("""
column1|column2|column3
1|null|null
null|null|null
null|1|null
null|null|null
null|null|null
null|12|null
null|null|null
null|null|null
null|null|2
""".replace('null',''))

df = pd.read_csv(s, delimiter='|')

# convert to a Spark DataFrame and turn the pandas NaN values into proper nulls
# (Spark treats NaN = NaN as true, so the comparison works after the implicit cast)
sparkDF = sql.createDataFrame(df)\
    .withColumn('column1', F.when(F.col('column1') == "NaN", F.lit(None)).otherwise(F.col('column1')))\
    .withColumn('column2', F.when(F.col('column2') == "NaN", F.lit(None)).otherwise(F.col('column2')))\
    .withColumn('column3', F.when(F.col('column3') == "NaN", F.lit(None)).otherwise(F.col('column3')))
sparkDF.show()
+-------+-------+-------+
|column1|column2|column3|
+-------+-------+-------+
|    1.0|   null|   null|
|   null|   null|   null|
|   null|    1.0|   null|
|   null|   null|   null|
|   null|   null|   null|
|   null|   12.0|   null|
|   null|   null|   null|
|   null|   null|   null|
|   null|   null|    2.0|
+-------+-------+-------+
ForAll
Combine the three columns into a single ArrayType column, since higher-order functions require an array input.
# collect the three columns into a single array column
sparkDF = sparkDF.withColumn('combined',
    F.array(F.col('column1'), F.col('column2'), F.col('column3'))
)
# forall is true only when every element of the array is null, i.e. the row should pass
sparkDF = sparkDF.withColumn('target', F.forall('combined', lambda x: x.isNull()))

sparkDF.show(truncate=False)
+-------+-------+-------+------------------+------+
|column1|column2|column3|combined          |target|
+-------+-------+-------+------------------+------+
|1.0    |null   |null   |[1.0, null, null] |false |
|null   |null   |null   |[null, null, null]|true  |
|null   |1.0    |null   |[null, 1.0, null] |false |
|null   |null   |null   |[null, null, null]|true  |
|null   |null   |null   |[null, null, null]|true  |
|null   |12.0   |null   |[null, 12.0, null]|false |
|null   |null   |null   |[null, null, null]|true  |
|null   |null   |null   |[null, null, null]|true  |
|null   |null   |2.0    |[null, null, 2.0] |false |
+-------+-------+-------+------------------+------+
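If you want the literal PASS/FAIL labels from the question rather than a boolean, the forall result can be fed straight into when/otherwise. A small sketch of that last step, reusing the combined column built above:

sparkDF = sparkDF.withColumn(
    'target',
    F.when(F.forall('combined', lambda x: x.isNull()), 'PASS').otherwise('FAIL')
)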