How to do Forward fill in Pyspark on multiple columns


I want to do a forward fill in PySpark on multiple columns, and if the starting value of a column is null ("NaN"), replace it with 0. Below is what my DataFrame looks like.

+-------------------+-------+-------+-------+-------+
|start_timestamp    |Column1|Column2|Column3|Column4|
+-------------------+-------+-------+-------+-------+
|2020-11-02 08:51:50|2      |null   |null   |null   |
|2020-11-02 09:14:29|null   |null   |null   |40     |
|2020-11-02 09:18:32|null   |4      |2      |null   |
|2020-11-02 09:32:42|4      |null   |null   |null   |
|2020-11-03 13:06:03|null   |null   |null   |20     |
|2020-11-03 13:10:01|6      |null   |4      |null   |
|2020-11-03 13:54:38|null   |5      |null   |null   |
|2020-11-03 14:46:25|null   |null   |null   |null   |
|2020-11-03 14:57:31|7      |null   |null   |10     |
|2020-11-03 15:07:07|8      |7      |null   |null   |
+-------------------+-------+-------+-------+-------+

Expected DF would be:

+-------------------+-------+-------+-------+-------+
|start_timestamp    |Column1|Column2|Column3|Column4|
+-------------------+-------+-------+-------+-------+
|2020-11-02 08:51:50|2      |0      |0      |0      |
|2020-11-02 09:14:29|2      |0      |0      |40     |
|2020-11-02 09:18:32|2      |4      |2      |40     |
|2020-11-02 09:32:42|4      |4      |2      |40     |
|2020-11-03 13:06:03|4      |4      |2      |20     |
|2020-11-03 13:10:01|6      |4      |4      |20     |
|2020-11-03 13:54:38|6      |5      |4      |20     |
|2020-11-03 14:46:25|6      |5      |4      |20     |
|2020-11-03 14:57:31|7      |5      |4      |10     |
|2020-11-03 15:07:07|8      |7      |4      |10     |
+-------------------+-------+-------+-------+-------+

Below is the code I tried, which I found on Stack Overflow:

from pyspark.sql import Window
from pyspark.sql import functions as F
import sys

def stringReplaceFunc(x, y):
    return F.when(x != y, x).otherwise(F.lit(None))  # replace matches with NULL

def forwardFillImputer(df, cols=[], partitioner="start_timestamp", value="null"):
    for i in cols:
        window = Window\
            .partitionBy(F.month(partitioner))\
            .orderBy(partitioner)\
            .rowsBetween(-sys.maxsize, 0)
        df = df.withColumn(i, stringReplaceFunc(F.col(i), value))
        fill = F.last(df[i], ignorenulls=True).over(window)
        df = df.withColumn(i, fill)
        return df

df = forwardFillImputer(df, cols=[i for i in df.columns])

The code is not functioning. Please let me know what mistake I am making, or whether there is an alternative solution. Thanks.

CodePudding user response:

In your current code, you should not partition the window by month, and `rowsBetween` is unnecessary: all you need is a single window ordered by start_timestamp.

Moreover, you are not handling the case where there is no previous value to carry forward. You can handle it with coalesce, falling back to the literal value '0'.
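The fallback order that coalesce applies can be mimicked in plain Python (a hypothetical helper, for illustration only): take the first non-null among the current value, the last non-null value in the window, and the literal '0'.

```python
def coalesce(*candidates):
    # Return the first candidate that is not None, mirroring F.coalesce
    for c in candidates:
        if c is not None:
            return c
    return None

# current value missing, window has a previous value -> forward fill
print(coalesce(None, 4, '0'))     # 4
# no current value and no previous value -> fall back to '0'
print(coalesce(None, None, '0'))  # 0
```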

Thus your code can be rewritten as follows:

from pyspark.sql import functions as F
from pyspark.sql import Window

def forwardFillImputer(df, cols=[], partitioner='start_timestamp', value='null'):
    for c in cols:
        # turn the string 'null' into a real NULL
        df = df.withColumn(c, F.when(F.col(c) != value, F.col(c)))
        # forward fill over an ordered window; fall back to '0'
        # when no previous value exists
        df = df.withColumn(
            c,
            F.coalesce(
                F.col(c),
                F.last(c, True).over(Window.orderBy(partitioner)),
                F.lit('0'),
            ),
        )
    return df

df = forwardFillImputer(df, df.columns)
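As a sanity check (plain Python, not part of the Spark code above), applying the same rule column by column to the sample rows, ordered by start_timestamp, reproduces the expected output:

```python
# Column1..Column4 of the sample data, one inner list per row
rows = [
    [2, None, None, None],
    [None, None, None, 40],
    [None, 4, 2, None],
    [4, None, None, None],
    [None, None, None, 20],
    [6, None, 4, None],
    [None, 5, None, None],
    [None, None, None, None],
    [7, None, None, 10],
    [8, 7, None, None],
]

filled = []
last = [0, 0, 0, 0]  # seed: leading nulls become 0
for row in rows:
    # keep each present value, otherwise carry the previous one forward
    last = [v if v is not None else prev for v, prev in zip(row, last)]
    filled.append(last)

print(filled[0])   # [2, 0, 0, 0]
print(filled[-1])  # [8, 7, 4, 10]
```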

with the following dataframe as df:

+-------------------+-------+-------+-------+-------+
|start_timestamp    |Column1|Column2|Column3|Column4|
+-------------------+-------+-------+-------+-------+
|2020-11-02 08:51:50|2      |null   |null   |null   |
|2020-11-02 09:14:29|null   |null   |null   |40     |
|2020-11-02 09:18:32|null   |4      |2      |null   |
|2020-11-02 09:32:42|4      |null   |null   |null   |
|2020-11-03 13:06:03|null   |null   |null   |20     |
|2020-11-03 13:10:01|6      |null   |4      |null   |
|2020-11-03 13:54:38|null   |5      |null   |null   |
|2020-11-03 14:46:25|null   |null   |null   |null   |
|2020-11-03 14:57:31|7      |null   |null   |10     |
|2020-11-03 15:07:07|8      |7      |null   |null   |
+-------------------+-------+-------+-------+-------+

you get the following output:

+-------------------+-------+-------+-------+-------+
|start_timestamp    |Column1|Column2|Column3|Column4|
+-------------------+-------+-------+-------+-------+
|2020-11-02 08:51:50|2      |0      |0      |0      |
|2020-11-02 09:14:29|2      |0      |0      |40     |
|2020-11-02 09:18:32|2      |4      |2      |40     |
|2020-11-02 09:32:42|4      |4      |2      |40     |
|2020-11-03 13:06:03|4      |4      |2      |20     |
|2020-11-03 13:10:01|6      |4      |4      |20     |
|2020-11-03 13:54:38|6      |5      |4      |20     |
|2020-11-03 14:46:25|6      |5      |4      |20     |
|2020-11-03 14:57:31|7      |5      |4      |10     |
|2020-11-03 15:07:07|8      |7      |4      |10     |
+-------------------+-------+-------+-------+-------+