I want the answer in PySpark :
Here i have a DataFrame with column id, date and value
i want to fill the missing date with value 0 and i want every id should have same number of date's.
ex : [ 2022/02/09 to 2022/02/15 ] for all id. [ The min date is 2022/02/09 and max date is 2022/02/15 ]
before :
id | date | value |
---|---|---|
201 | 2022/02/11 | 10 |
201 | 2022/02/13 | 2 |
202 | 2022/02/09 | 50 |
202 | 2022/02/11 | 1 |
202 | 2022/02/12 | 3 |
401 | 2022/02/11 | 12 |
401 | 2022/02/12 | 9 |
401 | 2022/02/15 | 15 |
after :
id | date | value |
---|---|---|
201 | 2022/02/09 | 0 |
201 | 2022/02/10 | 0 |
201 | 2022/02/11 | 10 |
201 | 2022/02/12 | 0 |
201 | 2022/02/13 | 2 |
201 | 2022/02/14 | 0 |
201 | 2022/02/15 | 0 |
202 | 2022/02/09 | 50 |
202 | 2022/02/10 | 0 |
202 | 2022/02/11 | 1 |
202 | 2022/02/12 | 3 |
202 | 2022/02/13 | 0 |
202 | 2022/02/14 | 0 |
202 | 2022/02/15 | 0 |
401 | 2022/02/09 | 0 |
401 | 2022/02/10 | 0 |
401 | 2022/02/11 | 12 |
401 | 2022/02/12 | 9 |
401 | 2022/02/13 | 0 |
401 | 2022/02/14 | 0 |
401 | 2022/02/15 | 15 |
CodePudding user response:
Here's an approach with sequence()
. You first find the min and max dates and use them to create distinct dates. This dates dataframe can then be cross joined with distinct ID values so that all ID values have all dates. The values from the value
field can then be joined to the said cross joined dataframe to fetch all values. The remaining null values can be replaced with 0
.
# convert date column to compatible format in the input dataframe
data_sdf = spark.sparkContext.parallelize(data_ls).toDF(['id', 'dt', 'val']). \
withColumn('dt', func.to_date('dt', 'yyyy/MM/dd'))
# --- ---------- ---
# | id| dt|val|
# --- ---------- ---
# |201|2022-02-11| 10|
# |201|2022-02-13| 2|
# |202|2022-02-09| 50|
# |202|2022-02-11| 1|
# |202|2022-02-12| 3|
# |401|2022-02-11| 12|
# |401|2022-02-12| 9|
# |401|2022-02-15| 15|
# --- ---------- ---
all_dt_sdf = data_sdf. \
select(func.min('dt').alias('min_dt'), func.max('dt').alias('max_dt')). \
withColumn('all_dts', func.expr('sequence(min_dt, max_dt, interval 1 day)')). \
select(func.explode('all_dts').alias('dt'))
# ----------
# | dt|
# ----------
# |2022-02-09|
# |2022-02-10|
# |2022-02-11|
# |2022-02-12|
# |2022-02-13|
# |2022-02-14|
# |2022-02-15|
# ----------
data_sdf. \
select('id'). \
dropDuplicates(). \
crossJoin(all_dt_sdf). \
join(data_sdf, ['id', 'dt'], 'left'). \
fillna(0, subset=['val']). \
show()
# --- ---------- ---
# | id| dt|val|
# --- ---------- ---
# |201|2022-02-09| 0|
# |201|2022-02-10| 0|
# |201|2022-02-11| 10|
# |201|2022-02-12| 0|
# |201|2022-02-13| 2|
# |201|2022-02-14| 0|
# |201|2022-02-15| 0|
# |202|2022-02-09| 50|
# |202|2022-02-10| 0|
# |202|2022-02-11| 1|
# |202|2022-02-12| 3|
# |202|2022-02-13| 0|
# |202|2022-02-14| 0|
# |202|2022-02-15| 0|
# |401|2022-02-09| 0|
# |401|2022-02-10| 0|
# |401|2022-02-11| 12|
# |401|2022-02-12| 9|
# |401|2022-02-13| 0|
# |401|2022-02-14| 0|
# --- ---------- ---
# only showing top 20 rows
A short approach employing min()
max()
window functions
data_sdf. \
withColumn('data_min_dt', func.min('dt').over(wd.partitionBy(func.lit(1)))). \
withColumn('data_max_dt', func.max('dt').over(wd.partitionBy(func.lit(1)))). \
select('id', 'data_min_dt', 'data_max_dt'). \
dropDuplicates(). \
withColumn('all_dts', func.expr('sequence(data_min_dt, data_max_dt, interval 1 day)')). \
select('id', func.explode('all_dts').alias('dt')). \
join(data_sdf, ['id', 'dt'], 'left'). \
fillna(0, subset=['val']). \
orderBy(['id', 'dt']). \
show()
# --- ---------- ---
# | id| dt|val|
# --- ---------- ---
# |201|2022-02-09| 0|
# |201|2022-02-10| 0|
# |201|2022-02-11| 10|
# |201|2022-02-12| 0|
# |201|2022-02-13| 2|
# |201|2022-02-14| 0|
# |201|2022-02-15| 0|
# |202|2022-02-09| 50|
# |202|2022-02-10| 0|
# |202|2022-02-11| 1|
# |202|2022-02-12| 3|
# |202|2022-02-13| 0|
# |202|2022-02-14| 0|
# |202|2022-02-15| 0|
# |401|2022-02-09| 0|
# |401|2022-02-10| 0|
# |401|2022-02-11| 12|
# |401|2022-02-12| 9|
# |401|2022-02-13| 0|
# |401|2022-02-14| 0|
# --- ---------- ---
# only showing top 20 rows