Calculate duration inside groups based on timestamp


I have a dataframe looking like this:

| id | device   | x  | y  | z  | timestamp                    |
|----|----------|----|----|----|------------------------------|
| 1  | device_1 | 22 | 8  | 23 | 2020-10-30T16:00:00.000+0000 |
| 1  | device_1 | 21 | 88 | 65 | 2020-10-30T16:01:00.000+0000 |
| 1  | device_1 | 33 | 34 | 64 | 2020-10-30T16:02:00.000+0000 |
| 2  | device_2 | 12 | 6  | 97 | 2019-11-30T13:00:00.000+0000 |
| 2  | device_2 | 44 | 77 | 13 | 2019-11-30T13:00:00.000+0000 |
| 1  | device_1 | 22 | 11 | 30 | 2022-10-30T08:00:00.000+0000 |
| 1  | device_1 | 22 | 11 | 30 | 2022-10-30T08:01:00.000+0000 |

The data represents events for an "id" at a certain point in time. I would like to see how the values develop over a period of time, for instance to plot a time series.

I'm thinking of adding a column 'duration' which is 0 for the first entry and, for each following entry, the time elapsed since the first entry for the same id on the same day (there might be multiple event streams for the same id on separate days).

I would ideally want a dataframe looking something like this:

| id | device   | x  | y  | z  | timestamp                    | duration     |
|----|----------|----|----|----|------------------------------|--------------|
| 1  | device_1 | 22 | 8  | 23 | 2020-10-30T16:00:00.000+0000 | 00:00:00.000 |
| 1  | device_1 | 21 | 88 | 65 | 2020-10-30T16:01:00.000+0000 | 00:01:00.000 |
| 1  | device_1 | 33 | 34 | 64 | 2020-10-30T16:02:00.000+0000 | 00:02:00.000 |
| 2  | device_2 | 12 | 6  | 97 | 2019-11-30T13:00:00.000+0000 | 00:00:00.000 |
| 2  | device_2 | 44 | 77 | 13 | 2019-11-30T13:00:30.000+0000 | 00:00:30.000 |
| 1  | device_1 | 22 | 11 | 30 | 2022-10-30T08:00:00.000+0000 | 00:00:00.000 |
| 1  | device_1 | 22 | 11 | 30 | 2022-10-30T08:01:00.000+0000 | 00:01:00.000 |

I have no idea where to begin in order to achieve this so either a good explanation or a code example would be very helpful!

Any other suggestions on how to plot development over time (in general, not tied to a specific date or time of day) based on this dataframe are also very welcome.

Note: It has to be in PySpark (not pandas) since the dataset is extremely large.

CodePudding user response:

Your problem can be resolved using window functions, as shown below:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [
        (1, 'device_1', 22, 8, 23, '2020-10-30T16:00:00.000+0000'),
        (1, 'device_1', 21, 88, 65, '2020-10-30T16:01:00.000+0000'),
        (1, 'device_1', 33, 34, 64, '2020-10-30T16:02:00.000+0000'),
        (2, 'device_2', 12, 6, 97, '2019-11-30T13:00:00.000+0000'),
        (2, 'device_2', 44, 77, 13, '2019-11-30T13:00:00.000+0000'),
        (1, 'device_1', 22, 11, 30, '2022-10-30T08:00:00.000+0000'),
        (1, 'device_1', 22, 11, 30, '2022-10-30T08:01:00.000+0000')
    ],
    ("id", "device_name", "x", "y", "z", "timestmp"))

df.show(5, False)
    
+---+-----------+---+---+---+----------------------------+
|id |device_name|x  |y  |z  |timestmp                    |
+---+-----------+---+---+---+----------------------------+
|1  |device_1   |22 |8  |23 |2020-10-30T16:00:00.000+0000|
|1  |device_1   |21 |88 |65 |2020-10-30T16:01:00.000+0000|
|1  |device_1   |33 |34 |64 |2020-10-30T16:02:00.000+0000|
|2  |device_2   |12 |6  |97 |2019-11-30T13:00:00.000+0000|
|2  |device_2   |44 |77 |13 |2019-11-30T13:00:00.000+0000|
+---+-----------+---+---+---+----------------------------+

from pyspark.sql.functions import *

# Parse the raw string into a proper timestamp and derive the calendar date,
# which is used to group events belonging to the same id on the same day.
# (to_date(col("timestmp_t")) would work as well, since it is already a timestamp.)
df_1 = df.withColumn("timestmp_t", to_timestamp(col("timestmp")))
df_1 = df_1.withColumn("date_t", to_date(substring(col("timestmp"), 1, 10)))
df_1.show(5)

+---+-----------+---+---+---+--------------------+-------------------+----------+
| id|device_name|  x|  y|  z|            timestmp|         timestmp_t|    date_t|
+---+-----------+---+---+---+--------------------+-------------------+----------+
|  1|   device_1| 22|  8| 23|2020-10-30T16:00:...|2020-10-30 16:00:00|2020-10-30|
|  1|   device_1| 21| 88| 65|2020-10-30T16:01:...|2020-10-30 16:01:00|2020-10-30|
|  1|   device_1| 33| 34| 64|2020-10-30T16:02:...|2020-10-30 16:02:00|2020-10-30|
|  2|   device_2| 12|  6| 97|2019-11-30T13:00:...|2019-11-30 13:00:00|2019-11-30|
|  2|   device_2| 44| 77| 13|2019-11-30T13:00:...|2019-11-30 13:00:00|2019-11-30|
+---+-----------+---+---+---+--------------------+-------------------+----------+

df_1.createOrReplaceTempView("tmp_table")

spark.sql("""
SELECT t.*, (timestmp_t - min) AS duration FROM (
    SELECT id, device_name, date_t, timestmp_t,
           MIN(timestmp_t) OVER (PARTITION BY id, date_t ORDER BY timestmp_t) AS min
    FROM tmp_table) AS t
""").show(5, False)

+---+-----------+----------+-------------------+-------------------+-----------------------------------+
|id |device_name|date_t    |timestmp_t         |min                |duration                           |
+---+-----------+----------+-------------------+-------------------+-----------------------------------+
|1  |device_1   |2020-10-30|2020-10-30 16:00:00|2020-10-30 16:00:00|INTERVAL '0 00:00:00' DAY TO SECOND|
|1  |device_1   |2020-10-30|2020-10-30 16:01:00|2020-10-30 16:00:00|INTERVAL '0 00:01:00' DAY TO SECOND|
|1  |device_1   |2020-10-30|2020-10-30 16:02:00|2020-10-30 16:00:00|INTERVAL '0 00:02:00' DAY TO SECOND|
|1  |device_1   |2022-10-30|2022-10-30 08:00:00|2022-10-30 08:00:00|INTERVAL '0 00:00:00' DAY TO SECOND|
|1  |device_1   |2022-10-30|2022-10-30 08:01:00|2022-10-30 08:00:00|INTERVAL '0 00:01:00' DAY TO SECOND|
+---+-----------+----------+-------------------+-------------------+-----------------------------------+
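
If a plain numeric duration is easier to handle than the interval type (for example, for plotting), a minimal follow-up sketch, reusing the same tmp_table view and Spark's built-in unix_timestamp function, computes the difference in seconds instead (the column name duration_seconds is just illustrative):

spark.sql("""
SELECT t.*,
       unix_timestamp(timestmp_t) - unix_timestamp(min) AS duration_seconds
FROM (
    SELECT id, device_name, date_t, timestmp_t,
           MIN(timestmp_t) OVER (PARTITION BY id, date_t ORDER BY timestmp_t) AS min
    FROM tmp_table) AS t
""").show(5, False)

A numeric seconds column sorts and plots more naturally than an interval or a formatted string.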

CodePudding user response:

You will need to use window functions (functions that operate within partitions defined by an over clause). The code below does the same thing as the other answer, but I wanted to show a more streamlined version, fully in the PySpark DataFrame API, as opposed to Spark SQL with subqueries.

Initially, the "duration" column will be of type interval; it's up to you to transform it into whatever data type you need. Here I have simply extracted the HH:MM:SS part using regexp_extract, which stores it as a string.

Input (I assume your "timestamp" column is of type timestamp):

from pyspark.sql import functions as F, Window as W
df = spark.createDataFrame(
    [(1, 'device_1', 22, 8, 23, '2020-10-30T16:00:00.000+0000'),
     (1, 'device_1', 21, 88, 65, '2020-10-30T16:01:00.000+0000'),
     (1, 'device_1', 33, 34, 64, '2020-10-30T16:02:00.000+0000'),
     (2, 'device_2', 12, 6, 97, '2019-11-30T13:00:00.000+0000'),
     (2, 'device_2', 44, 77, 13, '2019-11-30T13:00:30.000+0000'),
     (1, 'device_1', 22, 11, 30, '2022-10-30T08:00:00.000+0000'),
     (1, 'device_1', 22, 11, 30, '2022-10-30T08:01:00.000+0000')],
    ["id", "device", "x", "y", "z", "timestamp"]
).withColumn("timestamp", F.to_timestamp("timestamp"))

Script:

# One window per id and per calendar day, ordered by event time.
w = W.partitionBy('id', F.to_date('timestamp')).orderBy('timestamp')
# Elapsed time since the first event of the group (an interval column).
df = df.withColumn('duration', F.col('timestamp') - F.min('timestamp').over(w))
# Keep only the HH:MM:SS part as a string.
df = df.withColumn('duration', F.regexp_extract('duration', r'\d\d:\d\d:\d\d', 0))

df.show(truncate=0)
# +---+--------+---+---+---+-------------------+--------+
# |id |device  |x  |y  |z  |timestamp          |duration|
# +---+--------+---+---+---+-------------------+--------+
# |1  |device_1|22 |8  |23 |2020-10-30 16:00:00|00:00:00|
# |1  |device_1|21 |88 |65 |2020-10-30 16:01:00|00:01:00|
# |1  |device_1|33 |34 |64 |2020-10-30 16:02:00|00:02:00|
# |1  |device_1|22 |11 |30 |2022-10-30 08:00:00|00:00:00|
# |1  |device_1|22 |11 |30 |2022-10-30 08:01:00|00:01:00|
# |2  |device_2|12 |6  |97 |2019-11-30 13:00:00|00:00:00|
# |2  |device_2|44 |77 |13 |2019-11-30 13:00:30|00:00:30|
# +---+--------+---+---+---+-------------------+--------+
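
On the plotting part of the question: neither answer covers it, so here is a minimal sketch of one possible approach, assuming matplotlib is available and reusing the window w from above. The idea is to keep the heavy work in PySpark, convert the duration to plain seconds, filter down to one id and one day, and only then collect that small slice for local plotting (the names plot_df, pdf, duration_seconds and day, and the specific id/day filter, are illustrative and not from the original answers).

# Numeric duration in seconds: elapsed time since the first event of the same id/day group.
plot_df = (
    df.withColumn('day', F.to_date('timestamp'))
      .withColumn('duration_seconds',
                  F.unix_timestamp('timestamp')
                  - F.min(F.unix_timestamp('timestamp')).over(w))
)

# Collect only a small, filtered slice before handing it to a local plotting library.
pdf = (plot_df
       .filter((F.col('id') == 1) & (F.col('day') == '2020-10-30'))
       .select('duration_seconds', 'x', 'y', 'z')
       .toPandas())

import matplotlib.pyplot as plt
pdf.plot(x='duration_seconds', y=['x', 'y', 'z'])  # x, y, z over elapsed time
plt.show()

Because only an aggregated/filtered result is collected, this keeps the large dataset entirely in PySpark, which is compatible with the constraint stated in the question.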