PySpark filter DataFrame where values in a column do not exist in another DataFrame column

Time:01-19

I don't understand why this isn't working in PySpark...

I'm trying to split the data into an approved DataFrame and a rejected DataFrame based on column values. The rejected DataFrame should contain only the rows of df whose language does not appear in the approved DataFrame's language column:

# Data
columns = ["language", "users_count"]
data = [("Java", 20000), ("Python", 100000), ("Scala", 3000), ("C++", 10000), ("C#", 32195432), ("C", 238135), ("R", 134315), ("Ruby", 235), ("C", 1000), ("R", 2000), ("Ruby", 4000)]

df = spark.createDataFrame(data, columns)
df.show()
# +--------+-----------+
# |language|users_count|
# +--------+-----------+
# |    Java|      20000|
# |  Python|     100000|
# |   Scala|       3000|
# |     C++|      10000|
# |      C#|   32195432|
# |       C|     238135|
# |       R|     134315|
# |    Ruby|        235|
# |       C|       1000|
# |       R|       2000|
# |    Ruby|       4000|
# +--------+-----------+

# Approved
is_approved = df.users_count > 10000
df_approved = df.filter(is_approved)
df_approved.show()
# +--------+-----------+
# |language|users_count|
# +--------+-----------+
# |    Java|      20000|
# |  Python|     100000|
# |      C#|   32195432|
# |       C|     238135|
# |       R|     134315|
# +--------+-----------+

# Rejected
is_not_approved = ~df.language.isin(df_approved.language)
df_rejected = df.filter(is_not_approved)
df_rejected.show()
# +--------+-----------+
# |language|users_count|
# +--------+-----------+
# +--------+-----------+

# Also tried
df.filter( ~df.language.contains(df_approved.language) ).show()
# +--------+-----------+
# |language|users_count|
# +--------+-----------+
# +--------+-----------+

So that doesn't make any sense - why is df_rejected empty?

Expected outcomes using other approaches:

SQL:

SELECT * FROM df
WHERE language NOT IN ( SELECT language FROM df_approved )

Python:

data_approved = []
for language, users_count in data:
    if users_count > 10000:
        data_approved.append((language, users_count))

data_rejected = []
for language, users_count in data:
    if language not in [row[0] for row in data_approved]:
        data_rejected.append((language, users_count))

print(data_approved)
print(data_rejected)
# [('Java', 20000), ('Python', 100000), ('C#', 32195432), ('C', 238135), ('R', 134315)]
# [('Scala', 3000), ('C++', 10000), ('Ruby', 235), ('Ruby', 4000)]

Why is PySpark not filtering as expected?

CodePudding user response:

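Try subtract, which returns the rows of df that are not in df_approved (it compares whole rows, not only the language column):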

df.subtract(df_approved).show()
                                                                                    
+--------+-----------+
|language|users_count|
+--------+-----------+
|       R|       2000|
|    Ruby|       4000|
|   Scala|       3000|
|       C|       1000|
|     C++|      10000|
|    Ruby|        235|
+--------+-----------+

CodePudding user response:

First, use a window to keep only the row with the maximum users_count for each language.

from pyspark.sql import Window, functions

columns = ["language", "users_count"]
data = [("Java", 20000), ("Python", 100000), ("Scala", 3000), ("C++", 10000), ("C#", 32195432), ("C", 238135), ("R", 134315), ("Ruby", 235), ("C", 1000), ("R", 2000), ("Ruby", 4000)]
df = spark.createDataFrame(data, columns)

# Window over all rows that share the same language
w = Window.partitionBy('language')

# Keep only the row(s) holding each language's maximum users_count
df = (df.withColumn('max_users_count', functions.max('users_count').over(w))
        .where(functions.col('users_count') == functions.col('max_users_count'))
        .drop('max_users_count'))
df.show()
+--------+-----------+
|language|users_count|
+--------+-----------+
|      C#|   32195432|
|     C++|      10000|
|       C|     238135|
|       R|     134315|
|   Scala|       3000|
|    Ruby|       4000|
|  Python|     100000|
|    Java|      20000|
+--------+-----------+

Then you can filter based on the specified condition.

is_approved = df.users_count > 10000
df_approved = df.filter(is_approved)
df_approved.show()
+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|      C#|   32195432|
|       C|     238135|
|       R|     134315|
+--------+-----------+

For the reverse of the condition, negate it with ~ in the filter statement:

df_not_approved = df.filter(~is_approved)
df_not_approved.show()
+--------+-----------+
|language|users_count|
+--------+-----------+
|     C++|      10000|
|   Scala|       3000|
|    Ruby|       4000|
+--------+-----------+

CodePudding user response:

Went the SQL route:

columns = ["language", "users_count"]
data = [("Java", 20000), ("Python", 100000), ("Scala", 3000), ("C++", 10000), ("C#", 32195432), ("C", 238135), ("R", 134315), ("Ruby", 235), ("C", 1000), ("R", 2000), ("Ruby", 4000)]

df = spark.createDataFrame(data, columns)
df_approved = df.filter(df.users_count > 10000)

df.createOrReplaceTempView("df")
df_approved.createOrReplaceTempView("df_approved")

df_not_approved = spark.sql("""
    SELECT * FROM df WHERE NOT EXISTS (
        SELECT 1 FROM df_approved
        WHERE df.language = df_approved.language
        )
""")

df_not_approved.show()

# +--------+-----------+
# |language|users_count|
# +--------+-----------+
# |     C++|      10000|
# |    Ruby|        235|
# |    Ruby|       4000|
# |   Scala|       3000|
# +--------+-----------+