I don't understand why this isn't working in PySpark... I'm trying to split the data into an `approved` DataFrame and a `rejected` DataFrame based on column values. So `rejected` looks at the `language` column values in `approved` and only returns rows where the `language` does not exist in the `approved` DataFrame's `language` column:
# Data: one row per (language, users_count) sample; languages repeat.
columns = ["language", "users_count"]
data = [("Java", 20000), ("Python", 100000), ("Scala", 3000), ("C ", 10000), ("C#", 32195432), ("C", 238135), ("R", 134315), ("Ruby", 235), ("C", 1000), ("R", 2000), ("Ruby", 4000)]
df = spark.createDataFrame(data, columns)
df.show()
# +--------+-----------+
# |language|users_count|
# +--------+-----------+
# | Java| 20000|
# | Python| 100000|
# | Scala| 3000|
# | C | 10000|
# | C#| 32195432|
# | C| 238135|
# | R| 134315|
# | Ruby| 235|
# | C| 1000|
# | R| 2000|
# | Ruby| 4000|
# +--------+-----------+
# Approved: boolean Column — rows with more than 10000 users.
is_approved = df.users_count > 10000
df_approved = df.filter(is_approved)
df_approved.show()
# +--------+-----------+
# |language|users_count|
# +--------+-----------+
# | Java| 20000|
# | Python| 100000|
# | C#| 32195432|
# | C| 238135|
# | R| 134315|
# +--------+-----------+
# Rejected
# BUG (the subject of this question): Column.isin() expects literal
# values or an iterable of them, not a Column from another DataFrame.
# Passing df_approved.language effectively compares the column against
# itself, so the negated predicate matches nothing and the result is
# empty. An anti-join (or subtract) is the correct tool here.
is_not_approved = ~df.language.isin(df_approved.language)
df_rejected = df.filter(is_not_approved)
df_rejected.show()
# +--------+-----------+
# |language|users_count|
# +--------+-----------+
# +--------+-----------+
# Also tried
# Same problem: contains() takes a literal or Column expression evaluated
# row-by-row, not the rows of another DataFrame — so this is also empty.
df.filter( ~df.language.contains(df_approved.language) ).show()
# +--------+-----------+
# |language|users_count|
# +--------+-----------+
# +--------+-----------+
So that doesn't make any sense — why is `df_rejected` empty?
Expected outcomes using other approaches:
SQL:
-- Anti-join semantics: keep rows whose language has no match in df_approved.
-- NOTE: NOT IN returns no rows at all if the subquery produces any NULL;
-- NOT EXISTS is the NULL-safe equivalent.
SELECT * FROM df
WHERE language NOT IN ( SELECT language FROM df_approved )
Python:
# Approved: rows whose users_count clears the threshold.
data_approved = []
for language, users_count in data:
    if users_count > 10000:
        data_approved.append((language, users_count))

# Build the approved-language lookup ONCE. The original rebuilt the list
# comprehension [row[0] for row in data_approved] on every iteration of
# the loop below (accidental O(n^2)); a set also gives O(1) membership
# tests instead of a linear scan.
approved_languages = {language for language, _ in data_approved}

# Rejected: rows whose language never reached approval.
data_rejected = []
for language, users_count in data:
    if language not in approved_languages:
        data_rejected.append((language, users_count))

print(data_approved)
print(data_rejected)
# [('Java', 20000), ('Python', 100000), ('C#', 32195432), ('C', 238135), ('R', 134315)]
# [('Scala', 3000), ('C ', 10000), ('Ruby', 235), ('Ruby', 4000)]
Why is PySpark not filtering as expected?
CodePudding user response:
Try to:
df.subtract(df_approved).show()
+--------+-----------+
|language|users_count|
+--------+-----------+
|       R|       2000|
|    Ruby|       4000|
|   Scala|       3000|
|       C|       1000|
|      C |      10000|
|    Ruby|        235|
+--------+-----------+
CodePudding user response:
First of all you will want to use a window to select the maximum `users_count` of rows by `language`.
# BUG FIX: the snippet as posted does not run — `functions` was never
# imported and the window `w` was never defined.
from pyspark.sql import Window, functions

columns = ["language", "users_count"]
data = [("Java", 20000), ("Python", 100000), ("Scala", 3000), ("C ", 10000), ("C#", 32195432), ("C", 238135), ("R", 134315), ("Ruby", 235), ("C", 1000), ("R", 2000), ("Ruby", 4000)]
df = spark.createDataFrame(data, columns)

# Partition by language so max() is computed per language.
w = Window.partitionBy('language')

# Keep only the row(s) holding each language's maximum users_count,
# then drop the helper column.
df = (df.withColumn('max_users_count',
                    functions.max('users_count')
                    .over(w))
      .where(functions.col('users_count')
             == functions.col('max_users_count'))
      .drop('max_users_count'))
df.show()
+--------+-----------+
|language|users_count|
+--------+-----------+
|      C#|   32195432|
|      C |      10000|
|       C|     238135|
|       R|     134315|
|   Scala|       3000|
|    Ruby|       4000|
|  Python|     100000|
|    Java|      20000|
+--------+-----------+
Then you can filter based on the specified condition.
# Apply the approval threshold to the deduplicated frame.
is_approved = df.users_count > 10000
df_approved = df.filter(is_approved)
df_approved.show()
+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|      C#|   32195432|
|       C|     238135|
|       R|     134315|
+--------+-----------+
And then for the reverse of the condition, add the `~` (NOT) operator in the filter statement:
# NOTE: despite the name, is_not_approved is a DataFrame (the filtered
# result), not a boolean Column like is_approved above.
is_not_approved = df.filter(~is_approved)
is_not_approved.show()
+--------+-----------+
|language|users_count|
+--------+-----------+
|   Scala|       3000|
|      C |      10000|
|    Ruby|        235|
|       C|       1000|
|       R|       2000|
|    Ruby|       4000|
+--------+-----------+
CodePudding user response:
Went the SQL route:
columns = ["language", "users_count"]
data = [("Java", 20000), ("Python", 100000), ("Scala", 3000), ("C ", 10000), ("C#", 32195432), ("C", 238135), ("R", 134315), ("Ruby", 235), ("C", 1000), ("R", 2000), ("Ruby", 4000)]
df = spark.createDataFrame(data, columns)
df_approved = df.filter(df.users_count > 10000)
# Register temp views so both DataFrames are visible to spark.sql().
df.createOrReplaceTempView("df")
df_approved.createOrReplaceTempView("df_approved")
# NOT EXISTS is a NULL-safe anti-join: keep rows of df with no matching
# language in df_approved.
df_not_approved = spark.sql("""
SELECT * FROM df WHERE NOT EXISTS (
SELECT 1 FROM df_approved
WHERE df.language = df_approved.language
)
""")
df_not_approved.show()
# +--------+-----------+
# |language|users_count|
# +--------+-----------+
# | C | 10000|
# | Ruby| 235|
# | Ruby| 4000|
# | Scala| 3000|
# +--------+-----------+