Filter window partitions which have at least three 5-minute gaps

Time:10-24

Here's my code:

from pyspark.sql import functions as F, Window as W

df_subs_loc_movmnt_ts = df_subs_loc_movmnt.withColumn("new_ts", F.unix_timestamp(F.col("ts"), "HH:mm:ss"))
w = W.partitionBy('subs_no', 'year', 'month', 'day', 'cgi').orderBy('new_ts')
df_subs_loc_movmnt_duration = df_subs_loc_movmnt_ts.withColumn('duration', F.from_unixtime(F.col('new_ts') - F.min('new_ts').over(w), "HH:mm:ss"))

The resulting dataframe:

df_subs_loc_movmnt_duration.show()

+--------+---------------+--------+---------------+-------------+----+-----+---+------+--------+
| date_id|             ts| subs_no|            cgi|       msisdn|year|month|day|new_ts|duration|
+--------+---------------+--------+---------------+-------------+----+-----+---+------+--------+
|20200801|10:40:43.000000|10000093|510-11-610725-5|7664622154085|2022|    6|  2| 13243|07:00:00|
|20200801|12:55:30.000000|10000093|510-11-610725-5|7664622154085|2022|    6|  2| 21330|09:14:47|
|20200801|05:30:47.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| -5353|07:00:00|
|20200801|10:55:21.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 14121|12:24:34|
|20200801|13:05:06.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 21906|14:34:19|
|20200801|13:05:50.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 21950|14:35:03|
|20200801|13:06:49.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 22009|14:36:02|
|20200801|13:08:32.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 22112|14:37:45|
|20200801|13:08:44.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 22124|14:37:57|
|20200801|13:09:01.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 22141|14:38:14|
|20200801|19:09:51.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 43791|20:39:04|
|20200801|19:37:16.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 45436|21:06:29|
|20200801|19:55:17.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 46517|21:24:30|
|20200801|13:24:58.000000|10000393|510-11-610354-1|4745471710184|2022|    6|  2| 23098|07:00:00|
|20200801|13:54:43.000000|10000393|510-11-610372-4|4745471710184|2022|    6|  3| 24883|07:00:00|
|20200801|11:38:41.000000|10000406|510-11-620412-4|4411875524723|2022|    6|  3| 16721|07:00:00|
|20200801|08:38:36.000000|10000514|510-11-610658-6|5908140424233|2022|    6|  2|  5916|07:00:00|
|20200801|02:12:05.000000|10000610|510-11-610030-9|6354719688724|2022|    6|  1|-17275|07:00:00|
|20200801|06:41:58.000000|10000610|510-11-610030-9|6354719688724|2022|    6|  1| -1082|11:29:53|
|20200801|06:51:14.000000|10000610|510-11-610030-9|6354719688724|2022|    6|  1|  -526|11:39:09|
+--------+---------------+--------+---------------+-------------+----+-----+---+------+--------+

Here's my expected dataframe after the filter, which should keep only the partitions that have at least three 5-minute gaps:

+--------+---------------+--------+---------------+-------------+----+-----+---+------+--------+
| date_id|             ts| subs_no|            cgi|       msisdn|year|month|day|new_ts|duration|
+--------+---------------+--------+---------------+-------------+----+-----+---+------+--------+
|20200801|05:30:47.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| -5353|07:00:00|
|20200801|10:55:21.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 14121|12:24:34|
|20200801|13:05:06.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 21906|14:34:19|
|20200801|13:05:50.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 21950|14:35:03|
|20200801|13:06:49.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 22009|14:36:02|
|20200801|13:08:32.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 22112|14:37:45|
|20200801|13:08:44.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 22124|14:37:57|
|20200801|13:09:01.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 22141|14:38:14|
|20200801|19:09:51.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 43791|20:39:04|
|20200801|19:37:16.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 45436|21:06:29|
|20200801|19:55:17.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 46517|21:24:30|
+--------+---------------+--------+---------------+-------------+----+-----+---+------+--------+

The dataframe's schema:

StructType(List(
    StructField(date_id, IntegerType, true),
    StructField(ts, StringType, true),
    StructField(subs_no, LongType, true),
    StructField(cgi, StringType, true),
    StructField(msisdn, LongType, true),
    StructField(year, IntegerType, true),
    StructField(month, IntegerType, true),
    StructField(day, IntegerType, true),
    StructField(new_ts, LongType, true),
    StructField(duration, StringType, true)
))

CodePudding user response:

Inputs:

from pyspark.sql import functions as F, Window as W
df_subs_loc_movmnt_duration = spark.createDataFrame(
    [(20200801, '10:40:43.000000', 10000093, '510-11-610725-5', 7664622154085, 2022, 6, 2, 13243, '07:00:00'),
     (20200801, '12:55:30.000000', 10000093, '510-11-610725-5', 7664622154085, 2022, 6, 2, 21330, '09:14:47'),
     (20200801, '05:30:47.000000', 10000118, '510-11-610195-5', 7560242795888, 2022, 6, 2, -5353, '07:00:00'),
     (20200801, '10:55:21.000000', 10000118, '510-11-610195-5', 7560242795888, 2022, 6, 2, 14121, '12:24:34'),
     (20200801, '13:05:06.000000', 10000118, '510-11-610195-5', 7560242795888, 2022, 6, 2, 21906, '14:34:19'),
     (20200801, '13:05:50.000000', 10000118, '510-11-610195-5', 7560242795888, 2022, 6, 2, 21950, '14:35:03'),
     (20200801, '13:06:49.000000', 10000118, '510-11-610195-5', 7560242795888, 2022, 6, 2, 22009, '14:36:02'),
     (20200801, '13:08:32.000000', 10000118, '510-11-610195-5', 7560242795888, 2022, 6, 2, 22112, '14:37:45'),
     (20200801, '13:08:44.000000', 10000118, '510-11-610195-5', 7560242795888, 2022, 6, 2, 22124, '14:37:57'),
     (20200801, '13:09:01.000000', 10000118, '510-11-610195-5', 7560242795888, 2022, 6, 2, 22141, '14:38:14'),
     (20200801, '19:09:51.000000', 10000118, '510-11-610195-5', 7560242795888, 2022, 6, 2, 43791, '20:39:04'),
     (20200801, '19:37:16.000000', 10000118, '510-11-610195-5', 7560242795888, 2022, 6, 2, 45436, '21:06:29'),
     (20200801, '19:55:17.000000', 10000118, '510-11-610195-5', 7560242795888, 2022, 6, 2, 46517, '21:24:30'),
     (20200801, '13:24:58.000000', 10000393, '510-11-610354-1', 4745471710184, 2022, 6, 2, 23098, '07:00:00'),
     (20200801, '13:54:43.000000', 10000393, '510-11-610372-4', 4745471710184, 2022, 6, 3, 24883, '07:00:00'),
     (20200801, '11:38:41.000000', 10000406, '510-11-620412-4', 4411875524723, 2022, 6, 3, 16721, '07:00:00'),
     (20200801, '08:38:36.000000', 10000514, '510-11-610658-6', 5908140424233, 2022, 6, 2,  5916, '07:00:00'),
     (20200801, '02:12:05.000000', 10000610, '510-11-610030-9', 6354719688724, 2022, 6, 1,-17275, '07:00:00'),
     (20200801, '06:41:58.000000', 10000610, '510-11-610030-9', 6354719688724, 2022, 6, 1, -1082, '11:29:53'),
     (20200801, '06:51:14.000000', 10000610, '510-11-610030-9', 6354719688724, 2022, 6, 1,  -526, '11:39:09')],
    ['date_id', 'ts', 'subs_no', 'cgi', 'msisdn', 'year', 'month', 'day', 'new_ts', 'duration'])

w = W.partitionBy('subs_no', 'year', 'month', 'day', 'cgi').orderBy('new_ts')

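First, add a column that flags whether the gap since the previous row in the partition is at least 5 minutes. 1 means "yes", 0 means "no", and the first row of each partition gets null because lag has no previous row to compare with.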

gap_gte_5min = ((F.col('new_ts') - F.lag('new_ts').over(w)) >= 5*60).cast('long')
df_with_gap = df_subs_loc_movmnt_duration.withColumn('gap_5min', gap_gte_5min)
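
As a rough cross-check outside Spark, the same flag can be sketched in plain Python for a single partition. The helper name `gap_flags` is made up for this illustration; the `new_ts` values are those of subs_no 10000118 from the sample data, already sorted.

```python
# Plain-Python sketch of "lag + compare" for ONE window partition.
# gap_flags is a hypothetical helper, not part of PySpark.
THRESHOLD_S = 5 * 60  # 5 minutes, in seconds

# new_ts values for subs_no 10000118, copied from the sample data (sorted).
new_ts = [-5353, 14121, 21906, 21950, 22009, 22112, 22124, 22141,
          43791, 45436, 46517]

def gap_flags(sorted_ts, threshold=THRESHOLD_S):
    """Return one flag per row: None for the first row, then 1 or 0."""
    if not sorted_ts:
        return []
    flags = [None]  # the first row has no previous row, like lag() returning null
    for prev, cur in zip(sorted_ts, sorted_ts[1:]):
        flags.append(1 if cur - prev >= threshold else 0)
    return flags

flags = gap_flags(new_ts)
print(flags)  # [None, 1, 1, 0, 0, 0, 0, 0, 1, 1, 1]
```

The flags match the gap_5min column shown in the final output of this answer for that subscriber.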

Then, adding the column which tells how many gaps (of >= 5 minutes) there are in the window partition:

w2 = W.partitionBy('subs_no', 'year', 'month', 'day', 'cgi')
df_with_gap_count = df_with_gap.withColumn('gap_count', F.sum('gap_5min').over(w2))

Finally, filtering:

df_with_3_gaps = df_with_gap_count.filter(F.col('gap_count') >= 3)

df_with_3_gaps.show()
# +--------+---------------+--------+---------------+-------------+----+-----+---+------+--------+--------+---------+
# | date_id|             ts| subs_no|            cgi|       msisdn|year|month|day|new_ts|duration|gap_5min|gap_count|
# +--------+---------------+--------+---------------+-------------+----+-----+---+------+--------+--------+---------+
# |20200801|05:30:47.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| -5353|07:00:00|    null|        5|
# |20200801|10:55:21.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 14121|12:24:34|       1|        5|
# |20200801|13:05:06.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 21906|14:34:19|       1|        5|
# |20200801|13:05:50.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 21950|14:35:03|       0|        5|
# |20200801|13:06:49.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 22009|14:36:02|       0|        5|
# |20200801|13:08:32.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 22112|14:37:45|       0|        5|
# |20200801|13:08:44.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 22124|14:37:57|       0|        5|
# |20200801|13:09:01.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 22141|14:38:14|       0|        5|
# |20200801|19:09:51.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 43791|20:39:04|       1|        5|
# |20200801|19:37:16.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 45436|21:06:29|       1|        5|
# |20200801|19:55:17.000000|10000118|510-11-610195-5|7560242795888|2022|    6|  2| 46517|21:24:30|       1|        5|
# +--------+---------------+--------+---------------+-------------+----+-----+---+------+--------+--------+---------+
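
To sanity-check which partitions should survive the filter, here is a minimal plain-Python sketch of the same three steps (flag, count per partition, filter). It is not Spark code, and `partitions_with_gaps` is a name invented for this example. The rows are reduced to (subs_no, day, cgi, new_ts); year and month are constant in the sample, so they are left out of the key here.

```python
from collections import defaultdict

# (subs_no, day, cgi, new_ts) copied from the sample rows.
rows = [
    (10000093, 2, '510-11-610725-5', 13243),
    (10000093, 2, '510-11-610725-5', 21330),
    (10000118, 2, '510-11-610195-5', -5353),
    (10000118, 2, '510-11-610195-5', 14121),
    (10000118, 2, '510-11-610195-5', 21906),
    (10000118, 2, '510-11-610195-5', 21950),
    (10000118, 2, '510-11-610195-5', 22009),
    (10000118, 2, '510-11-610195-5', 22112),
    (10000118, 2, '510-11-610195-5', 22124),
    (10000118, 2, '510-11-610195-5', 22141),
    (10000118, 2, '510-11-610195-5', 43791),
    (10000118, 2, '510-11-610195-5', 45436),
    (10000118, 2, '510-11-610195-5', 46517),
    (10000393, 2, '510-11-610354-1', 23098),
    (10000393, 3, '510-11-610372-4', 24883),
    (10000406, 3, '510-11-620412-4', 16721),
    (10000514, 2, '510-11-610658-6', 5916),
    (10000610, 1, '510-11-610030-9', -17275),
    (10000610, 1, '510-11-610030-9', -1082),
    (10000610, 1, '510-11-610030-9', -526),
]

def partitions_with_gaps(rows, min_gaps=3, threshold=5 * 60):
    """Map each partition key with at least min_gaps qualifying gaps to its gap count."""
    groups = defaultdict(list)
    for subs_no, day, cgi, new_ts in rows:
        groups[(subs_no, day, cgi)].append(new_ts)
    kept = {}
    for key, values in groups.items():
        values.sort()  # same role as orderBy('new_ts') in the window
        n_gaps = sum(1 for a, b in zip(values, values[1:]) if b - a >= threshold)
        if n_gaps >= min_gaps:
            kept[key] = n_gaps
    return kept

kept = partitions_with_gaps(rows)
print(kept)  # {(10000118, 2, '510-11-610195-5'): 5}
```

Only the partition of subs_no 10000118 is kept, with a gap count of 5, which agrees with the gap_count column above.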

CodePudding user response:

You can create a DataFrame containing only the groups that have at least 3 gaps of 5 minutes, and then inner-join it with the original DataFrame:

seq_with_gaps_df = (
    df
    .withColumn('ts', F.to_timestamp('ts'))  # convert to Timestamp if needed
    .groupBy('subs_no', 'year', 'month', 'day', 'cgi')
    .agg(
        # Get a sorted list of timestamps for each group
        F.array_sort(F.collect_list('ts')).alias('ts_array')
    )
    # flag gaps between consecutive samples in the same group that last more than 5 minutes
    .withColumn("gaps", F.expr(
        "transform(ts_array, (x, i) -> ((unix_timestamp(ts_array[i + 1]) - unix_timestamp(x)) / 60 > 5))"
    ))
    # count the gaps of interest (null flags count as 0)
    .withColumn("n_gaps", F.expr(
        "aggregate(gaps, 0, (acc, x) -> acc + coalesce(cast(x as int), 0))"
    ))
    # keep only the groups with enough gaps, then only the columns that identify the group
    .where(F.col('n_gaps') >= 3)
    .select('subs_no', 'year', 'month', 'day', 'cgi')
)

# the join discards rows that do not belong to groups of interest
result_df = (
    df
    .join(
        seq_with_gaps_df,
        on=['subs_no', 'year', 'month', 'day', 'cgi'],
        how='inner',
    )
)

The result is the following:

+--------+----+-----+---+---------------+--------+-------------------+-------------+
| subs_no|year|month|day|            cgi| date_id|                 ts|       msisdn|
+--------+----+-----+---+---------------+--------+-------------------+-------------+
|10000118|2022|    6|  2|510-11-610195-5|20200801|2022-10-23 05:30:47|7560242795888|
|10000118|2022|    6|  2|510-11-610195-5|20200801|2022-10-23 10:55:21|7560242795888|
|10000118|2022|    6|  2|510-11-610195-5|20200801|2022-10-23 13:05:06|7560242795888|
|10000118|2022|    6|  2|510-11-610195-5|20200801|2022-10-23 13:05:50|7560242795888|
|10000118|2022|    6|  2|510-11-610195-5|20200801|2022-10-23 13:06:49|7560242795888|
|10000118|2022|    6|  2|510-11-610195-5|20200801|2022-10-23 13:08:32|7560242795888|
|10000118|2022|    6|  2|510-11-610195-5|20200801|2022-10-23 13:08:44|7560242795888|
|10000118|2022|    6|  2|510-11-610195-5|20200801|2022-10-23 13:09:01|7560242795888|
|10000118|2022|    6|  2|510-11-610195-5|20200801|2022-10-23 19:09:51|7560242795888|
|10000118|2022|    6|  2|510-11-610195-5|20200801|2022-10-23 19:37:16|7560242795888|
|10000118|2022|    6|  2|510-11-610195-5|20200801|2022-10-23 19:55:17|7560242795888|
+--------+----+-----+---+---------------+--------+-------------------+-------------+

Using this approach, you do not need to compute new_ts and duration and you do not need windows.
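
One detail worth noting: the expression in this answer tests for a gap of strictly more than 5 minutes (`> 5`), while the window-based answer uses `>= 5*60` seconds. The two agree on the sample data but differ when a gap is exactly 300 seconds. Below is a plain-Python sketch of the transform/aggregate logic that shows the difference. The function `count_gaps_array` is invented for this illustration, and it assumes an out-of-range `ts_array[i + 1]` evaluates to null, which the answer appears to rely on for the last element.

```python
def count_gaps_array(ts_sorted, strict=True, minutes=5):
    """Mimic transform(...) then aggregate(...) on a sorted list of epoch seconds."""
    gaps = []
    for i, x in enumerate(ts_sorted):
        # Assumption: indexing past the end of the array gives null (None here).
        nxt = ts_sorted[i + 1] if i + 1 < len(ts_sorted) else None
        if nxt is None:
            gaps.append(None)
            continue
        diff_min = (nxt - x) / 60
        gaps.append(diff_min > minutes if strict else diff_min >= minutes)
    # coalesce(cast(x as int), 0): null flags contribute 0 to the count
    return sum(int(g) for g in gaps if g is not None)

# new_ts values of subs_no 10000118 from the sample data.
sample = [-5353, 14121, 21906, 21950, 22009, 22112, 22124, 22141,
          43791, 45436, 46517]
print(count_gaps_array(sample, strict=True))   # 5
print(count_gaps_array(sample, strict=False))  # 5

# Made-up timestamps spaced exactly 300 seconds apart.
boundary = [0, 300, 600, 900]
print(count_gaps_array(boundary, strict=True))   # 0
print(count_gaps_array(boundary, strict=False))  # 3
```

If "at least 5 minutes" is the intended rule, changing `> 5` to `>= 5` in the `transform` expression should bring the two answers in line.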
