I have this data frame with a lot of missing dates in between:
df = pd.DataFrame({'date': ['2021-12-1','2021-12-2','2021-12-21','2021-12-1','2021-12-7','2021-12-1','2021-12-5','2021-12-1','2021-12-5'],
                   'id1': ['a1','a1','a1','a1','a1','a2','a2','a2','a2'],
                   'id2': ['b1','b1','b1','b2','b2','b3','b3','b4','b4'],
                   'value1': [1,5,7,2,9,3,0,1,7],
                   'value2': [6,2,8,1,9,3,0,2,6]})
Which looks like this
         date id1 id2  value1  value2
0   2021-12-1  a1  b1       1       6
1   2021-12-2  a1  b1       5       2
2  2021-12-21  a1  b1       7       8
3   2021-12-1  a1  b2       2       1
4   2021-12-7  a1  b2       9       9
5   2021-12-1  a2  b3       3       3
6   2021-12-5  a2  b3       0       0
7   2021-12-1  a2  b4       1       2
8   2021-12-5  a2  b4       7       6
I want my output to look like this, where the frequency is changed from daily to weekly and the week starts from Monday:
  id1 id2       date  value1  value2
0  a1  b1 2021-12-06       6       8
1  a1  b1 2021-12-13       0       0
2  a1  b1 2021-12-20       0       0
3  a1  b1 2021-12-27       7       8
4  a1  b2 2021-12-06       2       1
5  a1  b2 2021-12-13       9       9
6  a2  b3 2021-12-06       3       3
7  a2  b4 2021-12-06       8       8
I have done my coding in pandas. First I fill the missing dates with zero values, and then in a second step I convert the daily data to weekly data using resample. Here I use W-MON, which means my weeks start from Monday.
# Fill missing dates in each (id1, id2) group with zero values
df['date'] = pd.to_datetime(df['date'])
df = (df.set_index('date')
        .groupby(['id1', 'id2'])[['value1', 'value2']]
        .apply(lambda x: x.asfreq('d', fill_value=0))
        .reset_index()
        [['date', 'id1', 'id2', 'value1', 'value2']])
# Convert to weekly data, with Monday as the anchor day for each week
df = (df.groupby(['id1', 'id2'])
        .resample('W-MON', label='right', closed='left', on='date')
        .agg({'value1': 'sum', 'value2': 'sum'})
        .reset_index())
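As an aside, both steps can be collapsed into a single pass over the original daily frame with pd.Grouper. A minimal sketch of that alternative (weekly is just an illustrative name; caveat: without the asfreq fill, weeks in which a group has no rows at all are simply absent rather than zero-filled):
# One-pass sketch using pd.Grouper with the same W-MON / label / closed
# semantics as resample above; note empty weeks are dropped, not zero-filled
weekly = (df.groupby(['id1', 'id2',
                      pd.Grouper(key='date', freq='W-MON',
                                 label='right', closed='left')])
            [['value1', 'value2']].sum()
            .reset_index())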
I am trying to convert my code to Spark. I have gone through this; is there a simpler way?
CodePudding user response:
Try this. Create a tmp dataframe holding a sequence of dates, starting from the Monday after the minimum of the date column, in intervals of 7 days. Then join it with the main dataframe and mask the values based on the difference between the week numbers:
from pyspark.sql import functions as F

df = df.withColumn("date", F.to_date("date"))

# For each (id1, id2) group, build every Monday from the Monday after the
# earliest date through the Monday after the latest date, 7 days apart
tmp = (df.groupBy("id1", "id2")
         .agg(F.min("date").alias("Mindate"), F.max("date").alias("Maxdate"))
         .withColumn("MinMonday", F.next_day("Mindate", "Mon"))
         .withColumn("MaxMonday", F.next_day("Maxdate", "Mon"))
         .withColumn("Seq", F.explode(F.expr("sequence(MinMonday, MaxMonday, interval 7 day)")))
         .drop("Mindate", "Maxdate", "MinMonday", "MaxMonday"))
# SQL snippet that zeroes out a value unless the row's date falls in the
# week ending at Seq (week-number difference of at most 1)
def maskedvalue(col):
    return f"CASE WHEN weekdiff <= 1 THEN {col} ELSE 0 END"
# Join each row to every following Monday, zero out values that do not
# belong to that week, then aggregate per (id1, id2, Seq)
out = (df.alias("left")
         .join(tmp.alias("right"),
               on=[df['id1'] == tmp['id1'], df['id2'] == tmp['id2'], df['date'] <= tmp['Seq']])
         .select("date", "left.id1", "left.id2", "Seq", "value1", "value2")
         .withColumn("weekdiff", F.weekofyear("Seq") - F.weekofyear("date"))
         .withColumn("value1", F.expr(maskedvalue("value1")))
         .withColumn("value2", F.expr(maskedvalue("value2")))
         .groupBy("id1", "id2", "Seq")
         .agg(F.sum("value1").alias("value1"), F.sum("value2").alias("value2"))
         .withColumnRenamed("Seq", "Date"))
out.orderBy("id1","id2","Date").show()
+---+---+----------+------+------+
|id1|id2|      Date|value1|value2|
+---+---+----------+------+------+
| a1| b1|2021-12-06|     6|     8|
| a1| b1|2021-12-13|     0|     0|
| a1| b1|2021-12-20|     0|     0|
| a1| b1|2021-12-27|     7|     8|
| a1| b2|2021-12-06|     2|     1|
| a1| b2|2021-12-13|     9|     9|
| a2| b3|2021-12-06|     3|     3|
| a2| b4|2021-12-06|     8|     8|
+---+---+----------+------+------+
Note that the tmp dataframe looks like this:
+---+---+----------+
|id1|id2|       Seq|
+---+---+----------+
| a1| b1|2021-12-06|
| a1| b1|2021-12-13|
| a1| b1|2021-12-20|
| a1| b1|2021-12-27|
| a1| b2|2021-12-06|
| a1| b2|2021-12-13|
| a2| b3|2021-12-06|
| a2| b4|2021-12-06|
+---+---+----------+
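One caveat with the weekdiff mask: weekofyear resets at the year boundary, and a date that is itself a Monday would pass the mask for two consecutive Seq values (weekdiff 0 against its own Monday and 1 against the next). A sketch of a safer variant, a hypothetical replacement for maskedvalue above that masks on the day difference instead:
# Hypothetical variant: keep a row's values only when its date falls in the
# 7 days ending at Seq; unaffected by year boundaries and Monday dates
def maskedvalue(col):
    return f"CASE WHEN datediff(Seq, date) BETWEEN 1 AND 7 THEN {col} ELSE 0 END"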
CodePudding user response:
This will do it. The code is pretty straightforward, but if in doubt check the functions in the Spark docs:
# Align each date to the following Monday, then aggregate per week
df = df.withColumn('Date', F.next_day('Date', 'Mon'))
df = df.groupby('id1', 'id2', 'Date').agg(*[F.sum(c).alias(c) for c in ['value1', 'value2']])

# For each (id1, id2) group, find the weekly dates that have no row yet
new_dts = df.groupby('id1', 'id2').agg(
    F.array_except(
        F.expr('sequence(min(Date), max(Date), interval 1 week)'),
        F.collect_set('Date'),
    ).alias('Date')
)
new_dts = new_dts.withColumn('Date', F.explode('Date'))

# Append the missing weeks with zero values
df = df.unionByName(new_dts, allowMissingColumns=True).na.fill(0)
+---+---+----------+------+------+
|id1|id2|      Date|value1|value2|
+---+---+----------+------+------+
| a1| b2|2021-12-06|     2|     1|
| a1| b1|2021-12-27|     7|     8|
| a1| b1|2021-12-06|     6|     8|
| a2| b4|2021-12-06|     8|     8|
| a2| b3|2021-12-06|     3|     3|
| a1| b2|2021-12-13|     9|     9|
| a1| b1|2021-12-13|     0|     0|
| a1| b1|2021-12-20|     0|     0|
+---+---+----------+------+------+
You may want to consider that you're currently aligning dates to the Monday of the following week. To align your dates to the Monday of the same week instead, do
F.date_sub(F.next_day('Date','Mon'), 7)
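For example, swapped into the first line of the snippet above (a sketch; 2021-12-01 would then map to 2021-11-29 instead of 2021-12-06):
# Align each date to the Monday of its own week
df = df.withColumn('Date', F.date_sub(F.next_day('Date', 'Mon'), 7))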