Group Count based on Specific Time Window for Spark Scala

Time:11-10

I would like to count the number of activities per user within a fixed time window (for example, every 12 hours), for multiple users. For example, I have a table with the following data:

Timestamp User
08/11/2021 04:05:06 A
08/11/2021 04:15:06 B
08/11/2021 09:15:26 A
08/11/2021 11:04:06 B
08/11/2021 14:55:16 A
09/11/2021 04:12:11 B

and so on. I would like to derive a result like this:

Timestamp User Count
08/11/2021 00:00:00 - 11:59:59 A 2
08/11/2021 00:00:00 - 11:59:59 B 2
08/11/2021 12:00:00 - 23:59:59 A 1
08/11/2021 12:00:00 - 23:59:59 B 0
09/11/2021 00:00:00 - 11:59:59 A 0
09/11/2021 00:00:00 - 11:59:59 B 1

Is there a way to do this? I am still new to Scala and Spark, so I am not sure how to approach it. Thanks!

CodePudding user response:

If you map each timestamp to its 12-hour period (00:00:00 - 11:59:59 or 12:00:00 - 23:59:59), the problem reduces to a group by on the resulting value. Use hour to get the hour of day and decide which period a row belongs to, to_date to extract the date from the timestamp, and concat to join the date and the period into a single label.
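The bucketing rule can be sanity-checked on the sample rows without a Spark session. This is a minimal plain-Python sketch, not Spark code; the helper name `time_period` is hypothetical, and the day/month/year reading of the input is an assumption.

```python
from collections import Counter
from datetime import datetime

# Sample rows from the question; timestamps are assumed to be day/month/year.
rows = [
    ("08/11/2021 04:05:06", "A"),
    ("08/11/2021 04:15:06", "B"),
    ("08/11/2021 09:15:26", "A"),
    ("08/11/2021 11:04:06", "B"),
    ("08/11/2021 14:55:16", "A"),
    ("09/11/2021 04:12:11", "B"),
]


def time_period(ts: datetime) -> str:
    """Hypothetical helper: label a timestamp with its date and 12-hour period."""
    suffix = " 00:00:00 - 11:59:59" if ts.hour < 12 else " 12:00:00 - 23:59:59"
    return ts.date().isoformat() + suffix


# Equivalent of groupBy(TimePeriod, User).count(): count rows per (period, user).
counts = Counter(
    (time_period(datetime.strptime(raw, "%d/%m/%Y %H:%M:%S")), user)
    for raw, user in rows
)

for (period, user), n in sorted(counts.items()):
    print(period, user, n)
```

Only combinations that actually occur appear in `counts`, which is also how a plain group by behaves.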

You may try the following approaches:

Using scala api

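import org.apache.spark.sql.functions.{col, concat, hour, to_date, when}

// Label each row with its date and 12-hour period, then count rows per period and user.
val outputDf =
    df.withColumn(
        "TimePeriod",
        concat(
            to_date(col("Timestamp")),
            when(
                hour(col("Timestamp")) < 12, " 00:00:00 - 11:59:59"
            ).otherwise(" 12:00:00 - 23:59:59")
        )
    )
    .groupBy(
        "TimePeriod",
        "User"
    )
    .count()
    .withColumnRenamed("TimePeriod", "Timestamp")

outputDf.show(false)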

Using pyspark api

from pyspark.sql import functions as F

output_df = (
    df.withColumn(
        "TimePeriod",
        F.concat(
            F.to_date("Timestamp"),
            F.when(
                F.hour("Timestamp")<12," 00:00:00 - 11:59:59"
            ).otherwise(" 12:00:00 - 23:59:59")
        )
    )
    .groupBy(
        "TimePeriod",
        "User"
    )
    .count()
    .withColumnRenamed("TimePeriod","Timestamp")
)
output_df.show(truncate=False)

Using spark sql

  1. Register your dataframe as a temporary view
df.createOrReplaceTempView("input_df2")
  2. Run the following sql on the spark session
outputDf = sparkSession.sql("<insert sql below here>")

SQL:

    SELECT
        CONCAT(
            to_date(Timestamp),
            CASE
                WHEN HOUR(Timestamp) < 12 THEN ' 00:00:00 - 11:59:59'
                ELSE ' 12:00:00 - 23:59:59'
            END
        ) as Timestamp,
        User,
        COUNT(1) as COUNT
    FROM
        input_df2
    GROUP BY 1,2

Output

+------------------------------+----+-----+
|Timestamp                     |User|COUNT|
+------------------------------+----+-----+
|2021-11-08 00:00:00 - 11:59:59|B   |2    |
|2021-11-08 00:00:00 - 11:59:59|A   |2    |
|2021-11-09 00:00:00 - 11:59:59|B   |1    |
|2021-11-08 12:00:00 - 23:59:59|A   |1    |
+------------------------------+----+-----+

NB: The Timestamp column should be of the timestamp data type, which you can verify with df.printSchema(). If it is a string, use to_timestamp to convert it to a timestamp first, e.g. to_timestamp(Timestamp,"dd/MM/yyyy H:m:s").
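Why the format string matters can be seen with a small plain-Python sketch. It uses `datetime.strptime` rather than Spark, and the `%d/%m/%Y %H:%M:%S` pattern is only assumed to be the rough counterpart of the Spark pattern "dd/MM/yyyy H:m:s".

```python
from datetime import datetime

raw = "08/11/2021 04:05:06"

# Day-first reading, analogous to the Spark pattern "dd/MM/yyyy H:m:s".
day_first = datetime.strptime(raw, "%d/%m/%Y %H:%M:%S")

# Month-first reading of the same string, shown only for contrast.
month_first = datetime.strptime(raw, "%m/%d/%Y %H:%M:%S")

print(day_first.date())    # 2021-11-08
print(month_first.date())  # 2021-08-11
```

The same string yields two different dates, so the grouping result depends on choosing the pattern that matches the data.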

Edit 1: Question changed by OP

The following update returns a count for every user in every possible time period, including periods in which the user had no activity. It cross-joins all time periods with all users to get every possible combination, then right-joins the actual counts onto that set and fills the missing counts with 0.
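The cross-join-then-fill idea can be sketched in plain Python as well. This is an illustration only, not Spark code: the observed counts are copied from the first output table above, and the names `observed`, `all_keys` and `filled` are hypothetical.

```python
from collections import Counter
from itertools import product

# Observed (period, user) counts, copied from the first output table above.
observed = Counter({
    ("2021-11-08 00:00:00 - 11:59:59", "A"): 2,
    ("2021-11-08 00:00:00 - 11:59:59", "B"): 2,
    ("2021-11-08 12:00:00 - 23:59:59", "A"): 1,
    ("2021-11-09 00:00:00 - 11:59:59", "B"): 1,
})

# Distinct dates and users, like the two SELECT DISTINCT steps.
dates = sorted({period.split(" ")[0] for period, _ in observed})
users = sorted({user for _, user in observed})
suffixes = [" 00:00:00 - 11:59:59", " 12:00:00 - 23:59:59"]

# Cross join: every date x every half-day x every user.
all_keys = [(d + s, u) for d, s, u in product(dates, suffixes, users)]

# Right-join equivalent: keep every key and default missing counts to 0.
filled = {key: observed.get(key, 0) for key in all_keys}

for (period, user), n in sorted(filled.items()):
    print(period, user, n)
```

With two dates, two half-days and two users, this yields eight rows, four of which have a count of 0.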

Using scala api

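import org.apache.spark.sql.functions.{coalesce, col, concat, hour, lit, to_date, when}

// One row per distinct date, carrying both half-day labels.
// (A val cannot be reassigned, so this intermediate gets its own name.)
val periodsByDate =
    df.select(to_date(col("Timestamp")).alias("TimestampDate"))
      .distinct()
      .withColumn("TimePeriodLower", concat(col("TimestampDate"), lit(" 00:00:00 - 11:59:59")))
      .withColumn("TimePeriodUpper", concat(col("TimestampDate"), lit(" 12:00:00 - 23:59:59")))

// Stack both labels into one column, then pair every period with every user.
val timeperiods =
    periodsByDate.select("TimePeriodLower")
                 .union(
                     periodsByDate.select("TimePeriodUpper")
                 )
                 .withColumnRenamed("TimePeriodLower", "TimePeriod")
                 .crossJoin(
                     df.select("User").distinct()
                 )

// Count actual rows per period and user, then right-join onto all combinations.
val outputDf =
    df.withColumn(
        "TimePeriod",
        concat(
            to_date(col("Timestamp")),
            when(
                hour(col("Timestamp")) < 12, " 00:00:00 - 11:59:59"
            ).otherwise(" 12:00:00 - 23:59:59")
        )
    )
    .groupBy(
        "TimePeriod",
        "User"
    )
    .count()
    .join(
        timeperiods,
        Seq("TimePeriod", "User"),
        "right"
    )
    // Combinations with no matching rows get a count of 0.
    .withColumn("count", coalesce(col("count"), lit(0)))
    .withColumnRenamed("TimePeriod", "Timestamp")
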

Using pyspark api

from pyspark.sql import functions as F

timeperiods = (
    df.select(F.to_date("Timestamp").alias("TimestampDate"))
      .distinct()
      .withColumn("TimePeriodLower",F.concat("TimestampDate",F.lit(" 00:00:00 - 11:59:59")))
      .withColumn("TimePeriodUpper",F.concat("TimestampDate",F.lit(" 12:00:00 - 23:59:59")))
)

timeperiods = (
    timeperiods.select("TimePeriodLower")
               .union(
                   timeperiods.select("TimePeriodUpper")
               )
               .withColumnRenamed("TimePeriodLower","TimePeriod")
               .crossJoin(
                   df.select("User").distinct()
               )
)

outputDf = (
    df.withColumn(
        "TimePeriod",
        F.concat(
            F.to_date("Timestamp"),
            F.when(
                F.hour("Timestamp")<12," 00:00:00 - 11:59:59"
            ).otherwise(" 12:00:00 - 23:59:59")
        )
    )
    .groupBy(
        "TimePeriod",
        "User"
    )
    .count()
    .join(
        timeperiods,
        ["TimePeriod","User"],
        "right"
    )
    .withColumn("count",F.coalesce("count",F.lit(0)))
    .withColumnRenamed("TimePeriod","Timestamp")
)

Using spark sql

  1. Register your dataframe as a temporary view
df.createOrReplaceTempView("input_df2")
  2. Run the following sql on the spark session
outputDf = sparkSession.sql("<insert sql below here>")

SQL:

WITH unique_users AS (
    SELECT DISTINCT 
        User 
    FROM 
        input_df2
),
unique_dates AS (
    SELECT DISTINCT
        to_date(Timestamp) as TimestampDate 
    FROM 
        input_df2
),
timeperiods as (
    SELECT CONCAT(TimestampDate,' 00:00:00 - 11:59:59') as TimePeriod FROM unique_dates
    UNION ALL
    SELECT CONCAT(TimestampDate,' 12:00:00 - 23:59:59') as TimePeriod FROM unique_dates
),
timeperiod_users as (
    SELECT * FROM timeperiods CROSS JOIN unique_users
),
timeperiod_user_counts as (
    SELECT
        CONCAT(
            to_date(Timestamp),
            CASE
                WHEN HOUR(Timestamp) < 12 THEN ' 00:00:00 - 11:59:59'
                ELSE ' 12:00:00 - 23:59:59'
            END
        ) as Timeperiod,
        User,
        COUNT(1) as COUNT
    FROM
        input_df2
    GROUP BY 1,2
)
SELECT
    tu.TimePeriod as Timestamp,
    tu.User,
    COALESCE(tuc.COUNT,0) as COUNT
FROM
    timeperiod_user_counts tuc
RIGHT JOIN
    timeperiod_users tu ON tu.TimePeriod = tuc.TimePeriod AND
                           tu.User = tuc.User

Outputs

outputDf.orderBy("Timestamp","User").show()
+------------------------------+----+-----+
|Timestamp                     |User|count|
+------------------------------+----+-----+
|2021-11-08 00:00:00 - 11:59:59|A   |2    |
|2021-11-08 00:00:00 - 11:59:59|B   |2    |
|2021-11-08 12:00:00 - 23:59:59|A   |1    |
|2021-11-08 12:00:00 - 23:59:59|B   |0    |
|2021-11-09 00:00:00 - 11:59:59|A   |0    |
|2021-11-09 00:00:00 - 11:59:59|B   |1    |
|2021-11-09 12:00:00 - 23:59:59|A   |0    |
|2021-11-09 12:00:00 - 23:59:59|B   |0    |
+------------------------------+----+-----+
timeperiods.show()
+------------------------------+----+
|TimePeriod                    |User|
+------------------------------+----+
|2021-11-08 00:00:00 - 11:59:59|B   |
|2021-11-08 00:00:00 - 11:59:59|A   |
|2021-11-09 00:00:00 - 11:59:59|B   |
|2021-11-09 00:00:00 - 11:59:59|A   |
|2021-11-08 12:00:00 - 23:59:59|B   |
|2021-11-08 12:00:00 - 23:59:59|A   |
|2021-11-09 12:00:00 - 23:59:59|B   |
|2021-11-09 12:00:00 - 23:59:59|A   |
+------------------------------+----+

CodePudding user response:

PySpark:

A count at a given time resolution can be obtained by taking the hour of Timestamp and grouping by date and User on the Spark dataframe, as follows:

from pyspark.sql import functions as F

# Aggregate record counts per User and date at two time resolutions:
# per day (24 hrs) and per half day (2 x 12 hrs).
# new_df is assumed to already have a "date" column, e.g. from F.to_date("Timestamp").
df = new_df.groupBy("User", "date").agg(
    # hour() returns 0-23, so this range covers the whole day.
    F.sum(F.hour("Timestamp").between(0, 23).cast("int")).alias("NoLogPerDay"),
    F.sum(F.hour("Timestamp").between(0, 11).cast("int")).alias("NoLogPer-1st-12-hrs"),
    F.sum(F.hour("Timestamp").between(12, 23).cast("int")).alias("NoLogPer-2nd-12-hrs"),
).sort("date")

df.show(truncate=False)

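The input frame, the frame with the derived date column, and the aggregated result are shown below. In these frames the user column is displayed as UserName, and the dates appear as 2021-08-11 and 2021-09-11, which suggests the input was read as month/day/year here: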

+-------------------+--------+
|timestamp          |UserName|
+-------------------+--------+
|2021-08-11 04:05:06|A       |
|2021-08-11 04:15:06|B       |
|2021-08-11 09:15:26|A       |
|2021-08-11 11:04:06|B       |
|2021-08-11 14:55:16|A       |
|2021-09-11 04:12:11|B       |
+-------------------+--------+

+-------------------+----------+--------+
|timestamp          |date      |UserName|
+-------------------+----------+--------+
|2021-08-11 04:05:06|2021-08-11|A       |
|2021-08-11 04:15:06|2021-08-11|B       |
|2021-08-11 09:15:26|2021-08-11|A       |
|2021-08-11 11:04:06|2021-08-11|B       |
|2021-08-11 14:55:16|2021-08-11|A       |
|2021-09-11 04:12:11|2021-09-11|B       |
+-------------------+----------+--------+

+--------+----------+-----------+-------------------+-------------------+
|UserName|date      |NoLogPerDay|NoLogPer-1st-12-hrs|NoLogPer-2nd-12-hrs|
+--------+----------+-----------+-------------------+-------------------+
|A       |2021-08-11|3          |2                  |1                  |
|B       |2021-08-11|2          |2                  |0                  |
|B       |2021-09-11|1          |1                  |0                  |
+--------+----------+-----------+-------------------+-------------------+
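
The wide layout of this answer (one row per user and date, one column per half-day) can be reproduced in plain Python to check the numbers. This is a sketch only, not Spark code: the input rows are the already-parsed timestamps as displayed in this answer's frames, and the name `wide` is hypothetical.

```python
from collections import defaultdict
from datetime import datetime

# Timestamps exactly as displayed in this answer's input frame.
rows = [
    ("2021-08-11 04:05:06", "A"),
    ("2021-08-11 04:15:06", "B"),
    ("2021-08-11 09:15:26", "A"),
    ("2021-08-11 11:04:06", "B"),
    ("2021-08-11 14:55:16", "A"),
    ("2021-09-11 04:12:11", "B"),
]

# (user, date) -> [per day, first 12 hours, second 12 hours]
wide = defaultdict(lambda: [0, 0, 0])
for raw, user in rows:
    ts = datetime.fromisoformat(raw)
    cell = wide[(user, ts.date().isoformat())]
    cell[0] += 1                          # every row counts toward the day
    cell[1 if ts.hour < 12 else 2] += 1   # and toward exactly one half-day

for (user, date), (per_day, first, second) in sorted(wide.items()):
    print(user, date, per_day, first, second)
```

Unlike the cross-join approach, this layout only has rows for user and date pairs that occur in the data, but a half-day with no activity still shows up as 0 within such a row.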