I have a Spark DataFrame where I need to create a window-partitioned column ("desired_outcome"). For each row, the column should backfill from the next non-null value; if no later non-null value exists, it should carry the last non-null value forward.
Here are some sample cases and the desired outcome:
columns = ['user_id', 'date', 'date2', 'desired_outcome']
data = [
    ('1', '2022-01-01', None, '2022-01-05'),
    ('1', '2022-01-02', None, '2022-01-05'),
    ('1', '2022-01-03', None, '2022-01-05'),
    ('1', '2022-01-04', None, '2022-01-05'),
    ('1', '2022-01-05', '2022-01-05', '2022-01-05'),
    ('1', '2022-01-06', None, '2022-01-05'),
    ('1', '2022-01-07', None, '2022-01-05'),
    ('2', '2022-01-01', None, '2022-01-05'),
    ('2', '2022-01-02', None, '2022-01-05'),
    ('2', '2022-01-03', None, '2022-01-05'),
    ('2', '2022-01-04', None, '2022-01-05'),
    ('2', '2022-01-05', '2022-01-05', '2022-01-05'),
    ('2', '2022-01-06', None, '2022-01-09'),
    ('2', '2022-01-07', None, '2022-01-09'),
    ('2', '2022-01-08', None, '2022-01-09'),
    ('2', '2022-01-09', '2022-01-09', '2022-01-09'),
    ('2', '2022-01-10', None, '2022-01-09'),
    ('2', '2022-01-11', None, '2022-01-09'),
    ('2', '2022-01-12', None, '2022-01-09'),
    ('3', '2022-01-01', '2022-01-01', '2022-01-01'),
    ('3', '2022-01-02', None, '2022-01-05'),
    ('3', '2022-01-03', None, '2022-01-05'),
    ('3', '2022-01-04', None, '2022-01-05'),
    ('3', '2022-01-05', '2022-01-05', '2022-01-05'),
    ('3', '2022-01-06', None, '2022-01-05'),
    ('3', '2022-01-07', None, '2022-01-05'),
    ('3', '2022-01-08', None, '2022-01-05'),
    ('3', '2022-01-09', None, '2022-01-05'),
    ('3', '2022-01-10', None, '2022-01-05'),
    ('3', '2022-01-11', None, '2022-01-05'),
    ('3', '2022-01-12', None, '2022-01-05')]
sample_df = spark.createDataFrame(data, columns)
I've tried the following solution but can't quite get the results to match the "desired_outcome" column.
from pyspark.sql import Window
from pyspark.sql.functions import last

# Frame covering the start of the partition up to the current row
window = (
    Window
    .partitionBy('user_id')
    .orderBy('date')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
sample_df = sample_df.withColumn('backfill', last('date2', ignorenulls=True).over(window))
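With this, rows before each user's first non-null date2 stay null, and rows between two non-null values get the earlier one instead of the next one (e.g. user 2 on 2022-01-06 gets 2022-01-05 instead of 2022-01-09).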
CodePudding user response:
Your attempt only looks backwards, so it can never pick up a later non-null value. You could do it over 2 windows, one looking forward and returning the first non-null value, the other looking backwards and returning the last non-null value; coalescing them with the forward-looking value first gives the backfill precedence, which is what desired_outcome expects.
from pyspark.sql import functions as F, Window as W

# Forward-looking frame: current row to the end of the partition
w_following = W.partitionBy('user_id').orderBy('date').rowsBetween(0, W.unboundedFollowing)
# Backward-looking frame: start of the partition to the current row
w_preceding = W.partitionBy('user_id').orderBy('date').rowsBetween(W.unboundedPreceding, 0)

sample_df = sample_df.withColumn(
    'date3',
    F.coalesce(
        F.first('date2', True).over(w_following),  # backfill: next non-null value
        F.last('date2', True).over(w_preceding)    # forward fill: last non-null value
    )
)
Result:
sample_df.show(99)
# +-------+----------+----------+---------------+----------+
# |user_id|      date|     date2|desired_outcome|     date3|
# +-------+----------+----------+---------------+----------+
# |      1|2022-01-01|      null|     2022-01-05|2022-01-05|
# |      1|2022-01-02|      null|     2022-01-05|2022-01-05|
# |      1|2022-01-03|      null|     2022-01-05|2022-01-05|
# |      1|2022-01-04|      null|     2022-01-05|2022-01-05|
# |      1|2022-01-05|2022-01-05|     2022-01-05|2022-01-05|
# |      1|2022-01-06|      null|     2022-01-05|2022-01-05|
# |      1|2022-01-07|      null|     2022-01-05|2022-01-05|
# |      2|2022-01-01|      null|     2022-01-05|2022-01-05|
# |      2|2022-01-02|      null|     2022-01-05|2022-01-05|
# |      2|2022-01-03|      null|     2022-01-05|2022-01-05|
# |      2|2022-01-04|      null|     2022-01-05|2022-01-05|
# |      2|2022-01-05|2022-01-05|     2022-01-05|2022-01-05|
# |      2|2022-01-06|      null|     2022-01-09|2022-01-09|
# |      2|2022-01-07|      null|     2022-01-09|2022-01-09|
# |      2|2022-01-08|      null|     2022-01-09|2022-01-09|
# |      2|2022-01-09|2022-01-09|     2022-01-09|2022-01-09|
# |      2|2022-01-10|      null|     2022-01-09|2022-01-09|
# |      2|2022-01-11|      null|     2022-01-09|2022-01-09|
# |      2|2022-01-12|      null|     2022-01-09|2022-01-09|
# |      3|2022-01-01|2022-01-01|     2022-01-01|2022-01-01|
# |      3|2022-01-02|      null|     2022-01-05|2022-01-05|
# |      3|2022-01-03|      null|     2022-01-05|2022-01-05|
# |      3|2022-01-04|      null|     2022-01-05|2022-01-05|
# |      3|2022-01-05|2022-01-05|     2022-01-05|2022-01-05|
# |      3|2022-01-06|      null|     2022-01-05|2022-01-05|
# |      3|2022-01-07|      null|     2022-01-05|2022-01-05|
# |      3|2022-01-08|      null|     2022-01-05|2022-01-05|
# |      3|2022-01-09|      null|     2022-01-05|2022-01-05|
# |      3|2022-01-10|      null|     2022-01-05|2022-01-05|
# |      3|2022-01-11|      null|     2022-01-05|2022-01-05|
# |      3|2022-01-12|      null|     2022-01-05|2022-01-05|
# +-------+----------+----------+---------------+----------+
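The computed date3 matches desired_outcome on every row. As a quick sanity check, here is a minimal sketch, assuming the sample_df built above:

from pyspark.sql import functions as F  # already imported above
# eqNullSafe treats two nulls as equal, so rows where both columns are
# null are not flagged as mismatches.
mismatches = sample_df.filter(~F.col('date3').eqNullSafe(F.col('desired_outcome'))).count()
print(mismatches)  # 0 -> the two-window fill reproduces desired_outcome exactly

Note that ordering by date as a string only works here because ISO-8601 dates sort lexicographically; for other date formats you'd want to cast to a date type first, e.g. F.col('date').cast('date').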