I have two dataframes, left and right. The latter, right, is a subset of left, such that left contains all the rows right does. I want to use right to remove redundant rows from left by doing a simple "left_anti" join.
I've discovered that the join doesn't work if I use a filtered version of left as the right side. It works only if I reconstruct the right dataframe from scratch.
- What is going on here?
- Is there a workaround that doesn't involve recreating the right dataframe?
from pyspark.sql import Row, SparkSession
import pyspark.sql.types as t
schema = t.StructType(
    [
        t.StructField("street_number", t.IntegerType()),
        t.StructField("street_name", t.StringType()),
        t.StructField("lower_street_number", t.IntegerType()),
        t.StructField("upper_street_number", t.IntegerType()),
    ]
)
data = [
    # Row that conflicts w/ range row, and should be removed
    Row(
        street_number=123,
        street_name="Main St",
        lower_street_number=None,
        upper_street_number=None,
    ),
    # Range row
    Row(
        street_number=None,
        street_name="Main St",
        lower_street_number=120,
        upper_street_number=130,
    ),
]
def join_files(left_side, right_side):
    join_condition = [
        (
            (right_side.lower_street_number.isNotNull())
            & (right_side.upper_street_number.isNotNull())
            & (right_side.lower_street_number <= left_side.street_number)
            & (right_side.upper_street_number >= left_side.street_number)
        )
    ]
    return left_side.join(right_side, join_condition, "left_anti")
spark = SparkSession.builder.getOrCreate()
left = spark.createDataFrame(data, schema)
right_fail = left.filter("lower_street_number IS NOT NULL")
result = join_files(left, right_fail)
result.count() # Returns 2 - both rows still present
right_success = spark.createDataFrame([data[1]], schema)
result = join_files(left, right_success)
result.count() # Returns 1 - the "left_anti" join worked as expected
CodePudding user response:
You could alias the DataFrames and refer to the columns by their qualified names:
import pyspark.sql.functions as F
def join_files(left_side, right_side):
    join_condition = [
        (
            (F.col("right_side.lower_street_number").isNotNull())
            & (F.col("right_side.upper_street_number").isNotNull())
            & (F.col("right_side.lower_street_number") <= F.col("left_side.street_number"))
            & (F.col("right_side.upper_street_number") >= F.col("left_side.street_number"))
        )
    ]
    return left_side.join(right_side, join_condition, "left_anti")
spark = SparkSession.builder.getOrCreate()
left = spark.createDataFrame(data, schema).alias("left_side")
right_fail = left.filter("lower_street_number IS NOT NULL").alias("right_side")
result = join_files(left, right_fail)
print(result.count()) # Returns 2 - both rows still present
right_success = spark.createDataFrame([data[1]], schema).alias("right_side")
result = join_files(left, right_success)
result.count() # Returns 1 - the "left_anti" join worked as expected
I don't know which PySpark version you are on, but with pyspark==3.0.1 I get the following explanatory error:
AnalysisException: Column lower_street_number#522, upper_street_number#523, lower_street_number#522, upper_street_number#523 are ambiguous. It's probably because you joined several Datasets together, and some of these Datasets are the same. This column points to one of the Datasets but Spark is unable to figure out which one. Please alias the Datasets with different names via `Dataset.as` before joining them, and specify the column using qualified name, e.g. `df.as("a").join(df.as("b"), $"a.id" > $"b.id")`. You can also set spark.sql.analyzer.failAmbiguousSelfJoin to false to disable this check.;
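One way to read that message: because the filtered DataFrame is derived from left, its columns carry the same internal IDs (e.g. lower_street_number#522), so an un-aliased condition can end up comparing each left row with itself instead of with a row from the right side. The sketch below models that reading in plain Python, with no Spark involved. The names Address, in_range and left_anti are hypothetical helpers for illustration only, and the "ambiguous" case is an assumption about how the references get resolved, not a trace of Spark's analyzer.

```python
from typing import List, NamedTuple, Optional


class Address(NamedTuple):
    street_number: Optional[int]
    street_name: str
    lower_street_number: Optional[int]
    upper_street_number: Optional[int]


# The same two rows as in the question.
rows = [
    Address(123, "Main St", None, None),   # should be removed by the range row
    Address(None, "Main St", 120, 130),    # range row
]


def in_range(candidate: Address, range_row: Address) -> bool:
    """Join condition; like SQL, any comparison involving None counts as no match."""
    if range_row.lower_street_number is None or range_row.upper_street_number is None:
        return False
    if candidate.street_number is None:
        return False
    return (
        range_row.lower_street_number
        <= candidate.street_number
        <= range_row.upper_street_number
    )


def left_anti(left: List[Address], right: List[Address], matches) -> List[Address]:
    """Keep the left rows that have no matching row on the right."""
    return [l for l in left if not any(matches(l, r) for r in right)]


# Equivalent of left.filter("lower_street_number IS NOT NULL").
right = [r for r in rows if r.lower_street_number is not None]

# Intended behaviour: the condition compares a left row with a right row.
intended = left_anti(rows, right, in_range)

# Assumed ambiguous behaviour: every column reference resolves to the left row,
# so each row is effectively compared with itself.
ambiguous = left_anti(rows, right, lambda l, r: in_range(l, l))

print(len(intended))   # 1
print(len(ambiguous))  # 2
```

Under this reading, the count of 2 in the question falls out naturally: neither row has both a street number and a range, so no row ever matches itself, and the anti join removes nothing. Qualifying the columns through aliases makes each reference point at exactly one side of the join.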