Fill missing dates in two groups and convert data to weekly in Spark DataFrame


I have this data frame with a lot of missing dates in between:

import pandas as pd

df = pd.DataFrame({'date':['2021-12-1','2021-12-2','2021-12-21','2021-12-1','2021-12-7','2021-12-1','2021-12-5','2021-12-1','2021-12-5'],
                   'id1':['a1','a1','a1','a1','a1','a2','a2','a2','a2'],
                   'id2':['b1','b1','b1','b2','b2','b3','b3','b4','b4'],
                   'value1':[1,5,7,2,9,3,0,1,7],
                   'value2':[6,2,8,1,9,3,0,2,6]})

It looks like this:

         date id1 id2  value1  value2
0   2021-12-1  a1  b1       1       6
1   2021-12-2  a1  b1       5       2
2  2021-12-21  a1  b1       7       8
3   2021-12-1  a1  b2       2       1
4   2021-12-7  a1  b2       9       9
5   2021-12-1  a2  b3       3       3
6   2021-12-5  a2  b3       0       0
7   2021-12-1  a2  b4       1       2
8   2021-12-5  a2  b4       7       6

I want my output to look like this, where the frequency is changed from daily to weekly and each week starts on Monday (the values within a week are summed and labelled with the Monday at the right edge, e.g. 2021-12-01 and 2021-12-02 both roll up into 2021-12-06):

  id1 id2       date  value1  value2
0  a1  b1 2021-12-06       6       8
1  a1  b1 2021-12-13       0       0
2  a1  b1 2021-12-20       0       0
3  a1  b1 2021-12-27       7       8
4  a1  b2 2021-12-06       2       1
5  a1  b2 2021-12-13       9       9
6  a2  b3 2021-12-06       3       3
7  a2  b4 2021-12-06       8       8

I have done this in pandas. First I fill the missing dates with zero values, and then in a second step I convert the daily data to weekly data using resample. Here I use W-Mon, which anchors each week on Monday.

#Filling missing dates values with zero
df['date'] = pd.to_datetime(df['date'])
df = (df.set_index('date')
      .groupby(['id1','id2'])[['value1','value2']]
      .apply(lambda x: x.asfreq('d', fill_value=0))
      .reset_index()
      [['date','id1','id2','value1','value2']])
#convert to weekly data and set monday as starting day for each week
df = (df.groupby(['id1','id2'])
       .resample('W-Mon', label='right', closed = 'left', on='date')
       .agg({'value1':'sum',"value2":'sum'} )
       .reset_index())

I am trying to convert my code to Spark. I have gone through this; is there a simpler way?

CodePudding user response:

Try this.

Create a tmp dataframe holding a sequence of dates, starting from the next Monday after the minimum of the date column and stepping in intervals of 7 days. Then join it with the main dataframe and mask the values based on the difference between the week numbers:

from pyspark.sql import functions as F

df = df.withColumn("date",F.to_date("date"))
# One row per (id1, id2) and per Monday, from each group's first week to its last
tmp = (df.groupBy("id1","id2").agg(F.min("date").alias("Mindate")
                                   ,F.max("date").alias("Maxdate"))
         .withColumn("MinMonday",F.next_day("Mindate","Mon"))
         .withColumn("MaxMonday",F.next_day("Maxdate","Mon"))
         .withColumn("Seq",
          F.explode(F.expr("sequence(MinMonday,MaxMonday,interval 7 day)")))
         .drop("Mindate","Maxdate","MinMonday","MaxMonday"))


# Build a SQL snippet that keeps a row's value only for the week it belongs to
def maskedvalue(col):
    return f"CASE WHEN weekdiff <= 1 THEN {col} ELSE 0 END"

out = (df.alias("left").join(tmp.alias("right"),
             on=[df['id1']==tmp['id1'], df['id2']==tmp['id2'], df['date']<=tmp['Seq']])
       .select("date","left.id1","left.id2","Seq","value1","value2")
       .withColumn("weekdiff", F.weekofyear("Seq") - F.weekofyear("date"))
       .withColumn("value1", F.expr(maskedvalue("value1")))
       .withColumn("value2", F.expr(maskedvalue("value2")))
       .groupBy("id1","id2","Seq").agg(F.sum("value1").alias("value1")
                                       ,F.sum("value2").alias("value2"))
       .withColumnRenamed("Seq","Date"))

out.orderBy("id1","id2","Date").show()


+---+---+----------+------+------+
|id1|id2|      Date|value1|value2|
+---+---+----------+------+------+
| a1| b1|2021-12-06|     6|     8|
| a1| b1|2021-12-13|     0|     0|
| a1| b1|2021-12-20|     0|     0|
| a1| b1|2021-12-27|     7|     8|
| a1| b2|2021-12-06|     2|     1|
| a1| b2|2021-12-13|     9|     9|
| a2| b3|2021-12-06|     3|     3|
| a2| b4|2021-12-06|     8|     8|
+---+---+----------+------+------+

Note that the tmp dataframe looks like this:

+---+---+----------+
|id1|id2|       Seq|
+---+---+----------+
| a1| b1|2021-12-06|
| a1| b1|2021-12-13|
| a1| b1|2021-12-20|
| a1| b1|2021-12-27|
| a1| b2|2021-12-06|
| a1| b2|2021-12-13|
| a2| b3|2021-12-06|
| a2| b4|2021-12-06|
+---+---+----------+
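
One caveat with the approach above: weekofyear follows ISO weeks and resets to 1 at the turn of the year, so weekdiff can come out negative for data spanning New Year. A year-safe alternative (a sketch, not part of the original answer) derives the same gap from datediff instead:

# Whole weeks between the row's date and the Monday `Seq`. This equals the
# weekofyear difference for dates lying between two Mondays, but it keeps
# working when the data crosses the week 52/53 -> 1 rollover.
weekdiff = F.ceil(F.datediff("Seq", "date") / 7)

Use .withColumn("weekdiff", weekdiff) in place of the weekofyear line above.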

CodePudding user response:

This will do it. The code is pretty straightforward, but if in doubt check the functions in the Spark docs.

from pyspark.sql import functions as F

# Parse the dates and roll each one forward to the Monday that closes its week
df = df.withColumn('Date', F.to_date('Date'))
df = df.withColumn('Date', F.next_day('Date', 'Mon'))

# Aggregate to weekly sums. Note F.sum(c), not F.sum('value1'), so that
# value2 is summed from its own column.
df = df.groupby(['id1', 'id2', 'Date']).agg(*[F.sum(c).alias(c) for c in ['value1', 'value2']])

# For each group, generate every Monday between the first and last week,
# then drop the Mondays that already have data
new_dts = df.groupby(['id1', 'id2']).agg(
    F.array_except(
        F.expr('sequence(min(Date), max(Date), interval 1 week)'),
        F.collect_set('Date'),
    ).alias('Date')
)

new_dts = new_dts.withColumn('Date', F.explode('Date'))

df = df.unionByName(new_dts, allowMissingColumns=True).na.fill(0)

+---+---+----------+------+------+
|id1|id2|      Date|value1|value2|
+---+---+----------+------+------+
| a1| b2|2021-12-06|     2|     1|
| a1| b1|2021-12-27|     7|     8|
| a1| b1|2021-12-06|     6|     8|
| a2| b4|2021-12-06|     8|     8|
| a2| b3|2021-12-06|     3|     3|
| a1| b2|2021-12-13|     9|     9|
| a1| b1|2021-12-13|     0|     0|
| a1| b1|2021-12-20|     0|     0|
+---+---+----------+------+------+
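
Note that the allowMissingColumns flag of unionByName requires Spark 3.1+. On older versions, one workaround (a sketch, reusing the new_dts frame built above) is to add the value columns as zeros before the union:

# Pre-3.1 fallback: materialize the value columns explicitly, then union
new_dts_filled = (new_dts
                  .withColumn('value1', F.lit(0))
                  .withColumn('value2', F.lit(0)))
df = df.unionByName(new_dts_filled)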

You may want to consider that you're currently aligning dates to the Monday of the following week. To align your dates to the Monday of the same week instead, use

F.date_sub(F.next_day('Date','Mon'), 7)
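
Applied to the first line of this answer, that looks like the following; note that a date which already is a Monday then maps to itself, because next_day is strictly exclusive:

# Label each row with the Monday that starts its week,
# rather than the Monday that ends it
df = df.withColumn('Date', F.date_sub(F.next_day('Date', 'Mon'), 7))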