Forward filling pyspark dataframe based on previous values

Time:09-30

I have two Spark dataframes that I combine with a full outer join.

import pandas as pd

# `spark` is an existing SparkSession (for example, the one the pyspark shell provides)
df1 = spark.createDataFrame(pd.DataFrame([[1,5,[1,2]],[1,15,[1,3]],[2,4,[3,4]]], 
                             columns=["id","day","state"]))
df2 = spark.createDataFrame(pd.DataFrame([[1,10,[5,6]],[1,12,[7]],[2,4,[3]],
                            [2,6,[10,12]],[2,10,[8,9]]], columns=["id","day","action"]))
df1.join(df2, on=["id","day"], how="fullouter").orderBy("id","day").show()

The resulting output is shown below

+---+---+------+--------+
| id|day| state|  action|
+---+---+------+--------+
|  1|  5|[1, 2]|    null|
|  1| 10|  null|  [5, 6]|
|  1| 12|  null|     [7]|
|  1| 15|[1, 3]|    null|
|  2|  4|[3, 4]|     [3]|
|  2|  6|  null|[10, 12]|
|  2| 10|  null|  [8, 9]|
+---+---+------+--------+

I need the output to look like the table below: the last non-null state within the same id should be copied down, and null actions should be replaced by [0]. Both dataframes are very large.

+---+---+------+--------+
| id|day| state|  action|
+---+---+------+--------+
|  1|  5|[1, 2]|     [0]|
|  1| 10|[1, 2]|  [5, 6]|
|  1| 12|[1, 2]|     [7]|
|  1| 15|[1, 3]|     [0]|
|  2|  4|[3, 4]|     [3]|
|  2|  6|[3, 4]|[10, 12]|
|  2| 10|[3, 4]|  [8, 9]|
+---+---+------+--------+

CodePudding user response:

You can use PySpark's last function to retrieve the last value within a window. Pass ignorenulls=True so that it returns the last non-null value rather than simply the last value, which may be null.
fillna does not support array columns, so to replace the null actions, use when/otherwise with an array literal instead.

import pyspark.sql.functions as F
from pyspark.sql.window import Window

# window to determine last value
w = Window.partitionBy('id').orderBy('day').rangeBetween(Window.unboundedPreceding, 0)

# `df` is your outer-joined dataframe
(df
 .withColumn('state', F.last('state', ignorenulls=True).over(w))
 .withColumn('action', F.when(F.col('action').isNull(), F.array(F.lit(0))).otherwise(F.col('action')))
).show()

+---+---+------+--------+
| id|day| state|  action|
+---+---+------+--------+
|  1|  5|[1, 2]|     [0]|
|  1| 10|[1, 2]|  [5, 6]|
|  1| 12|[1, 2]|     [7]|
|  1| 15|[1, 3]|     [0]|
|  2|  4|[3, 4]|     [3]|
|  2|  6|[3, 4]|[10, 12]|
|  2| 10|[3, 4]|  [8, 9]|
+---+---+------+--------+
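
If the window logic is hard to picture, here is a plain-Python sketch (no Spark involved) of what the two withColumn calls compute on the sample data: within each id, ordered by day, carry the last non-null state forward, and swap a null action for [0]. The forward_fill helper and the joined list are hypothetical names made up for this illustration, not part of any PySpark API, and the sketch is only meant for reasoning about small inputs, not for large dataframes.

```python
from itertools import groupby
from operator import itemgetter


def forward_fill(rows):
    """Mimic last(state, ignorenulls=True) over a per-id window ordered by day,
    and replace null actions with [0]. `rows` is a list of dicts."""
    filled = []
    # Sort so that rows of the same id are adjacent and ordered by day.
    ordered = sorted(rows, key=itemgetter("id", "day"))
    for _, group in groupby(ordered, key=itemgetter("id")):
        last_state = None  # reset for every id (the window partition)
        for row in group:
            if row["state"] is not None:
                last_state = row["state"]
            action = row["action"] if row["action"] is not None else [0]
            filled.append({"id": row["id"], "day": row["day"],
                           "state": last_state, "action": action})
    return filled


# The outer-joined sample data from the question, with None standing in for null.
joined = [
    {"id": 1, "day": 5, "state": [1, 2], "action": None},
    {"id": 1, "day": 10, "state": None, "action": [5, 6]},
    {"id": 1, "day": 12, "state": None, "action": [7]},
    {"id": 1, "day": 15, "state": [1, 3], "action": None},
    {"id": 2, "day": 4, "state": [3, 4], "action": [3]},
    {"id": 2, "day": 6, "state": None, "action": [10, 12]},
    {"id": 2, "day": 10, "state": None, "action": [8, 9]},
]

result = forward_fill(joined)
for row in result:
    print(row)
```

The printed rows should line up with the desired table in the question. Note that a row whose id has no earlier non-null state would keep a null state, which is also what the window version does.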