I want to do a forward fill in PySpark on multiple columns, and if the starting value of a column is null, replace it with 0. Below is what my DF looks like.
start_timestamp | Column1 | Column2 | Column3 | Column4 |
---|---|---|---|---|
2020-11-02 08:51:50 | 2 | null | null | null |
2020-11-02 09:14:29 | null | null | null | 40 |
2020-11-02 09:18:32 | null | 4 | 2 | null |
2020-11-02 09:32:42 | 4 | null | null | null |
2020-11-03 13:06:03 | null | null | null | 20 |
2020-11-03 13:10:01 | 6 | null | 4 | null |
2020-11-03 13:54:38 | null | 5 | null | null |
2020-11-03 14:46:25 | null | null | null | null |
2020-11-03 14:57:31 | 7 | null | null | 10 |
2020-11-03 15:07:07 | 8 | 7 | null | null |
Expected DF would be:
start_timestamp | Column1 | Column2 | Column3 | Column4 |
---|---|---|---|---|
2020-11-02 08:51:50 | 2 | 0 | 0 | 0 |
2020-11-02 09:14:29 | 2 | 0 | 0 | 40 |
2020-11-02 09:18:32 | 2 | 4 | 2 | 40 |
2020-11-02 09:32:42 | 4 | 4 | 2 | 40 |
2020-11-03 13:06:03 | 4 | 4 | 2 | 20 |
2020-11-03 13:10:01 | 6 | 4 | 4 | 20 |
2020-11-03 13:54:38 | 6 | 5 | 4 | 20 |
2020-11-03 14:46:25 | 6 | 5 | 4 | 20 |
2020-11-03 14:57:31 | 7 | 5 | 4 | 10 |
2020-11-03 15:07:07 | 8 | 7 | 4 | 10 |
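For reference, here is a minimal sketch of how the sample DF above could be created (I am assuming the value columns are strings; they could just as well be integers):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample data from the table above; string value columns are an assumption.
rows = [
    ("2020-11-02 08:51:50", "2",  None, None, None),
    ("2020-11-02 09:14:29", None, None, None, "40"),
    ("2020-11-02 09:18:32", None, "4",  "2",  None),
    ("2020-11-02 09:32:42", "4",  None, None, None),
    ("2020-11-03 13:06:03", None, None, None, "20"),
    ("2020-11-03 13:10:01", "6",  None, "4",  None),
    ("2020-11-03 13:54:38", None, "5",  None, None),
    ("2020-11-03 14:46:25", None, None, None, None),
    ("2020-11-03 14:57:31", "7",  None, None, "10"),
    ("2020-11-03 15:07:07", "8",  "7",  None, None),
]
df = spark.createDataFrame(
    rows, ["start_timestamp", "Column1", "Column2", "Column3", "Column4"]
)
```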
Below is the code I tried, which I found on Stack Overflow:
```python
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import last, first
from pyspark.sql.functions import col, max as max_, min as min_
import sys

def stringReplaceFunc(x, y):
    return F.when(x != y, x).otherwise(F.lit(None))  # replace with NULL

def forwardFillImputer(df, cols=[], partitioner="start_timestamp", value="null"):
    for i in cols:
        window = Window\
            .partitionBy(F.month(partitioner))\
            .orderBy(partitioner)\
            .rowsBetween(-sys.maxsize, 0)
        df = df\
            .withColumn(i, stringReplaceFunc(F.col(i), value))
        fill = F.last(df[i], ignorenulls=True).over(window)
        df = df.withColumn(i, fill)
    return df

df = forwardFillImputer(df, cols=[i for i in df.columns])
```
The code is not functioning. Please let me know what mistake I am making, or whether there is an alternative solution. Thanks.
CodePudding user response:
In your current code, you should not partition your window by month, and using `rowsBetween` is unnecessary. You only need a window ordered by `start_timestamp`.
Moreover, you're not handling the case where there is no previous value; you can handle it using `coalesce` with the literal value `'0'`.
Thus your code can be rewritten as follows:
```python
from pyspark.sql import functions as F
from pyspark.sql import Window

def forwardFillImputer(df, cols=[], partitioner='start_timestamp', value='null'):
    for c in cols:
        # turn the sentinel string 'null' into a real null
        df = df.withColumn(c, F.when(F.col(c) != value, F.col(c)))
        # keep the current value, else the last non-null value over the ordered window, else '0'
        df = df.withColumn(c, F.coalesce(
            F.col(c), F.last(c, True).over(Window.orderBy(partitioner)), F.lit('0')))
    return df

df = forwardFillImputer(df, df.columns)
```
With the following dataframe as `df`:
```
+-------------------+-------+-------+-------+-------+
|start_timestamp    |Column1|Column2|Column3|Column4|
+-------------------+-------+-------+-------+-------+
|2020-11-02 08:51:50|2      |null   |null   |null   |
|2020-11-02 09:14:29|null   |null   |null   |40     |
|2020-11-02 09:18:32|null   |4      |2      |null   |
|2020-11-02 09:32:42|4      |null   |null   |null   |
|2020-11-03 13:06:03|null   |null   |null   |20     |
|2020-11-03 13:10:01|6      |null   |4      |null   |
|2020-11-03 13:54:38|null   |5      |null   |null   |
|2020-11-03 14:46:25|null   |null   |null   |null   |
|2020-11-03 14:57:31|7      |null   |null   |10     |
|2020-11-03 15:07:07|8      |7      |null   |null   |
+-------------------+-------+-------+-------+-------+
```
You get the following output:
```
+-------------------+-------+-------+-------+-------+
|start_timestamp    |Column1|Column2|Column3|Column4|
+-------------------+-------+-------+-------+-------+
|2020-11-02 08:51:50|2      |0      |0      |0      |
|2020-11-02 09:14:29|2      |0      |0      |40     |
|2020-11-02 09:18:32|2      |4      |2      |40     |
|2020-11-02 09:32:42|4      |4      |2      |40     |
|2020-11-03 13:06:03|4      |4      |2      |20     |
|2020-11-03 13:10:01|6      |4      |4      |20     |
|2020-11-03 13:54:38|6      |5      |4      |20     |
|2020-11-03 14:46:25|6      |5      |4      |20     |
|2020-11-03 14:57:31|7      |5      |4      |10     |
|2020-11-03 15:07:07|8      |7      |4      |10     |
+-------------------+-------+-------+-------+-------+
```
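As a side note, if the value columns already hold real nulls rather than the literal string 'null', the replacement step isn't needed and the fill reduces to `last` plus `coalesce`. A possible sketch under that assumption (the function name `forwardFillNulls` is just illustrative):

```python
from pyspark.sql import functions as F
from pyspark.sql import Window

def forwardFillNulls(df, cols, order_col='start_timestamp'):
    # ordered window; the default frame (unbounded preceding to current row)
    # is exactly what last() with ignorenulls needs for a forward fill
    w = Window.orderBy(order_col)
    for c in cols:
        # last non-null value up to the current row, '0' if there is none yet
        df = df.withColumn(c, F.coalesce(F.last(c, True).over(w), F.lit('0')))
    return df

df = forwardFillNulls(df, [c for c in df.columns if c != 'start_timestamp'])
```

Keep in mind that ordering a window without `partitionBy` moves all rows to a single partition, which is fine for a small dataframe like this one but will not scale to large data.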