Here's my code:
from pyspark.sql import functions as F, Window as W
df_subs_loc_movmnt_ts = df_subs_loc_movmnt.withColumn("new_ts", F.unix_timestamp(F.col("ts"), "HH:mm:ss"))
w = W.partitionBy('subs_no', 'year', 'month', 'day', 'cgi').orderBy('new_ts')
df_subs_loc_movmnt_duration = df_subs_loc_movmnt_ts.withColumn('duration', F.from_unixtime(F.col('new_ts') - F.min('new_ts').over(w), "HH:mm:ss"))
The resulting dataframe:
df_subs_loc_movmnt_duration.show()
+--------+---------------+--------+---------------+-------------+----+-----+---+------+--------+
| date_id|             ts| subs_no|            cgi|       msisdn|year|month|day|new_ts|duration|
+--------+---------------+--------+---------------+-------------+----+-----+---+------+--------+
|20200801|10:40:43.000000|10000093|510-11-610725-5|7664622154085|2022| 6| 2| 13243|07:00:00|
|20200801|12:55:30.000000|10000093|510-11-610725-5|7664622154085|2022| 6| 2| 21330|09:14:47|
|20200801|05:30:47.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| -5353|07:00:00|
|20200801|10:55:21.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 14121|12:24:34|
|20200801|13:05:06.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 21906|14:34:19|
|20200801|13:05:50.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 21950|14:35:03|
|20200801|13:06:49.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 22009|14:36:02|
|20200801|13:08:32.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 22112|14:37:45|
|20200801|13:08:44.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 22124|14:37:57|
|20200801|13:09:01.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 22141|14:38:14|
|20200801|19:09:51.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 43791|20:39:04|
|20200801|19:37:16.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 45436|21:06:29|
|20200801|19:55:17.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 46517|21:24:30|
|20200801|13:24:58.000000|10000393|510-11-610354-1|4745471710184|2022| 6| 2| 23098|07:00:00|
|20200801|13:54:43.000000|10000393|510-11-610372-4|4745471710184|2022| 6| 3| 24883|07:00:00|
|20200801|11:38:41.000000|10000406|510-11-620412-4|4411875524723|2022| 6| 3| 16721|07:00:00|
|20200801|08:38:36.000000|10000514|510-11-610658-6|5908140424233|2022| 6| 2| 5916|07:00:00|
|20200801|02:12:05.000000|10000610|510-11-610030-9|6354719688724|2022| 6| 1|-17275|07:00:00|
|20200801|06:41:58.000000|10000610|510-11-610030-9|6354719688724|2022| 6| 1| -1082|11:29:53|
|20200801|06:51:14.000000|10000610|510-11-610030-9|6354719688724|2022| 6| 1| -526|11:39:09|
+--------+---------------+--------+---------------+-------------+----+-----+---+------+--------+
Here's my expected dataframe after the filter, which should only keep partitions that have at least 3 (three) 5-minute gaps.
+--------+---------------+--------+---------------+-------------+----+-----+---+------+--------+
| date_id|             ts| subs_no|            cgi|       msisdn|year|month|day|new_ts|duration|
+--------+---------------+--------+---------------+-------------+----+-----+---+------+--------+
|20200801|05:30:47.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| -5353|07:00:00|
|20200801|10:55:21.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 14121|12:24:34|
|20200801|13:05:06.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 21906|14:34:19|
|20200801|13:05:50.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 21950|14:35:03|
|20200801|13:06:49.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 22009|14:36:02|
|20200801|13:08:32.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 22112|14:37:45|
|20200801|13:08:44.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 22124|14:37:57|
|20200801|13:09:01.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 22141|14:38:14|
|20200801|19:09:51.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 43791|20:39:04|
|20200801|19:37:16.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 45436|21:06:29|
|20200801|19:55:17.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 46517|21:24:30|
+--------+---------------+--------+---------------+-------------+----+-----+---+------+--------+
The dataframe's schema:
StructType(List(
StructField(date_id, IntegerType, true),
StructField(ts, StringType, true),
StructField(subs_no, LongType, true),
StructField(cgi, StringType, true),
StructField(msisdn, LongType, true),
StructField(year, IntegerType, true),
StructField(month, IntegerType, true),
StructField(day, IntegerType, true),
StructField(new_ts, LongType, true),
StructField(duration, StringType, true)
))
CodePudding user response:
Inputs:
from pyspark.sql import functions as F, Window as W
df_subs_loc_movmnt_duration = spark.createDataFrame(
[(20200801, '10:40:43.000000', 10000093, '510-11-610725-5', 7664622154085, 2022, 6, 2, 13243, '07:00:00'),
(20200801, '12:55:30.000000', 10000093, '510-11-610725-5', 7664622154085, 2022, 6, 2, 21330, '09:14:47'),
(20200801, '05:30:47.000000', 10000118, '510-11-610195-5', 7560242795888, 2022, 6, 2, -5353, '07:00:00'),
(20200801, '10:55:21.000000', 10000118, '510-11-610195-5', 7560242795888, 2022, 6, 2, 14121, '12:24:34'),
(20200801, '13:05:06.000000', 10000118, '510-11-610195-5', 7560242795888, 2022, 6, 2, 21906, '14:34:19'),
(20200801, '13:05:50.000000', 10000118, '510-11-610195-5', 7560242795888, 2022, 6, 2, 21950, '14:35:03'),
(20200801, '13:06:49.000000', 10000118, '510-11-610195-5', 7560242795888, 2022, 6, 2, 22009, '14:36:02'),
(20200801, '13:08:32.000000', 10000118, '510-11-610195-5', 7560242795888, 2022, 6, 2, 22112, '14:37:45'),
(20200801, '13:08:44.000000', 10000118, '510-11-610195-5', 7560242795888, 2022, 6, 2, 22124, '14:37:57'),
(20200801, '13:09:01.000000', 10000118, '510-11-610195-5', 7560242795888, 2022, 6, 2, 22141, '14:38:14'),
(20200801, '19:09:51.000000', 10000118, '510-11-610195-5', 7560242795888, 2022, 6, 2, 43791, '20:39:04'),
(20200801, '19:37:16.000000', 10000118, '510-11-610195-5', 7560242795888, 2022, 6, 2, 45436, '21:06:29'),
(20200801, '19:55:17.000000', 10000118, '510-11-610195-5', 7560242795888, 2022, 6, 2, 46517, '21:24:30'),
(20200801, '13:24:58.000000', 10000393, '510-11-610354-1', 4745471710184, 2022, 6, 2, 23098, '07:00:00'),
(20200801, '13:54:43.000000', 10000393, '510-11-610372-4', 4745471710184, 2022, 6, 3, 24883, '07:00:00'),
(20200801, '11:38:41.000000', 10000406, '510-11-620412-4', 4411875524723, 2022, 6, 3, 16721, '07:00:00'),
(20200801, '08:38:36.000000', 10000514, '510-11-610658-6', 5908140424233, 2022, 6, 2, 5916, '07:00:00'),
(20200801, '02:12:05.000000', 10000610, '510-11-610030-9', 6354719688724, 2022, 6, 1,-17275, '07:00:00'),
(20200801, '06:41:58.000000', 10000610, '510-11-610030-9', 6354719688724, 2022, 6, 1, -1082, '11:29:53'),
(20200801, '06:51:14.000000', 10000610, '510-11-610030-9', 6354719688724, 2022, 6, 1, -526, '11:39:09')],
['date_id', 'ts', 'subs_no', 'cgi', 'msisdn', 'year', 'month', 'day', 'new_ts', 'duration'])
w = W.partitionBy('subs_no', 'year', 'month', 'day', 'cgi').orderBy('new_ts')
First, adding the column which tells if the gap from the previous row is at least 5 minutes long; 1 means "yes":
gap_gte_5min = ((F.col('new_ts') - F.lag('new_ts').over(w)) >= 5*60).cast('long')
df_with_gap = df_subs_loc_movmnt_duration.withColumn('gap_5min', gap_gte_5min)
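As a quick sanity check, the same lag-and-threshold logic can be mirrored in plain Python on one partition's new_ts values (here subscriber 10000118's, taken from the table above):

```python
# new_ts values for subs_no 10000118 (year=2022, month=6, day=2), already ordered
new_ts = [-5353, 14121, 21906, 21950, 22009, 22112, 22124, 22141,
          43791, 45436, 46517]

# F.lag pairs each row with its predecessor; the first row has no
# predecessor, which Spark represents as null (None here)
gap_5min = [None] + [int(b - a >= 5 * 60) for a, b in zip(new_ts, new_ts[1:])]
print(gap_5min)  # [None, 1, 1, 0, 0, 0, 0, 0, 1, 1, 1]
```

Summing the flags (nulls ignored) gives 5, which matches the gap_count values for this partition in the output below.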
Then, adding the column which tells how many gaps (of >= 5 minutes) there are in the window partition:
w2 = W.partitionBy('subs_no', 'year', 'month', 'day', 'cgi')
df_with_gap_count = df_with_gap.withColumn('gap_count', F.sum('gap_5min').over(w2))
Finally, filtering:
df_with_3_gaps = df_with_gap_count.filter(F.col('gap_count') >= 3)
df_with_3_gaps.show()
# +--------+---------------+--------+---------------+-------------+----+-----+---+------+--------+--------+---------+
# | date_id|             ts| subs_no|            cgi|       msisdn|year|month|day|new_ts|duration|gap_5min|gap_count|
# +--------+---------------+--------+---------------+-------------+----+-----+---+------+--------+--------+---------+
# |20200801|05:30:47.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| -5353|07:00:00| null| 5|
# |20200801|10:55:21.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 14121|12:24:34| 1| 5|
# |20200801|13:05:06.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 21906|14:34:19| 1| 5|
# |20200801|13:05:50.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 21950|14:35:03| 0| 5|
# |20200801|13:06:49.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 22009|14:36:02| 0| 5|
# |20200801|13:08:32.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 22112|14:37:45| 0| 5|
# |20200801|13:08:44.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 22124|14:37:57| 0| 5|
# |20200801|13:09:01.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 22141|14:38:14| 0| 5|
# |20200801|19:09:51.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 43791|20:39:04| 1| 5|
# |20200801|19:37:16.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 45436|21:06:29| 1| 5|
# |20200801|19:55:17.000000|10000118|510-11-610195-5|7560242795888|2022| 6| 2| 46517|21:24:30| 1| 5|
# +--------+---------------+--------+---------------+-------------+----+-----+---+------+--------+--------+---------+
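To double-check which partitions survive the gap_count filter, here is a plain-Python mirror over a few of the input partitions (the new_ts lists are copied from the table above; the other partitions are omitted for brevity):

```python
# new_ts values per (subs_no, year, month, day, cgi) partition, pre-sorted
groups = {
    (10000093, 2022, 6, 2, '510-11-610725-5'): [13243, 21330],
    (10000118, 2022, 6, 2, '510-11-610195-5'): [-5353, 14121, 21906, 21950,
                                                22009, 22112, 22124, 22141,
                                                43791, 45436, 46517],
    (10000610, 2022, 6, 1, '510-11-610030-9'): [-17275, -1082, -526],
}

def gap_count(ts):
    # number of consecutive gaps of at least 5 minutes (300 seconds)
    return sum(b - a >= 300 for a, b in zip(ts, ts[1:]))

kept = {k for k, v in groups.items() if gap_count(v) >= 3}
print(kept)  # only the 10000118 partition remains
```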
CodePudding user response:
You can create a DataFrame with only the groups having at least 3 gaps of more than 5 minutes, and then join it with the original DataFrame (inner join):
seq_with_gaps_df = (
    df
    .withColumn('ts', F.to_timestamp('ts'))  # convert to Timestamp if needed
    .groupBy('subs_no', 'year', 'month', 'day', 'cgi')
    .agg(
        # get a sorted list of timestamps for each group
        F.array_sort(F.collect_list('ts')).alias('ts_array')
    )
    # flag gaps between consecutive samples in the same group that last more than 5 minutes
    .withColumn("gaps", F.expr(
        "transform(ts_array, (x, i) -> ((unix_timestamp(ts_array[i + 1]) - unix_timestamp(x)) / 60 > 5))"
    ))
    # count the gaps of interest
    .withColumn("n_gaps", F.expr(
        "aggregate(gaps, 0, (acc, x) -> acc + coalesce(cast(x as int), 0))"
    ))
    # keep only groups with at least 3 such gaps
    .where(F.col('n_gaps') >= 3)
    # select only the columns that identify the group
    .select('subs_no', 'year', 'month', 'day', 'cgi')
)
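The transform / aggregate pair can likewise be mirrored in plain Python to see what it computes (again using subscriber 10000118's timestamps, here as plain seconds standing in for unix_timestamp values):

```python
# sorted timestamps of one group, in seconds (stand-ins for unix_timestamp(ts))
ts_array = [-5353, 14121, 21906, 21950, 22009, 22112, 22124, 22141,
            43791, 45436, 46517]

# transform(ts_array, (x, i) -> ...): flag gaps to the next element longer
# than 5 minutes; the last element has no successor, which yields null (None)
gaps = [(ts_array[i + 1] - x) / 60 > 5 if i + 1 < len(ts_array) else None
        for i, x in enumerate(ts_array)]

# aggregate(gaps, 0, (acc, x) -> acc + coalesce(cast(x as int), 0)):
# count the True flags, treating null as 0
n_gaps = sum(int(g) if g is not None else 0 for g in gaps)
print(n_gaps)  # 5
```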
# the join discards rows that do not belong to groups of interest
result_df = (
    df
    .join(
        seq_with_gaps_df,
        on=['subs_no', 'year', 'month', 'day', 'cgi'],
        how='inner',
    )
)
The result is the following:
+--------+----+-----+---+---------------+--------+-------------------+-------------+
| subs_no|year|month|day|            cgi| date_id|                 ts|       msisdn|
+--------+----+-----+---+---------------+--------+-------------------+-------------+
|10000118|2022| 6| 2|510-11-610195-5|20200801|2022-10-23 05:30:47|7560242795888|
|10000118|2022| 6| 2|510-11-610195-5|20200801|2022-10-23 10:55:21|7560242795888|
|10000118|2022| 6| 2|510-11-610195-5|20200801|2022-10-23 13:05:06|7560242795888|
|10000118|2022| 6| 2|510-11-610195-5|20200801|2022-10-23 13:05:50|7560242795888|
|10000118|2022| 6| 2|510-11-610195-5|20200801|2022-10-23 13:06:49|7560242795888|
|10000118|2022| 6| 2|510-11-610195-5|20200801|2022-10-23 13:08:32|7560242795888|
|10000118|2022| 6| 2|510-11-610195-5|20200801|2022-10-23 13:08:44|7560242795888|
|10000118|2022| 6| 2|510-11-610195-5|20200801|2022-10-23 13:09:01|7560242795888|
|10000118|2022| 6| 2|510-11-610195-5|20200801|2022-10-23 19:09:51|7560242795888|
|10000118|2022| 6| 2|510-11-610195-5|20200801|2022-10-23 19:37:16|7560242795888|
|10000118|2022| 6| 2|510-11-610195-5|20200801|2022-10-23 19:55:17|7560242795888|
+--------+----+-----+---+---------------+--------+-------------------+-------------+
Using this approach, you do not need to compute new_ts and duration, and you do not need window functions.