How to get aggregate by hour including missing hours and add cumulative sum?


Suppose I have a Spark Dataframe below:

GroupId  Event_time                  Event_name  Event_value
xx       2011-08-15 14:47:02.617023  eventA      1
xx       2011-08-15 14:48:02.507053  eventA      2
xx       2011-08-15 16:47:02.512016  eventA      100
yy       2011-08-15 11:47:02.337019  eventA      2
yy       2011-08-15 12:47:02.617041  eventA      1
yy       2011-08-15 13:47:02.927040  eventA      3

I would like to get the rolling count of the eventA value per hour within a day, based on the GroupId.

For example, for GroupId xx at datetime 2011-08-15 14:00, I am trying to calculate the count of eventA (Event_value) for that GroupId from 14:00 till 15:00. In this case, the count should be 1 + 2 = 3.

The expected output would be something like this (it should display hours 00 through 23 within a day; I have omitted some of the hours below to save space).

If there is no eventA in an hour range, the count is treated as NA for that hour (and as 0 for later calculations).

For event_date 2011-08-15, there is no event until hour 14, and no more events after hour 16.

GroupId  Date        Hour  Count  agg_count
xx       2011-08-15  00    NA     0
xx       2011-08-15  01    NA     0
xx       2011-08-15  02    NA     0
xx       2011-08-15  13    NA     0
xx       2011-08-15  14    3      3
xx       2011-08-15  15    NA     3
xx       2011-08-15  16    100    103
xx       2011-08-15  17    NA     103
xx       2011-08-15  23    NA     103
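The fill-and-accumulate logic behind the expected output can be sketched in plain Python, independent of Spark. `fill_and_accumulate` is a hypothetical helper, assuming the per-hour sums for one GroupId/day are already known:

```python
def fill_and_accumulate(hourly_sums, hours=24):
    """Given a dict of hour -> Event_value sum for one GroupId/day,
    produce (Hour, Count, agg_count) rows for every hour 0..23.
    Missing hours get Count=None and carry the running total forward."""
    rows, running = [], 0
    for h in range(hours):
        count = hourly_sums.get(h)   # None when the hour had no events
        running += count or 0        # treat a missing hour as 0
        rows.append((h, count, running))
    return rows

# GroupId xx on 2011-08-15: events sum to 3 at hour 14 and 100 at hour 16
rows = fill_and_accumulate({14: 3, 16: 100})
```

With this input, `rows[14]` is `(14, 3, 3)` and every hour from 16 onward carries the running total 103, matching the table above.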

Below is the code that I have tried:

from pyspark.sql.functions import col, hour, sum
from pyspark.sql.window import Window

df2 = (df
  .withColumn("Event_time", col("Event_time").cast("timestamp"))
  .withColumn("Date", col("Event_time").cast("date"))
  .withColumn("Hour", hour(col("Event_time"))))

# Sum Event_value per hour (a plain .count() would count rows instead)
df3 = df2.groupBy("GroupId", "Date", "Hour").agg(sum("Event_value").alias("Count"))

df3 = df3.withColumn(
  "agg_count",
  sum("Count").over(Window.partitionBy("GroupId", "Date").orderBy("Hour")))

However, the above code only produces rows for the hours that actually contain events, so it does not display every hour within the day.

CodePudding user response:

You could do it by first creating a table with all the hours and then joining it with the rest of the data.

Setup:

from pyspark.sql import functions as F, Window as W
df = spark.createDataFrame(
    [('xx', '2011-08-15 14:47:02.617023', 'eventA', 1),
     ('xx', '2011-08-15 14:48:02.507053', 'eventA', 2),
     ('xx', '2011-08-15 16:47:02.512016', 'eventA', 100),
     ('yy', '2011-08-15 11:47:02.337019', 'eventA', 2),
     ('yy', '2011-08-15 12:47:02.617041', 'eventA', 1),
     ('yy', '2011-08-15 13:47:02.927040', 'eventA', 3)],
    ['GroupId', 'Event_time', 'Event_name', 'Event_value']
)
df = df.withColumn('Date', F.col('Event_time').cast('date'))

The following creates a dataframe with hours:

min_date = df.groupBy().agg(F.min('Date')).head()[0]
max_date = df.groupBy().agg(F.max('Date')).head()[0]
df_hours = df.select(
    'GroupId',
    'Event_name',
    F.explode(F.expr(f"sequence(to_timestamp('{min_date} 00:00:00'), to_timestamp('{max_date} 23:00:00'), interval 1 hour)")).alias('date_hour')
).distinct()
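For reference, the `sequence(..., interval 1 hour)` expression generates one timestamp per full hour between the two bounds, inclusive. A plain-Python equivalent (stdlib `datetime` only; `hour_grid` is an illustrative helper, not part of the answer's code) shows the shape of the grid:

```python
from datetime import datetime, timedelta

def hour_grid(min_date, max_date):
    """Every full hour from min_date 00:00 to max_date 23:00, inclusive --
    a plain-Python sketch of Spark's sequence(..., interval 1 hour)."""
    start = datetime.strptime(f"{min_date} 00:00:00", "%Y-%m-%d %H:%M:%S")
    end = datetime.strptime(f"{max_date} 23:00:00", "%Y-%m-%d %H:%M:%S")
    hours, t = [], start
    while t <= end:
        hours.append(t)
        t += timedelta(hours=1)
    return hours

grid = hour_grid("2011-08-15", "2011-08-15")
```

For a single day this yields 24 entries, from hour 00 to hour 23, which is exactly what gets cross-referenced against each GroupId.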

Then, aggregating your first table by hours:

df_agg = (df
    .groupBy('GroupId', 'Event_name', F.date_trunc('hour', 'Event_time').alias('date_hour'))
    .agg(F.sum('Event_value').alias('Count'))
)
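Here `date_trunc('hour', ...)` zeroes out the minutes, seconds, and fractional seconds so that every event falls into its hour bucket. A plain-Python analogue (the `trunc_hour` helper is illustrative, not Spark's implementation):

```python
from datetime import datetime

def trunc_hour(ts):
    """Plain-Python analogue of Spark's date_trunc('hour', ...):
    keep the date and hour, drop minutes/seconds/microseconds."""
    return ts.replace(minute=0, second=0, microsecond=0)

t = datetime.strptime("2011-08-15 14:47:02.617023", "%Y-%m-%d %H:%M:%S.%f")
bucket = trunc_hour(t)
```

Both events at 14:47 and 14:48 truncate to the same 14:00 bucket, which is why their Event_value sums to 3 in that row.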

Joining them both together:

df_joined = df_hours.join(df_agg, ['GroupId', 'Event_name', 'date_hour'], 'left')
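The left join keeps every (GroupId, Event_name, date_hour) row from the hour grid and fills Count with null where no aggregated row matches. A minimal sketch of that semantics with plain dicts (hypothetical data, hours abbreviated to integers):

```python
# Aggregated sums for GroupId xx: only hours 14 and 16 have events
agg = {("xx", 14): 3, ("xx", 16): 100}

# The hour grid has one key per hour of the day for each GroupId
grid_keys = [("xx", h) for h in range(24)]

# Left join: keep every grid key; missing aggregates become None (Spark: null)
joined = [(g, h, agg.get((g, h))) for (g, h) in grid_keys]
```

All 24 hours survive the join, and only hours 14 and 16 carry a non-null Count.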

Adding column agg_count and others:

w = W.partitionBy('GroupId', 'Event_name').orderBy('date_hour')
df2 = (df_joined
    .select(
        'GroupId',
        'Event_name',
        F.to_date('date_hour').alias('Date'),
        F.date_format('date_hour', 'HH').alias('Hour'),
        'Count',
        F.coalesce(F.sum('Count').over(w), F.lit(0)).alias('agg_count')
    )
)

Result:

+-------+----------+----------+----+-----+---------+
|GroupId|Event_name|      Date|Hour|Count|agg_count|
+-------+----------+----------+----+-----+---------+
|     xx|    eventA|2011-08-15|  00| null|        0|
|     xx|    eventA|2011-08-15|  01| null|        0|
|     xx|    eventA|2011-08-15|  02| null|        0|
|     xx|    eventA|2011-08-15|  03| null|        0|
|     xx|    eventA|2011-08-15|  04| null|        0|
|     xx|    eventA|2011-08-15|  05| null|        0|
|     xx|    eventA|2011-08-15|  06| null|        0|
|     xx|    eventA|2011-08-15|  07| null|        0|
|     xx|    eventA|2011-08-15|  08| null|        0|
|     xx|    eventA|2011-08-15|  09| null|        0|
|     xx|    eventA|2011-08-15|  10| null|        0|
|     xx|    eventA|2011-08-15|  11| null|        0|
|     xx|    eventA|2011-08-15|  12| null|        0|
|     xx|    eventA|2011-08-15|  13| null|        0|
|     xx|    eventA|2011-08-15|  14|    3|        3|
|     xx|    eventA|2011-08-15|  15| null|        3|
|     xx|    eventA|2011-08-15|  16|  100|      103|
|     xx|    eventA|2011-08-15|  17| null|      103|
|     xx|    eventA|2011-08-15|  18| null|      103|
|     xx|    eventA|2011-08-15|  19| null|      103|
|     xx|    eventA|2011-08-15|  20| null|      103|
|     xx|    eventA|2011-08-15|  21| null|      103|
|     xx|    eventA|2011-08-15|  22| null|      103|
|     xx|    eventA|2011-08-15|  23| null|      103|
|     yy|    eventA|2011-08-15|  00| null|        0|
|     yy|    eventA|2011-08-15|  01| null|        0|
|     yy|    eventA|2011-08-15|  02| null|        0|
|     yy|    eventA|2011-08-15|  03| null|        0|
|     yy|    eventA|2011-08-15|  04| null|        0|
|     yy|    eventA|2011-08-15|  05| null|        0|
|     yy|    eventA|2011-08-15|  06| null|        0|
|     yy|    eventA|2011-08-15|  07| null|        0|
|     yy|    eventA|2011-08-15|  08| null|        0|
|     yy|    eventA|2011-08-15|  09| null|        0|
|     yy|    eventA|2011-08-15|  10| null|        0|
|     yy|    eventA|2011-08-15|  11|    2|        2|
|     yy|    eventA|2011-08-15|  12|    1|        3|
|     yy|    eventA|2011-08-15|  13|    3|        6|
|     yy|    eventA|2011-08-15|  14| null|        6|
|     yy|    eventA|2011-08-15|  15| null|        6|
|     yy|    eventA|2011-08-15|  16| null|        6|
|     yy|    eventA|2011-08-15|  17| null|        6|
|     yy|    eventA|2011-08-15|  18| null|        6|
|     yy|    eventA|2011-08-15|  19| null|        6|
|     yy|    eventA|2011-08-15|  20| null|        6|
|     yy|    eventA|2011-08-15|  21| null|        6|
|     yy|    eventA|2011-08-15|  22| null|        6|
|     yy|    eventA|2011-08-15|  23| null|        6|
+-------+----------+----------+----+-----+---------+