Filter out rows in Spark dataframe based on condition


Example Spark dataframe:

product   type
table     Furniture
chair     Furniture
TV        Electronic
...

I want to drop all rows where type is Electronic if there exists at least one row where type is Furniture.

The real data has millions of rows.

The easy way would be to count the rows with type Furniture and, if the count is greater than zero, drop the rows with type Electronic, but this seems inefficient.
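Something like this (an untested sketch of that count-based approach):

from pyspark.sql import functions as F

# Count the Furniture rows; if any exist, drop the Electronic rows
furniture_count = df.filter(F.col("type") == "Furniture").count()
if furniture_count > 0:
    df = df.filter(F.col("type") != "Electronic")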

Is there a way to do this efficiently?

CodePudding user response:

I'm not sure if it's exposed in the PySpark API directly, but you can use the ANY aggregate in a SQL expression:

from pyspark.sql.functions import col

# Run one aggregation to check whether any Furniture row exists
chk = df.selectExpr("ANY(type = 'Furniture') as chk").collect()[0]["chk"]

if chk:
    df_filtered = df.where(col("type") != "Electronic")
else:
    df_filtered = df
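If you would rather stay in the DataFrame API, the same check can be expressed with F.expr inside an agg (a sketch of the same logic):

from pyspark.sql import functions as F

# Same existence check, expressed through agg + expr instead of selectExpr
chk = df.agg(F.expr("ANY(type = 'Furniture')").alias("chk")).first()["chk"]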

CodePudding user response:

As far as I can understand, if a product is classified as Furniture, you want to remove that product's Electronic classification. E.g., if TV is classified as both Electronic and Furniture, you would like to remove the Electronic classification, so that TV is only classified as Furniture.

You will have to do some kind of aggregation. The following is a way using window functions:

from pyspark.sql import functions as F, Window as W

df = spark.createDataFrame(
    [('table', 'Furniture'),
     ('chair', 'Furniture'),
     ('TV', 'Electronic'),
     ('TV', 'Furniture')],
    ['product', 'type'])

# Collect every type seen for each product into an array column
w = W.partitionBy('product').rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
df = df.withColumn('_types', F.collect_set('type').over(w))
# Keep a row unless it is Electronic and the same product also appears as Furniture
df = df.filter((F.col('type') != 'Electronic') | F.forall('_types', lambda x: x != 'Furniture'))
df = df.drop('_types')

df.show()
# +-------+---------+
# |product|     type|
# +-------+---------+
# |     TV|Furniture|
# |  chair|Furniture|
# |  table|Furniture|
# +-------+---------+
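An equivalent without a window function is to compute the set of Furniture products once and join it back onto the original dataframe (a sketch of the same logic, untested at scale):

# Assuming df is the original, unfiltered dataframe:
# products that have at least one Furniture classification
furniture_products = (df.filter(F.col('type') == 'Furniture')
                        .select('product').distinct()
                        .withColumn('_has_furniture', F.lit(True)))

# Keep non-Electronic rows, plus Electronic rows for products that never appear as Furniture
df_alt = df.join(furniture_products, on='product', how='left')
df_alt = df_alt.filter((F.col('type') != 'Electronic') | F.col('_has_furniture').isNull())
df_alt = df_alt.drop('_has_furniture')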
