I have a PySpark dataframe from which I want to filter out rows that exist in both English and Spanish. Some mock data:
test_data = [
    ('1', '2022-09-01', '07:30:29', '[tech, fx]', 'YouTube', 'english', 'some text here'),
    ('2', '2022-09-01', '07:30:29', '[finance, fx]', 'YouTube', 'english', 'some text here'),
    ('3', '2022-09-02', '06:30:29', '[tech, banking]', 'YouTube', 'english', 'some text here'),
    ('4', '2022-09-02', '07:20:29', '[tech, banking]', 'YouTube', 'spanish', 'Spanish Text'),
    ('5', '2022-09-03', '07:12:55', '[finance, fx]', 'YouTube', 'english', 'some text here'),
    ('6', '2022-09-05', '09:12:55', '[computer]', 'Instagram', 'spanish', 'Spanish Text'),
]
test_data = spark.sparkContext.parallelize(test_data).toDF(['id', 'date', 'time', 'tags', 'source', 'language', 'text'])
+---+----------+--------+---------------+---------+--------+--------------+
| id|      date|    time|           tags|   source|language|          text|
+---+----------+--------+---------------+---------+--------+--------------+
|  1|2022-09-01|07:30:29|     [tech, fx]|  YouTube| english|some text here|
|  2|2022-09-01|07:30:29|  [finance, fx]|  YouTube| english|some text here|
|  3|2022-09-02|06:30:29|[tech, banking]|  YouTube| english|some text here|
|  4|2022-09-02|07:20:29|[tech, banking]|  YouTube| spanish|  Spanish Text|
|  5|2022-09-03|07:12:55|  [finance, fx]|  YouTube| english|some text here|
|  6|2022-09-05|09:12:55|     [computer]|Instagram| spanish|  Spanish Text|
+---+----------+--------+---------------+---------+--------+--------------+
Desired logic: remove a row in Spanish IF there is another row within 2 hours of it (before or after) with the exact same date, tags and source.
I.e., in this example, I would only want to remove row 4.
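To make the rule concrete, here is a minimal plain-Python sketch that applies it to the mock rows without Spark. It assumes the 2-hour window applies in both directions, that the boundary is inclusive, and that the matching row must be in English; the names should_remove, timestamp and WINDOW are made up for this illustration.

```python
from datetime import datetime, timedelta

# Same mock rows as above: (id, date, time, tags, source, language, text)
rows = [
    ('1', '2022-09-01', '07:30:29', '[tech, fx]', 'YouTube', 'english', 'some text here'),
    ('2', '2022-09-01', '07:30:29', '[finance, fx]', 'YouTube', 'english', 'some text here'),
    ('3', '2022-09-02', '06:30:29', '[tech, banking]', 'YouTube', 'english', 'some text here'),
    ('4', '2022-09-02', '07:20:29', '[tech, banking]', 'YouTube', 'spanish', 'Spanish Text'),
    ('5', '2022-09-03', '07:12:55', '[finance, fx]', 'YouTube', 'english', 'some text here'),
    ('6', '2022-09-05', '09:12:55', '[computer]', 'Instagram', 'spanish', 'Spanish Text'),
]

# Assumption: "2 hours" means at most 2 hours before or after, inclusive.
WINDOW = timedelta(hours=2)


def timestamp(row):
    """Combine the date and time fields of a row into a datetime."""
    return datetime.strptime(f"{row[1]} {row[2]}", "%Y-%m-%d %H:%M:%S")


def should_remove(row, all_rows):
    """True for a Spanish row that has an English row with the same
    date, tags and source no more than WINDOW away from it."""
    if row[5] != 'spanish':
        return False
    return any(
        other[5] == 'english'
        and other[1] == row[1]   # same date
        and other[3] == row[3]   # same tags
        and other[4] == row[4]   # same source
        and abs(timestamp(other) - timestamp(row)) <= WINDOW
        for other in all_rows
    )


kept_ids = [r[0] for r in rows if not should_remove(r, rows)]
print(kept_ids)  # ['1', '2', '3', '5', '6']
```

Row 4 is dropped because row 3 is English, shares its date, tags and source, and is 50 minutes earlier. Row 6 stays because no other row shares its date, tags and source.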
CodePudding user response:
You could create a window whose range frame runs from interval 2 hours preceding to interval 2 hours following. Interval frame bounds are currently only available in SQL syntax, so expr is required.
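from pyspark.sql import functions as F

# True when any row in the same (date, tags, source) partition, within
# 2 hours before or after the current row, is in English.
# date and time are joined with a space so the string has the
# 'yyyy-MM-dd HH:mm:ss' form that to_timestamp parses by default.
eng_2h_apart = F.expr("""
    any(language = 'english') over (
        partition by date, tags, source
        order by to_timestamp(concat_ws(' ', date, time))
        range between interval 2 hours preceding and interval 2 hours following)
""")
test_data = (test_data
    # flag Spanish rows that have an English row inside the window
    .withColumn('delete', eng_2h_apart & (F.col('language') == 'spanish'))
    .filter(~F.col('delete'))
    .drop('delete')
)
test_data.show()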
# +---+----------+--------+---------------+---------+--------+--------------+
# | id|      date|    time|           tags|   source|language|          text|
# +---+----------+--------+---------------+---------+--------+--------------+
# |  2|2022-09-01|07:30:29|  [finance, fx]|  YouTube| english|some text here|
# |  1|2022-09-01|07:30:29|     [tech, fx]|  YouTube| english|some text here|
# |  3|2022-09-02|06:30:29|[tech, banking]|  YouTube| english|some text here|
# |  5|2022-09-03|07:12:55|  [finance, fx]|  YouTube| english|some text here|
# |  6|2022-09-05|09:12:55|     [computer]|Instagram| spanish|  Spanish Text|
# +---+----------+--------+---------------+---------+--------+--------------+