Create a PySpark function that determines if two or more selected columns in a dataframe have null values


I want to create a PySpark function that determines whether the values in two or more columns of a PySpark dataframe are null. A new column should be added that shows TRUE or FALSE depending on whether the values in those columns have been populated. The function should stop processing if it encounters a null value in either column. The output should look like this:

firstname  lastname  Values Not in Table
Bob        Wilson    false
George     Johnson   false
null       Kasey     true

The above would stop processing once it hits a null or empty value in the table. Thanks in advance!

The code below checks whether a value in ONE column of a dataframe is null. I want a new function that does this for 2 or more columns:

import pyspark.sql.functions as F

def isValueNull(df, column_name, file_name):
    # Flag rows where the column is empty or holds a 'null' string literal
    new_df = df.select(df[column_name]).withColumn('Value not in ' + file_name,
        F.when((df[column_name] == '') | (df[column_name] == 'Null') | (df[column_name] == 'null') | (df[column_name] == 'NULL'), True).otherwise(False))

    return new_df

CodePudding user response:

You can construct the condition expression for each column inside a loop and then use this expression inside withColumn.

Working Example

from pyspark.sql import functions as F
from pyspark.sql import DataFrame
from typing import List
data = [("Bob", "Wilson", ),
        ("George", "Johnson", ), 
        ("null", "Kasey", ),
        ("NULL", "Kasey", ),
        ("Null", "Kasey", ),
        ("", "Kasey", ),
        (None, "Kasey", ),
        ("Bob", None, ),
        ("Bob", "Null", ),
        ("Bob", "", ),
        ("Bob", "null", ),
       ]

df = spark.createDataFrame(data, ("firstname", "lastname", ))

def isValueNull(df: DataFrame, column_names: List[str], file_name: str) -> DataFrame:
    # Starting from the functions module means the first .when() call creates
    # a Column; each later .when() chains onto the same CASE WHEN expression.
    null_condition = F
    for col in column_names:
        null_condition = null_condition.when((df[col] == '') | (F.lower(df[col]) == 'null') | (df[col].isNull()), True)
    new_df = df.withColumn('Value not in ' + file_name, null_condition.otherwise(False))

    return new_df

isValueNull(df, ["firstname", "lastname"], "Table").show()

Output

+---------+--------+------------------+
|firstname|lastname|Value not in Table|
+---------+--------+------------------+
|      Bob|  Wilson|             false|
|   George| Johnson|             false|
|     null|   Kasey|              true|
|     NULL|   Kasey|              true|
|     Null|   Kasey|              true|
|         |   Kasey|              true|
|     null|   Kasey|              true|
|      Bob|    null|              true|
|      Bob|    Null|              true|
|      Bob|        |              true|
|      Bob|    null|              true|
+---------+--------+------------------+
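
If the null_condition = F trick feels too implicit, the same check can be written by combining one boolean expression per column with functools.reduce. Below is a minimal sketch of this equivalent approach; the name isValueNullReduce is hypothetical, and it assumes the same df and imports as above:

from functools import reduce

def isValueNullReduce(df: DataFrame, column_names: List[str], file_name: str) -> DataFrame:
    # One boolean expression per column: True when the value is empty,
    # a 'null' string literal in any casing, or an actual null
    per_column = [(df[c] == '') | (F.lower(df[c]) == 'null') | (df[c].isNull())
                  for c in column_names]
    # OR the per-column expressions together: True when ANY column matches
    any_null = reduce(lambda a, b: a | b, per_column)
    return df.withColumn('Value not in ' + file_name, any_null)

isValueNullReduce(df, ["firstname", "lastname"], "Table").show()

Both versions produce the same result, so the choice between the chained when and reduce is mostly stylistic.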

CodePudding user response:

Usually, scenarios like this use the dropna() function provided by PySpark. For example, consider the dataframe created using:

from pyspark.sql import Row
df = sc.parallelize([
        Row(firstname='Bob', lastname='Wilson'),
        Row(firstname='George', lastname='Johnson'),
        Row(firstname=None, lastname='Kasey'),
        Row(firstname='John', lastname=None),
        Row(firstname='John', lastname='Kasey')]).toDF()
df.show()

The resulting dataframe looks like this:

+---------+--------+
|firstname|lastname|
+---------+--------+
|      Bob|  Wilson|
|   George| Johnson|
|     null|   Kasey|
|     John|    null|
|     John|   Kasey|
+---------+--------+

Using dropna() on firstname and lastname like this:

df = df.dropna(subset=['firstname', 'lastname'])
df.show()

would produce the output:

+---------+--------+
|firstname|lastname|
+---------+--------+
|      Bob|  Wilson|
|   George| Johnson|
|     John|   Kasey|
+---------+--------+

But in your case, a new column is required as an indicator. I do not think there is a built-in function that accepts a list of columns and produces the output you want. That said, what you are looking for can be achieved in several ways. One such method is as follows:

import pyspark.sql.functions as F

columns_list = ['firstname', 'lastname']
new_df = df.withColumn("Values Not in Table", F.lit(False))
for column_name in columns_list:
    # Flip the indicator to True the first time a null is found in any listed column
    new_df = new_df.withColumn(
        'Values Not in Table',
        F.when(~new_df['Values Not in Table'] & new_df[column_name].isNull(), True)
         .otherwise(new_df['Values Not in Table'])
    )
new_df.show()

This would produce the output:

+---------+--------+-------------------+
|firstname|lastname|Values Not in Table|
+---------+--------+-------------------+
|      Bob|  Wilson|              false|
|   George| Johnson|              false|
|     null|   Kasey|               true|
|     John|    null|               true|
|     John|   Kasey|              false|
+---------+--------+-------------------+

By updating the variable columns_list you can easily extend this to any number of columns.
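
As a follow-up, the indicator column ties straight back to the dropna() behaviour shown earlier: if you eventually want the cleaned rows rather than the flags, you can filter on it. A short sketch, assuming the new_df from the snippet above:

import pyspark.sql.functions as F

# Keep only rows where the indicator is False, then drop the helper column;
# this is equivalent to df.dropna(subset=columns_list)
clean_df = new_df.filter(~F.col("Values Not in Table")).drop("Values Not in Table")
clean_df.show()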
