Pyspark : Deleting/Removing rows with conditions

Time:01-18

I have a DataFrame that looks like this:

df = spark.createDataFrame([("1", "INSERT"),("2", "INSERT"),("3", "INSERT"), ("2", "MODIFY"), ("1", "DELETE"), ("3", "DELETE"), ("4", "INSERT")], ["id", "label"])
df.show()
+---+------+
| id| label|
+---+------+
|  1|INSERT|
|  2|INSERT|
|  3|INSERT|
|  2|MODIFY|
|  1|DELETE|
|  3|DELETE|
|  4|INSERT|
+---+------+

I'm trying to remove rows based on a condition: if an id has a row labelled 'DELETE' and also a row labelled 'INSERT', every row with that id has to be removed.

So the result would be something like:

+---+------+
| id| label|
+---+------+
|  2|INSERT|
|  2|MODIFY|
|  4|INSERT|
+---+------+

I tried using something like filter in PySpark:

df.filter("label !='DELETE' AND id !='1'").show()

but I'm not sure how to add more conditions. I do not want to use pandas.

CodePudding user response:

Try this:

from pyspark.sql import functions as F

df = spark.createDataFrame([("1", "INSERT"),("2", "INSERT"),("3", "INSERT"), ("2", "MODIFY"), ("1", "DELETE"), ("3", "DELETE"), ("4", "INSERT")], ["id", "label"])

df.groupBy(df.id).agg(F.collect_list(df.label).alias('labels')) \
    .where((~ F.array_contains(F.col('labels'), 'DELETE'))) \
    .select(df.id, F.explode(F.col('labels')).alias('label')) \
    .show()

Output:

+---+------+
| id| label|
+---+------+
|  4|INSERT|
|  2|MODIFY|
|  2|INSERT|
+---+------+
  • Group by id and gather each id's labels with collect_list(label);
  • Filter out the ids whose label list contains DELETE;
  • Turn each remaining list back into one row per label with explode().

Note that this does not preserve the original row order, and it drops rows with a null label, because collect_list() skips null values.

If you have other columns, you can try the following code. It does not aggregate the original DataFrame, so the other columns are not affected.

from pyspark.sql import functions as F

df = spark.createDataFrame([("5", None, 'columnTest'), ("1", "INSERT", 'columnTest'), ("2", "INSERT", 'columnTest'),("3", "INSERT", 'columnTest'), ("2", "MODIFY", 'columnTest'), ("1", "DELETE", 'columnTest'), ("3", "DELETE", 'columnTest'), ("4", "INSERT", 'columnTest')], ["id", "label", 'columnTest'])

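# Collect the distinct ids that have a DELETE row (an empty list if there are none,
# which avoids the IndexError that collect()[0] would raise in that case)
ids = [row['id'] for row in df.where(df.label == 'DELETE').select('id').distinct().collect()]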

df.where(~ df.id.isin(ids)).show()

Output:

+---+------+----------+
| id| label|columnTest|
+---+------+----------+
|  5|  null|columnTest|
|  2|INSERT|columnTest|
|  2|MODIFY|columnTest|
|  4|INSERT|columnTest|
+---+------+----------+
  • Collect every id that has a DELETE row;
  • Filter out all rows whose id is in ids.

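Because this is a plain filter with no aggregation, it keeps rows whose label is null and, in practice, the original row order. The list of ids is collected to the driver, so it suits cases where the number of deleted ids is reasonably small.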
