Calculate duration within groups in PySpark

Time:10-12

I want to calculate the duration within groups of the same date_id, subs_no, year, month, and day. If it's the first entry, it should just display "first".

Here's my dataset:

+--------+---------------+--------+----+-----+---+
| date_id|             ts| subs_no|year|month|day|
+--------+---------------+--------+----+-----+---+
|20200801|14:27:18.000000|10007239|2022|    6|  1|
|20200801|14:29:44.000000|10054647|2022|    6|  1|
|20200801|08:24:21.000000|10057750|2022|    6|  1|
|20200801|13:49:27.000000|10019958|2022|    6|  1|
|20200801|20:07:32.000000|10019958|2022|    6|  1|
+--------+---------------+--------+----+-----+---+

NB: column "ts" is of string type.

Here's my expected output:

+--------+---------------+--------+----+-----+---+---------+
| date_id|             ts| subs_no|year|month|day| duration|
+--------+---------------+--------+----+-----+---+---------+
|20200801|14:27:18.000000|10007239|2022|    6|  1| first   |
|20200801|14:29:44.000000|10054647|2022|    6|  1| first   |
|20200801|08:24:21.000000|10057750|2022|    6|  1| first   |
|20200801|13:49:27.000000|10019958|2022|    6|  1| first   |
|20200801|20:07:32.000000|10019958|2022|    6|  1| 6:18:05 |
+--------+---------------+--------+----+-----+---+---------+

CodePudding user response:

You could combine the year, month, day and ts columns into a single column that represents a real timestamp. Then compute each row's distance from the group's earliest timestamp, using min as a window function. Finally, replace a duration of "00:00:00" with "first".

Input:

from pyspark.sql import functions as F, Window as W
df = spark.createDataFrame(
    [('20200801', '14:27:18.000000', '10007239', 2022, 6, 1),
     ('20200801', '14:29:44.000000', '10054647', 2022, 6, 1),
     ('20200801', '08:24:21.000000', '10057750', 2022, 6, 1),
     ('20200801', '13:49:27.000000', '10019958', 2022, 6, 1),
     ('20200801', '20:07:32.000000', '10019958', 2022, 6, 1)],
    ['date_id', 'ts', 'subs_no', 'year', 'month', 'day'])

Script:

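# Combine year, month, day and the time string into one real timestamp
ts = F.to_timestamp(F.format_string('%d-%d-%d %s', 'year', 'month', 'day', 'ts'))
# One window per group, ordered by that timestamp
w = W.partitionBy('date_id', 'subs_no', 'year', 'month', 'day').orderBy(ts)
df = df.withColumn(
    'duration',
    # Time elapsed since the earliest row in the group, kept as HH:MM:SS
    F.regexp_extract(ts - F.min(ts).over(w), r'\d\d:\d\d:\d\d', 0)
)
# A zero duration marks the earliest row of its group
df = df.replace('00:00:00', 'first', 'duration')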

df.show()
# +--------+---------------+--------+----+-----+---+--------+
# |date_id |ts             |subs_no |year|month|day|duration|
# +--------+---------------+--------+----+-----+---+--------+
# |20200801|14:27:18.000000|10007239|2022|6    |1  |first   |
# |20200801|13:49:27.000000|10019958|2022|6    |1  |first   |
# |20200801|20:07:32.000000|10019958|2022|6    |1  |06:18:05|
# |20200801|14:29:44.000000|10054647|2022|6    |1  |first   |
# |20200801|08:24:21.000000|10057750|2022|6    |1  |first   |
# +--------+---------------+--------+----+-----+---+--------+
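
To sanity-check this logic without a Spark session, the same computation can be sketched in plain Python. This is only an illustrative cross-check on the question's sample rows, not part of the answer; the helper names (parse_ts, format_hms, durations_since_first) are made up for this sketch.

```python
from datetime import date, datetime

# Sample rows from the question: (date_id, ts, subs_no, year, month, day)
rows = [
    ('20200801', '14:27:18.000000', '10007239', 2022, 6, 1),
    ('20200801', '14:29:44.000000', '10054647', 2022, 6, 1),
    ('20200801', '08:24:21.000000', '10057750', 2022, 6, 1),
    ('20200801', '13:49:27.000000', '10019958', 2022, 6, 1),
    ('20200801', '20:07:32.000000', '10019958', 2022, 6, 1),
]


def parse_ts(row):
    """Build a full datetime from the year, month, day and ts fields."""
    _, ts, _, year, month, day = row
    time_part = datetime.strptime(ts, '%H:%M:%S.%f').time()
    return datetime.combine(date(year, month, day), time_part)


def format_hms(delta):
    """Format a timedelta as zero-padded HH:MM:SS."""
    total = int(delta.total_seconds())
    hours, rest = divmod(total, 3600)
    minutes, seconds = divmod(rest, 60)
    return f'{hours:02d}:{minutes:02d}:{seconds:02d}'


def group_key(row):
    """Grouping columns: date_id, subs_no, year, month, day."""
    return (row[0], row[2], row[3], row[4], row[5])


def durations_since_first(rows):
    """Return one label per row: 'first', or time since the group's earliest row."""
    earliest = {}
    for row in rows:
        key, t = group_key(row), parse_ts(row)
        if key not in earliest or t < earliest[key]:
            earliest[key] = t
    labels = []
    for row in rows:
        delta = parse_ts(row) - earliest[group_key(row)]
        # Like the replace step above, a zero duration is labelled 'first'
        labels.append('first' if delta.total_seconds() == 0 else format_hms(delta))
    return labels


labels = durations_since_first(rows)
for row, label in zip(rows, labels):
    print(row[1], row[2], label)
```

As in the Spark version, any row whose timestamp equals the group's earliest timestamp is labelled "first", so exact duplicates of the earliest row would get the same label.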

CodePudding user response:

Use a window function. Code and logic below.

from pyspark.sql import Window
from pyspark.sql.functions import col, first, regexp_extract, to_timestamp, when

# Partition by the grouping columns (including day) and order by time,
# so that first() reliably returns the earliest entry of each group
w = Window.partitionBy('date_id', 'subs_no', 'year', 'month', 'day').orderBy('ts')

new = (df.withColumn('ty', to_timestamp('ts'))  # coerce the time string to a timestamp
       # Label the earliest row of each group "first"; otherwise extract HH:MM:SS from the time elapsed since it
       .withColumn('duration', when(first('ty').over(w) == col('ty'), 'first').otherwise(regexp_extract(col('ty') - first('ty').over(w), r'\d{2}:\d{2}:\d{2}', 0)))
       .drop('ty')
       .orderBy('date_id', 'ts', 'subs_no', 'year', 'month'))
new.show()

+--------+---------------+--------+----+-----+---+--------+
| date_id|             ts| subs_no|year|month|day|duration|
+--------+---------------+--------+----+-----+---+--------+
|20200801|08:24:21.000000|10057750|2022|    6|  1|   first|
|20200801|13:49:27.000000|10019958|2022|    6|  1|   first|
|20200801|14:27:18.000000|10007239|2022|    6|  1|   first|
|20200801|14:29:44.000000|10054647|2022|    6|  1|   first|
|20200801|20:07:32.000000|10019958|2022|    6|  1|06:18:05|
+--------+---------------+--------+----+-----+---+--------+
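
Both answers above measure each row against the start of its group. The sample data has at most two rows per group, so it does not show whether a third row should be measured from the first entry or from the previous one. The plain-Python sketch below contrasts the two readings. The third timestamp is invented for illustration, and the function names are hypothetical. In PySpark, the "since previous" reading would typically be written with lag over the same ordered window.

```python
from datetime import datetime


def hms(delta):
    """Format a timedelta as zero-padded HH:MM:SS."""
    total = int(delta.total_seconds())
    hours, rest = divmod(total, 3600)
    minutes, seconds = divmod(rest, 60)
    return f'{hours:02d}:{minutes:02d}:{seconds:02d}'


def parse_sorted(times):
    """Parse HH:MM:SS.ffffff strings from one group and sort them chronologically."""
    return sorted(datetime.strptime(t, '%H:%M:%S.%f') for t in times)


def since_first(times):
    """Duration of each later row measured from the group's earliest row."""
    parsed = parse_sorted(times)
    if not parsed:
        return []
    return ['first'] + [hms(t - parsed[0]) for t in parsed[1:]]


def since_previous(times):
    """Duration of each later row measured from the row just before it."""
    parsed = parse_sorted(times)
    if not parsed:
        return []
    return ['first'] + [hms(cur - prev) for prev, cur in zip(parsed, parsed[1:])]


# The first two values come from the question; the third is a made-up extra row
group = ['13:49:27.000000', '20:07:32.000000', '21:07:32.000000']

from_first = since_first(group)
from_previous = since_previous(group)
print(from_first)
print(from_previous)
```

With only two rows in a group, as in the question's data, both readings give the same result.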