Replicate a function from pandas into pyspark

Time:08-11

I am trying to run the following pandas function on a Spark DataFrame instead of a pandas one.

import pandas as pd

def check_value(df):
    lista = []
    for colname in df.columns:
        # fraction of whitespace-only values and number of nulls in this column
        wspace = df[colname].map(lambda x: str(x).isspace()).sum() / df.shape[0]
        nulls = df[colname].isna().sum()
        lista.append('Column name: db_transactions/{} has {} % of white_space character and {} nullvalues'.format(colname, wspace, nulls))
    return lista

CodePudding user response:

A direct translation would require a separate collect for each column's calculation. I suggest computing the statistics for all columns as a single aggregated row and then collecting that one row. Here's an example.

# input dataframe, say `data_sdf`
# the blank values can contain zero or more spaces - '', ' ', '    ', etc.
# +--------+--------+
# |  chars1|  chars2|
# +--------+--------+
# |        |        |
# |        |        |
# |  blah  |  blah  |
# |    blah|    blah|
# |  blah  |  blah  |
# |    blah|    blah|
# |    null|        |
# +--------+--------+

Calculate the fraction of blank (empty or whitespace-only) values and the number of null values for every column in a single select.

from pyspark.sql import functions as func

# one aggregated row: '<col>_wspace' is the blank fraction, '<col>_null' the null count
calc_sdf = data_sdf. \
    select(*[(func.sum(func.trim(func.col(colname)).like('').cast('int')) / func.count('*')).alias(colname + '_wspace') for colname in data_sdf.columns],
           *[func.sum(func.col(colname).isNull().cast('int')).alias(colname + '_null') for colname in data_sdf.columns]
           )

# +------------------+-------------------+-----------+-----------+
# |     chars1_wspace|      chars2_wspace|chars1_null|chars2_null|
# +------------------+-------------------+-----------+-----------+
# |0.2857142857142857|0.42857142857142855|          1|          0|
# +------------------+-------------------+-----------+-----------+
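
To see where these numbers come from, here is a minimal plain-Python sketch of the same two metrics on the sample data, without Spark. The exact blank strings are assumptions (the table does not show how many spaces each blank cell holds), and `column_stats` is a hypothetical helper, not a PySpark function. It assumes, as in the Spark expression, that a null value is counted as null and not as blank.

```python
# Sample columns mirroring the table above; the exact blank strings are assumed.
chars1 = ['', '   ', '  blah  ', '    blah', '  blah  ', '    blah', None]
chars2 = [' ', '', '  blah  ', '    blah', '  blah  ', '    blah', '    ']

def column_stats(values):
    """Return (blank_fraction, null_count) for a list of strings or None."""
    total = len(values)
    # A value is blank when it is not null and nothing is left after stripping spaces
    blanks = sum(1 for v in values if v is not None and v.strip(' ') == '')
    nulls = sum(1 for v in values if v is None)
    return blanks / total, nulls

print(column_stats(chars1))  # 2 blanks out of 7 rows, 1 null
print(column_stats(chars2))  # 3 blanks out of 7 rows, 0 nulls

# An empty string is not "whitespace" for str.isspace(), but it is blank after a trim
print(''.isspace(), '  '.isspace())
```

One difference from the pandas function in the question is worth keeping in mind: `str(x).isspace()` is False for an empty string, while the trim-and-compare approach counts `''` as blank, so the two can disagree on columns that contain empty strings.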

We can convert the calculated fields into a dictionary so they are easy to use when building lista.

# the result has exactly one row, so take it and turn it into a dict
calc_dict = calc_sdf.first().asDict()

# {'chars1_null': 1,
#   'chars1_wspace': 0.2857142857142857,
#   'chars2_null': 0,
#   'chars2_wspace': 0.42857142857142855}

Use the calc_dict in the lista creation.

lista = []

for colname in data_sdf.columns:
    # calc_dict holds the blank fraction and the null count collected above
    lista.append('Column name: db_transactions/{} has {} % of white_space character and {} nullvalues'.format(
        colname,
        round(calc_dict[colname + '_wspace'] * 100, 2),
        calc_dict[colname + '_null']
    ))

# ['Column name: db_transactions/chars1 has 28.57 % of white_space character and 1 nullvalues',
#  'Column name: db_transactions/chars2 has 42.86 % of white_space character and 0 nullvalues']
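
If the report is needed in more than one place, the formatting step can be pulled into a small function. This is only a sketch: `build_report` and its `prefix` parameter are hypothetical names, and the sample values are copied from the collected dictionary shown earlier so the snippet runs without Spark.

```python
def build_report(columns, stats, prefix='db_transactions'):
    """Format one summary line per column from a flat stats dictionary."""
    lines = []
    for colname in columns:
        pct = round(stats[colname + '_wspace'] * 100, 2)  # fraction -> percentage
        nulls = stats[colname + '_null']
        lines.append(
            'Column name: {}/{} has {} % of white_space character and {} nullvalues'
            .format(prefix, colname, pct, nulls)
        )
    return lines

# Values copied from the collected dictionary shown earlier
sample_stats = {
    'chars1_null': 1,
    'chars1_wspace': 0.2857142857142857,
    'chars2_null': 0,
    'chars2_wspace': 0.42857142857142855,
}

report = build_report(['chars1', 'chars2'], sample_stats)
for line in report:
    print(line)
```

With a real Spark DataFrame, the same function could be called with `data_sdf.columns` and the dictionary collected from the aggregated row.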