Suppose I have a Spark Dataframe below:
GroupId | Event_time | Event_name | Event_value |
---|---|---|---|
xx | 2011-08-15 14:47:02.617023 | eventA | 1 |
xx | 2011-08-15 14:48:02.507053 | eventA | 2 |
xx | 2011-08-15 16:47:02.512016 | eventA | 100 |
yy | 2011-08-15 11:47:02.337019 | eventA | 2 |
yy | 2011-08-15 12:47:02.617041 | eventA | 1 |
yy | 2011-08-15 13:47:02.927040 | eventA | 3 |
I would like to get the rolling count of eventA values per hour within a day, based on the GroupId.
For example, for GroupId xx at datetime 2011-08-15 14:00, I am trying to calculate the count of eventA (Event_value) for that GroupId from 14:00 till 15:00. In this case, the count should be 1 + 2 = 3.
The expected output would be something like the table below (basically displaying hours 00 to 23 within a day; I have omitted some of the hours to save space).
If there is no eventA in an hour range, then we treat the count as NA for that hour (treated as 0 for calculation purposes later on).
For event_date 2011-08-15, there is no event until hour 14, and there are no more events after hour 16.
GroupId | Date | Hour | Count | agg_count |
---|---|---|---|---|
xx | 2011-08-15 | 00 | NA | 0 |
xx | 2011-08-15 | 01 | NA | 0 |
xx | 2011-08-15 | 02 | NA | 0 |
xx | 2011-08-15 | 13 | NA | 0 |
xx | 2011-08-15 | 14 | 3 | 3 |
xx | 2011-08-15 | 15 | NA | 3 |
xx | 2011-08-15 | 16 | 100 | 103 |
xx | 2011-08-15 | 17 | NA | 103 |
xx | 2011-08-15 | 23 | NA | 103 |
Below is some of the code that I have tried:
from pyspark.sql.functions import col, count, hour, sum
from pyspark.sql.window import Window

df2 = (df
    .withColumn("Event_time", col("Event_time").cast("timestamp"))
    .withColumn("Date", col("Event_time").cast("date"))
    .withColumn("Hour", hour(col("Event_time"))))

df3 = df2.groupBy("GroupId", "Date", "Hour").agg(count("*").alias("Count"))

df3 = df3.withColumn(
    "agg_count",
    sum("Count").over(Window.partitionBy("GroupId", "Date").orderBy("Hour")))
However, the above code does not produce a row for every hour within the day.
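For illustration, df3 only contains rows for hours that actually occur in the data (hours 14 and 16 for xx, 11 to 13 for yy), so there is nothing in the grouped result to represent the empty hours:
# The grouped frame has no rows for hours without events,
# so the running total has no row to carry through the empty hours.
df3.orderBy("GroupId", "Date", "Hour").show()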
CodePudding user response:
You could do it by first creating a table with hours and then joining it with the rest of the data.
Setup:
from pyspark.sql import functions as F, Window as W

df = spark.createDataFrame(
    [('xx', '2011-08-15 14:47:02.617023', 'eventA', 1),
     ('xx', '2011-08-15 14:48:02.507053', 'eventA', 2),
     ('xx', '2011-08-15 16:47:02.512016', 'eventA', 100),
     ('yy', '2011-08-15 11:47:02.337019', 'eventA', 2),
     ('yy', '2011-08-15 12:47:02.617041', 'eventA', 1),
     ('yy', '2011-08-15 13:47:02.927040', 'eventA', 3)],
    ['GroupId', 'Event_time', 'Event_name', 'Event_value']
)
df = df.withColumn('Date', F.col('Event_time').cast('date'))
The following creates a dataframe with hours:
min_date = df.groupBy().agg(F.min('Date')).head()[0]
max_date = df.groupBy().agg(F.max('Date')).head()[0]
df_hours = df.select(
    'GroupId',
    'Event_name',
    F.explode(F.expr(f"sequence(to_timestamp('{min_date} 00:00:00'), to_timestamp('{max_date} 23:00:00'), interval 1 hour)")).alias('date_hour')
).distinct()
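As a quick sanity check (illustrative, not part of the original answer), each GroupId/Event_name pair should end up with one row per hour of the covered range, i.e. 24 rows per group for the single day in the sample data:
# One generated row per hour per GroupId/Event_name (24 per group for one day)
df_hours.groupBy('GroupId', 'Event_name').count().show()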
Then, aggregating your first table by hours:
df_agg = (df
    .groupBy('GroupId', 'Event_name', F.date_trunc('hour', 'Event_time').alias('date_hour'))
    .agg(F.sum('Event_value').alias('Count'))
)
Joining them both together:
df_joined = df_hours.join(df_agg, ['GroupId', 'Event_name', 'date_hour'], 'left')
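Because it is a left join, every generated hour is kept, and hours without matching events simply carry a null Count. As an illustrative check (not part of the original answer), with the sample data that should be 22 such hours for xx and 21 for yy:
# Hours with no events keep a null Count after the left join
df_joined.filter(F.col('Count').isNull()).groupBy('GroupId').count().show()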
Adding the agg_count column and the others:
# Running sum per group, ordered by hour; the sum skips nulls, so the cumulative
# total carries forward through empty hours, and coalesce turns the leading nulls
# (hours before the first event) into 0.
w = W.partitionBy('GroupId', 'Event_name').orderBy('date_hour')
df2 = (df_joined
    .select(
        'GroupId',
        'Event_name',
        F.to_date('date_hour').alias('Date'),
        F.date_format('date_hour', 'HH').alias('Hour'),
        'Count',
        F.coalesce(F.sum('Count').over(w), F.lit(0)).alias('agg_count')
    )
)
Result:
+-------+----------+----------+----+-----+---------+
|GroupId|Event_name|      Date|Hour|Count|agg_count|
+-------+----------+----------+----+-----+---------+
| xx| eventA|2011-08-15| 00| null| 0|
| xx| eventA|2011-08-15| 01| null| 0|
| xx| eventA|2011-08-15| 02| null| 0|
| xx| eventA|2011-08-15| 03| null| 0|
| xx| eventA|2011-08-15| 04| null| 0|
| xx| eventA|2011-08-15| 05| null| 0|
| xx| eventA|2011-08-15| 06| null| 0|
| xx| eventA|2011-08-15| 07| null| 0|
| xx| eventA|2011-08-15| 08| null| 0|
| xx| eventA|2011-08-15| 09| null| 0|
| xx| eventA|2011-08-15| 10| null| 0|
| xx| eventA|2011-08-15| 11| null| 0|
| xx| eventA|2011-08-15| 12| null| 0|
| xx| eventA|2011-08-15| 13| null| 0|
| xx| eventA|2011-08-15| 14| 3| 3|
| xx| eventA|2011-08-15| 15| null| 3|
| xx| eventA|2011-08-15| 16| 100| 103|
| xx| eventA|2011-08-15| 17| null| 103|
| xx| eventA|2011-08-15| 18| null| 103|
| xx| eventA|2011-08-15| 19| null| 103|
| xx| eventA|2011-08-15| 20| null| 103|
| xx| eventA|2011-08-15| 21| null| 103|
| xx| eventA|2011-08-15| 22| null| 103|
| xx| eventA|2011-08-15| 23| null| 103|
| yy| eventA|2011-08-15| 00| null| 0|
| yy| eventA|2011-08-15| 01| null| 0|
| yy| eventA|2011-08-15| 02| null| 0|
| yy| eventA|2011-08-15| 03| null| 0|
| yy| eventA|2011-08-15| 04| null| 0|
| yy| eventA|2011-08-15| 05| null| 0|
| yy| eventA|2011-08-15| 06| null| 0|
| yy| eventA|2011-08-15| 07| null| 0|
| yy| eventA|2011-08-15| 08| null| 0|
| yy| eventA|2011-08-15| 09| null| 0|
| yy| eventA|2011-08-15| 10| null| 0|
| yy| eventA|2011-08-15| 11| 2| 2|
| yy| eventA|2011-08-15| 12| 1| 3|
| yy| eventA|2011-08-15| 13| 3| 6|
| yy| eventA|2011-08-15| 14| null| 6|
| yy| eventA|2011-08-15| 15| null| 6|
| yy| eventA|2011-08-15| 16| null| 6|
| yy| eventA|2011-08-15| 17| null| 6|
| yy| eventA|2011-08-15| 18| null| 6|
| yy| eventA|2011-08-15| 19| null| 6|
| yy| eventA|2011-08-15| 20| null| 6|
| yy| eventA|2011-08-15| 21| null| 6|
| yy| eventA|2011-08-15| 22| null| 6|
| yy| eventA|2011-08-15| 23| null| 6|
+-------+----------+----------+----+-----+---------+
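If a literal NA is wanted in the Count column instead of null (as in the expected output), one possible sketch on top of the df2 above is to cast Count to string and coalesce it:
# Optional sketch (not part of the original answer): display 'NA' instead of null
df_display = df2.withColumn('Count', F.coalesce(F.col('Count').cast('string'), F.lit('NA')))
df_display.orderBy('GroupId', 'Date', 'Hour').show(48, truncate=False)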