I have a DataFrame which looks like this:
df = spark.createDataFrame([("1", "INSERT"),("2", "INSERT"),("3", "INSERT"), ("2", "MODIFY"), ("1", "DELETE"), ("3", "DELETE"), ("4", "INSERT")], ["id", "label"])
df.show()
+---+------+
| id| label|
+---+------+
|  1|INSERT|
|  2|INSERT|
|  3|INSERT|
|  2|MODIFY|
|  1|DELETE|
|  3|DELETE|
|  4|INSERT|
+---+------+
I'm trying to remove rows based on this condition: if an id has a row with label 'DELETE' as well as a row with label 'INSERT', all rows for that id have to be removed.
So the result would be something like:
+---+------+
| id| label|
+---+------+
|  2|INSERT|
|  2|MODIFY|
|  4|INSERT|
+---+------+
I tried using filter in PySpark:
df.filter("label !='DELETE' AND id !='1'").show()
but I'm not sure how to express the condition without hard-coding the ids. I do not want to use pandas.
CodePudding user response:
try this:
from pyspark.sql import functions as F
df = spark.createDataFrame([("1", "INSERT"),("2", "INSERT"),("3", "INSERT"), ("2", "MODIFY"), ("1", "DELETE"), ("3", "DELETE"), ("4", "INSERT")], ["id", "label"])
df.groupBy(df.id).agg(F.collect_list(df.label).alias('labels')) \
.where((~ F.array_contains(F.col('labels'), 'DELETE'))) \
.select(df.id, F.explode(F.col('labels')).alias('label')) \
.show()
output:
+---+------+
| id| label|
+---+------+
|  4|INSERT|
|  2|MODIFY|
|  2|INSERT|
+---+------+
- Group by `id` and `collect_list(label)`;
- Filter out rows whose `labels` array contains `DELETE`;
- Restore the original rows with `explode()`.
Note that this does not preserve the original row order, nor does it keep rows with a null label.
If you have other columns, you can try the following code. There is no aggregation on the original DataFrame, so the other columns are not affected.
from pyspark.sql import functions as F
df = spark.createDataFrame([("5", None, 'columnTest'), ("1", "INSERT", 'columnTest'), ("2", "INSERT", 'columnTest'),("3", "INSERT", 'columnTest'), ("2", "MODIFY", 'columnTest'), ("1", "DELETE", 'columnTest'), ("3", "DELETE", 'columnTest'), ("4", "INSERT", 'columnTest')], ["id", "label", 'columnTest'])
ids = df.where(df.label == 'DELETE') \
.groupBy(df.label).agg(F.collect_set(df.id).alias('ids')).collect()[0]['ids']
df.where(~ df.id.isin(ids)).show()
output:
+---+------+----------+
| id| label|columnTest|
+---+------+----------+
|  5|  null|columnTest|
|  2|INSERT|columnTest|
|  2|MODIFY|columnTest|
|  4|INSERT|columnTest|
+---+------+----------+
- Get all `id`s that have a `DELETE` label;
- Filter out rows whose id is in `ids`.
This preserves the original row order and keeps rows with a null label.