I would like to count the number of activities by a user over a certain time window (for example, every 12 hours) for multiple users. For example, I have a table with a dataset as follows:
Timestamp | User |
---|---|
08/11/2021 04:05:06 | A |
08/11/2021 04:15:06 | B |
08/11/2021 09:15:26 | A |
08/11/2021 11:04:06 | B |
08/11/2021 14:55:16 | A |
09/11/2021 04:12:11 | B |
and so on. I would like to be able to derive the data to be like this:
Timestamp | User | Count |
---|---|---|
08/11/2021 00:00:00 - 11:59:59 | A | 2 |
08/11/2021 00:00:00 - 11:59:59 | B | 2 |
08/11/2021 12:00:00 - 23:59:59 | A | 1 |
08/11/2021 12:00:00 - 23:59:59 | B | 0 |
09/11/2021 00:00:00 - 11:59:59 | A | 0 |
09/11/2021 00:00:00 - 11:59:59 | B | 1 |
Is there a way to do this? I am still new to Scala and Spark, so I'm not too sure how to do that. Thanks!
CodePudding user response:
By converting your timestamp value to the respective `00:00:00 - 11:59:59` or `12:00:00 - 23:59:59` time period, you may easily perform a group by on the resulting data. Using the `HOUR` function to retrieve the hour of day, you may determine which period to assign each row to. Combining this with `CONCAT` to build the new time-period label and `to_date` to convert your timestamp to a date, you may create this time-period value.
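For illustration, the sample data from the question can be loaded into a dataframe as below (a minimal PySpark sketch; the raw timestamps are strings in dd/MM/yyyy order and are parsed here per the NB at the end of this answer):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Sample rows from the question; the timestamps start out as plain strings
df = spark.createDataFrame(
    [
        ("08/11/2021 04:05:06", "A"),
        ("08/11/2021 04:15:06", "B"),
        ("08/11/2021 09:15:26", "A"),
        ("08/11/2021 11:04:06", "B"),
        ("08/11/2021 14:55:16", "A"),
        ("09/11/2021 04:12:11", "B"),
    ],
    ["Timestamp", "User"],
).withColumn(
    # Parse the dd/MM/yyyy strings into a proper timestamp column
    "Timestamp", F.to_timestamp("Timestamp", "dd/MM/yyyy H:m:s")
)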
You may try the following approaches:
Using the Scala API

import org.apache.spark.sql.functions._

val outputDf =
  df.withColumn(
    "TimePeriod",
    concat(
      to_date(col("Timestamp")),
      when(hour(col("Timestamp")) < 12, " 00:00:00 - 11:59:59")
        .otherwise(" 12:00:00 - 23:59:59")
    )
  )
  .groupBy("TimePeriod", "User")
  .count()
  .withColumnRenamed("TimePeriod", "Timestamp")

outputDf.show(truncate = false)
Using the PySpark API
from pyspark.sql import functions as F
output_df = (
df.withColumn(
"TimePeriod",
F.concat(
F.to_date("Timestamp"),
F.when(
F.hour("Timestamp")<12," 00:00:00 - 11:59:59"
).otherwise(" 12:00:00 - 23:59:59")
)
)
.groupBy(
"TimePeriod",
"User"
)
.count()
.withColumnRenamed("TimePeriod","Timestamp")
)
output_df.show(truncate=False)
Using Spark SQL
- Ensure your dataframe is available as a temp view:
df.createOrReplaceTempView("input_df2")
- Run the following SQL on the Spark session:
outputDf = sparkSession.sql("<insert sql below here>")
SQL:
SELECT
CONCAT(
to_date(Timestamp),
CASE
WHEN HOUR(Timestamp) < 12 THEN ' 00:00:00 - 11:59:59'
ELSE ' 12:00:00 - 23:59:59'
END
) as Timestamp,
User,
COUNT(1) as COUNT
FROM
input_df2
GROUP BY 1,2
Output
+------------------------------+----+-----+
|Timestamp                     |User|COUNT|
+------------------------------+----+-----+
|2021-11-08 00:00:00 - 11:59:59|B   |2    |
|2021-11-08 00:00:00 - 11:59:59|A   |2    |
|2021-11-09 00:00:00 - 11:59:59|B   |1    |
|2021-11-08 12:00:00 - 23:59:59|A   |1    |
+------------------------------+----+-----+
NB: Your timestamp column should be of the timestamp data type, which can be verified using `df.printSchema()`. If your timestamp column is a string type, you may use `to_timestamp` to convert it to a timestamp type, e.g. `to_timestamp(Timestamp, "dd/MM/yyyy H:m:s")`.
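As an aside, Spark also provides a built-in `window` function that buckets rows into tumbling windows directly, avoiding the string bookkeeping with `CONCAT`. A minimal PySpark sketch (note that, like the approach above, it only emits periods that actually contain rows; zero counts are handled in the edit below):

from pyspark.sql import functions as F

# Assign each row to a 12-hour tumbling window; the bucket comes back as a
# struct column with explicit start and end timestamps.
windowed = (
    df.groupBy(F.window("Timestamp", "12 hours").alias("period"), "User")
    .count()
    .select(
        F.col("period.start").alias("PeriodStart"),
        F.col("period.end").alias("PeriodEnd"),
        "User",
        "count",
    )
)
windowed.show(truncate=False)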
Edit 1: Question changed by OP
The following update provides the count for every user over all possible time periods, including periods where the count is 0. It builds every user/time-period combination with a cross join, then right-joins the actual counts onto that grid, filling the gaps with 0.
Using the Scala API

import org.apache.spark.sql.functions._

val dates =
  df.select(to_date(col("Timestamp")).alias("TimestampDate"))
    .distinct()
    .withColumn("TimePeriodLower", concat(col("TimestampDate"), lit(" 00:00:00 - 11:59:59")))
    .withColumn("TimePeriodUpper", concat(col("TimestampDate"), lit(" 12:00:00 - 23:59:59")))

val timeperiods =
  dates.select("TimePeriodLower")
    .union(dates.select("TimePeriodUpper"))
    .withColumnRenamed("TimePeriodLower", "TimePeriod")
    .crossJoin(df.select("User").distinct())

val outputDf =
  df.withColumn(
    "TimePeriod",
    concat(
      to_date(col("Timestamp")),
      when(hour(col("Timestamp")) < 12, " 00:00:00 - 11:59:59")
        .otherwise(" 12:00:00 - 23:59:59")
    )
  )
  .groupBy("TimePeriod", "User")
  .count()
  .join(timeperiods, Seq("TimePeriod", "User"), "right")
  .withColumn("count", coalesce(col("count"), lit(0)))
  .drop("Timestamp")
  .withColumnRenamed("TimePeriod", "Timestamp")
Using the PySpark API
from pyspark.sql import functions as F
timeperiods = (
df.select(F.to_date("Timestamp").alias("TimestampDate"))
.distinct()
.withColumn("TimePeriodLower",F.concat("TimestampDate",F.lit(" 00:00:00 - 11:59:59")))
.withColumn("TimePeriodUpper",F.concat("TimestampDate",F.lit(" 12:00:00 - 23:59:59")))
)
timeperiods = (
timeperiods.select("TimePeriodLower")
.union(
timeperiods.select("TimePeriodUpper")
)
.withColumnRenamed("TimePeriodLower","TimePeriod")
.crossJoin(
df.select("User").distinct()
)
)
outputDf = (
df.withColumn(
"TimePeriod",
F.concat(
F.to_date("Timestamp"),
F.when(
F.hour("Timestamp")<12," 00:00:00 - 11:59:59"
).otherwise(" 12:00:00 - 23:59:59")
)
)
.groupBy(
"TimePeriod",
"User"
)
.count()
.join(
timeperiods,
["TimePeriod","User"],
"right"
)
.withColumn("count",F.coalesce("count",F.lit(0)))
.drop("Timestamp")
.withColumnRenamed("TimePeriod","Timestamp")
)
Using Spark SQL
- Ensure your dataframe is available as a temp view:
df.createOrReplaceTempView("input_df2")
- Run the following SQL on the Spark session:
outputDf = sparkSession.sql("<insert sql below here>")
SQL:
WITH unique_users AS (
SELECT DISTINCT
User
FROM
input_df2
),
unique_dates AS (
SELECT DISTINCT
to_date(Timestamp) as TimestampDate
FROM
input_df2
),
timeperiods as (
SELECT CONCAT(TimestampDate,' 00:00:00 - 11:59:59') as TimePeriod FROM unique_dates
UNION ALL
SELECT CONCAT(TimestampDate,' 12:00:00 - 23:59:59') as TimePeriod FROM unique_dates
),
timeperiod_users as (
SELECT * FROM timeperiods CROSS JOIN unique_users
),
timeperiod_user_counts as (
SELECT
CONCAT(
to_date(Timestamp),
CASE
WHEN HOUR(Timestamp) < 12 THEN ' 00:00:00 - 11:59:59'
ELSE ' 12:00:00 - 23:59:59'
END
) as Timeperiod,
User,
COUNT(1) as COUNT
FROM
input_df2
GROUP BY 1,2
)
SELECT
tu.TimePeriod as Timestamp,
tu.User,
COALESCE(tuc.COUNT,0) as COUNT
FROM
timeperiod_user_counts tuc
RIGHT JOIN
timeperiod_users tu ON tu.TimePeriod = tuc.TimePeriod AND
tu.User = tuc.User
Outputs
outputDf.orderBy("Timestamp","User").show()
+------------------------------+----+-----+
|Timestamp                     |User|count|
+------------------------------+----+-----+
|2021-11-08 00:00:00 - 11:59:59|A   |2    |
|2021-11-08 00:00:00 - 11:59:59|B   |2    |
|2021-11-08 12:00:00 - 23:59:59|A   |1    |
|2021-11-08 12:00:00 - 23:59:59|B   |0    |
|2021-11-09 00:00:00 - 11:59:59|A   |0    |
|2021-11-09 00:00:00 - 11:59:59|B   |1    |
|2021-11-09 12:00:00 - 23:59:59|A   |0    |
|2021-11-09 12:00:00 - 23:59:59|B   |0    |
+------------------------------+----+-----+
timeperiods.show()
+------------------------------+----+
|TimePeriod                    |User|
+------------------------------+----+
|2021-11-08 00:00:00 - 11:59:59|B   |
|2021-11-08 00:00:00 - 11:59:59|A   |
|2021-11-09 00:00:00 - 11:59:59|B   |
|2021-11-09 00:00:00 - 11:59:59|A   |
|2021-11-08 12:00:00 - 23:59:59|B   |
|2021-11-08 12:00:00 - 23:59:59|A   |
|2021-11-09 12:00:00 - 23:59:59|B   |
|2021-11-09 12:00:00 - 23:59:59|A   |
+------------------------------+----+
CodePudding user response:
PySpark:
This time-resolution count can be achieved by extracting the `hour` from `Timestamp` and grouping by `date` and `User` within the Spark dataframe, as follows:
from pyspark.sql import functions as F

# Derive a date column from the timestamp (shown in the intermediate output below)
new_df = df.withColumn("date", F.to_date("Timestamp"))

# Aggregate record counts per user for each time resolution: PerDay (24 hrs), HalfDay (2 x 12 hrs)
df = new_df.groupBy("User", "date").agg(
    F.sum(F.hour("Timestamp").between(0, 23).cast("int")).alias("NoLogPerDay"),
    F.sum(F.hour("Timestamp").between(0, 11).cast("int")).alias("NoLogPer-1st-12-hrs"),
    F.sum(F.hour("Timestamp").between(12, 23).cast("int")).alias("NoLogPer-2nd-12-hrs"),
).sort("date")
df.show(truncate=False)
The results, using sample data in which the user column is named `UserName`, match your expected frame:
+-------------------+--------+
|timestamp          |UserName|
+-------------------+--------+
|2021-08-11 04:05:06|A       |
|2021-08-11 04:15:06|B       |
|2021-08-11 09:15:26|A       |
|2021-08-11 11:04:06|B       |
|2021-08-11 14:55:16|A       |
|2021-09-11 04:12:11|B       |
+-------------------+--------+
+-------------------+----------+--------+
|timestamp          |date      |UserName|
+-------------------+----------+--------+
|2021-08-11 04:05:06|2021-08-11|A       |
|2021-08-11 04:15:06|2021-08-11|B       |
|2021-08-11 09:15:26|2021-08-11|A       |
|2021-08-11 11:04:06|2021-08-11|B       |
|2021-08-11 14:55:16|2021-08-11|A       |
|2021-09-11 04:12:11|2021-09-11|B       |
+-------------------+----------+--------+
+--------+----------+-----------+-------------------+-------------------+
|UserName|date      |NoLogPerDay|NoLogPer-1st-12-hrs|NoLogPer-2nd-12-hrs|
+--------+----------+-----------+-------------------+-------------------+
|A       |2021-08-11|3          |2                  |1                  |
|B       |2021-08-11|2          |2                  |0                  |
|B       |2021-09-11|1          |1                  |0                  |
+--------+----------+-----------+-------------------+-------------------+
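If the long format from the question (one row per user and half-day period) is preferred over the wide columns above, the result can be unpivoted, e.g. with a `stack` expression. A sketch, with column names taken from the output above and `wide_df` standing in for the aggregated dataframe:

from pyspark.sql import functions as F

# Unpivot the two half-day columns into (Period, Count) rows;
# `wide_df` is a hypothetical name for the aggregated dataframe above.
long_df = wide_df.select(
    "UserName",
    "date",
    F.expr(
        "stack(2, "
        "' 00:00:00 - 11:59:59', `NoLogPer-1st-12-hrs`, "
        "' 12:00:00 - 23:59:59', `NoLogPer-2nd-12-hrs`"
        ") as (Period, Count)"
    ),
).withColumn("Timestamp", F.concat("date", "Period")).drop("date", "Period")
long_df.show(truncate=False)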