Filter out duplicates within a certain time interval


I have a PySpark DataFrame in which some rows exist in both English and Spanish, and I want to filter out the Spanish duplicates. Some mock data:

test_data = [('1', '2022-09-01' , '07:30:29' , '[tech, fx]' , 'YouTube' , 'english' ,'some text here'),
             ('2', '2022-09-01' , '07:30:29' , '[finance, fx]' , 'YouTube' , 'english' ,'some text here'),
             ('3', '2022-09-02' , '06:30:29' , '[tech, banking]' , 'YouTube' , 'english' ,'some text here'),
             ('4', '2022-09-02' , '07:20:29' , '[tech, banking]' , 'YouTube' , 'spanish' ,'Spanish Text'),
             ('5', '2022-09-03' , '07:12:55' , '[finance, fx]' , 'YouTube' , 'english' ,'some text here'),
             ('6', '2022-09-05' , '09:12:55' , '[computer]' , 'Instagram' , 'spanish' ,'Spanish Text'),]

test_data = spark.sparkContext.parallelize(test_data).toDF(['id', 'date', 'time', 'tags', 'source', 'language', 'text'])
+---+----------+--------+---------------+---------+--------+--------------+
| id|      date|    time|           tags|   source|language|          text|
+---+----------+--------+---------------+---------+--------+--------------+
|  1|2022-09-01|07:30:29|     [tech, fx]|  YouTube| english|some text here|
|  2|2022-09-01|07:30:29|  [finance, fx]|  YouTube| english|some text here|
|  3|2022-09-02|06:30:29|[tech, banking]|  YouTube| english|some text here|
|  4|2022-09-02|07:20:29|[tech, banking]|  YouTube| spanish|  Spanish Text|
|  5|2022-09-03|07:12:55|  [finance, fx]|  YouTube| english|some text here|
|  6|2022-09-05|09:12:55|     [computer]|Instagram| spanish|  Spanish Text|
+---+----------+--------+---------------+---------+--------+--------------+

Desired logic: remove a Spanish row IF there is another row within 2 hours of it (before or after) that has the exact same date, tags and source.

I.e., in this example, I would only want to remove row 4.
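
For reference, one way to turn the separate date and time strings into a single timestamp (just to make the 2-hour comparison concrete; this is only a sketch, and the with_ts / ts names are made up for illustration):

from pyspark.sql import functions as F

# Sketch: build a combined timestamp column from the date and time strings
with_ts = test_data.withColumn(
    'ts', F.to_timestamp(F.concat_ws(' ', 'date', 'time'))
)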

CodePudding user response:

You could create a window that ranges from 2 hours preceding to 2 hours following each row. Range frames with interval bounds are currently only available in SQL syntax, so expr is required.

from pyspark.sql import functions as F

# Flag: is there any English row in the same (date, tags, source) group
# whose timestamp is within 2 hours (before or after) of the current row?
eng_2h_apart = F.expr("""
    any(language = 'english') over (
        partition by date, tags, source
        order by to_timestamp(concat_ws(' ', date, time))
        range between interval 2 hours preceding and interval 2 hours following)
""")

# Mark Spanish rows covered by such an English row, then drop them
test_data = (test_data
    .withColumn('delete', eng_2h_apart & (F.col('language') == 'spanish'))
    .filter("!delete")
    .drop("delete")
)

test_data.show()
# +---+----------+--------+---------------+---------+--------+--------------+
# | id|      date|    time|           tags|   source|language|          text|
# +---+----------+--------+---------------+---------+--------+--------------+
# |  2|2022-09-01|07:30:29|  [finance, fx]|  YouTube| english|some text here|
# |  1|2022-09-01|07:30:29|     [tech, fx]|  YouTube| english|some text here|
# |  3|2022-09-02|06:30:29|[tech, banking]|  YouTube| english|some text here|
# |  5|2022-09-03|07:12:55|  [finance, fx]|  YouTube| english|some text here|
# |  6|2022-09-05|09:12:55|     [computer]|Instagram| spanish|  Spanish Text|
# +---+----------+--------+---------------+---------+--------+--------------+
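
If window functions with interval ranges are not an option, the same rule can also be expressed as a self-join. This is only a sketch under the same assumptions (test_data here means the original mock DataFrame, before the window-based filter above; the df, ts, ts_en, english, spanish_to_drop and result names are made up for illustration):

from pyspark.sql import functions as F

# Build one timestamp column from the separate date and time strings
df = test_data.withColumn('ts', F.to_timestamp(F.concat_ws(' ', 'date', 'time')))

# English rows that could "cover" a Spanish duplicate
english = (df
    .filter(F.col('language') == 'english')
    .select('date', 'tags', 'source', F.col('ts').alias('ts_en')))

# Spanish rows that have an English row with the same date, tags and source
# within 2 hours (before or after)
spanish_to_drop = (df
    .filter(F.col('language') == 'spanish')
    .join(english, on=['date', 'tags', 'source'], how='inner')
    .filter(F.abs(F.col('ts').cast('long') - F.col('ts_en').cast('long')) <= 2 * 3600)
    .select('id')
    .distinct())

# Keep every row that was not flagged
result = df.join(spanish_to_drop, on='id', how='left_anti').drop('ts')
result.show()

On this example both versions should drop only row 4; the window version avoids the extra shuffle that the join introduces.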