Joining a dataframe against a filtered version of itself


I have two DataFrames, left and right. right is a subset of left, so every row in right also appears in left. I want to use right to remove the redundant rows from left with a simple "left_anti" join.

I've discovered that the join doesn't work when the right side is a filtered version of left. It only works if I rebuild the right DataFrame from scratch.

  • What is going on here?
  • Is there a workaround that doesn't involve recreating the right dataframe?
from pyspark.sql import Row, SparkSession

import pyspark.sql.types as t

schema = t.StructType(
    [
        t.StructField("street_number", t.IntegerType()),
        t.StructField("street_name", t.StringType()),
        t.StructField("lower_street_number", t.IntegerType()),
        t.StructField("upper_street_number", t.IntegerType()),
    ]
)
data = [
    # Row that conflicts w/ range row, and should be removed
    Row(
        street_number=123,
        street_name="Main St",
        lower_street_number=None,
        upper_street_number=None,
    ),
    # Range row
    Row(
        street_number=None,
        street_name="Main St",
        lower_street_number=120,
        upper_street_number=130,
    ),
]


def join_files(left_side, right_side):
    join_condition = [
      (
        (right_side.lower_street_number.isNotNull())
        & (right_side.upper_street_number.isNotNull())
        & (right_side.lower_street_number <= left_side.street_number)
        & (right_side.upper_street_number >= left_side.street_number)
      )
    ]
    return left_side.join(right_side, join_condition, "left_anti")


spark = SparkSession.builder.getOrCreate()
left = spark.createDataFrame(data, schema)

right_fail = left.filter("lower_street_number IS NOT NULL")
result = join_files(left, right_fail)
result.count() # Returns 2 - both rows still present


right_success = spark.createDataFrame([data[1]], schema)
result = join_files(left, right_success)
result.count() # Returns 1 - the "left_anti" join worked as expected

CodePudding user response:

You could alias the DataFrames:


import pyspark.sql.functions as F


def join_files(left_side, right_side):
    join_condition = [
      (
        (F.col("right_side.lower_street_number").isNotNull())
        & (F.col("right_side.upper_street_number").isNotNull())
        & (F.col("right_side.lower_street_number") <= F.col("left_side.street_number"))
        & (F.col("right_side.upper_street_number") >= F.col("left_side.street_number"))
      )
    ]
    return left_side.join(right_side, join_condition, "left_anti")


spark = SparkSession.builder.getOrCreate()
left = spark.createDataFrame(data, schema).alias("left_side")


right_fail = left.filter("lower_street_number IS NOT NULL").alias("right_side")
result = join_files(left, right_fail)
print(result.count()) # Returns 1 - with the aliases, the left_anti join now works even with the filtered DataFrame


right_success = spark.createDataFrame([data[1]], schema).alias("right_side")
result = join_files(left, right_success)
result.count() # Returns 1 - the "left_anti" join worked as expected

I don't know which PySpark version you are on, but with pyspark==3.0.1 your original (un-aliased) code gives me the following explanatory error:

AnalysisException: Column lower_street_number#522, upper_street_number#523, lower_street_number#522, upper_street_number#523 are ambiguous. It's probably because you joined several Datasets together, and some of these Datasets are the same. This column points to one of the Datasets but Spark is unable to figure out which one. Please alias the Datasets with different names via `Dataset.as` before joining them, and specify the column using qualified name, e.g. `df.as("a").join(df.as("b"), $"a.id" > $"b.id")`. You can also set spark.sql.analyzer.failAmbiguousSelfJoin to false to disable this check.;
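As the error message suggests, aliasing the datasets and referring to columns by qualified name is the recommended fix. If you would rather not rely on aliases at all, here is a sketch of an alternative (my own addition, not part of the original answer): rename the right side's columns before the join so the condition can never be ambiguous. The join_files_renamed name and the "r_" prefix are arbitrary choices.

import pyspark.sql.functions as F


def join_files_renamed(left_side, right_side):
    # Prefix every right-side column so its names no longer collide with
    # the left side's columns ("r_" is an arbitrary, hypothetical prefix).
    renamed = right_side.select(
        [F.col(c).alias("r_" + c) for c in right_side.columns]
    )
    join_condition = (
        F.col("r_lower_street_number").isNotNull()
        & F.col("r_upper_street_number").isNotNull()
        & (F.col("r_lower_street_number") <= left_side.street_number)
        & (F.col("r_upper_street_number") >= left_side.street_number)
    )
    return left_side.join(renamed, join_condition, "left_anti")


result = join_files_renamed(left, left.filter("lower_street_number IS NOT NULL"))
print(result.count())  # Should return 1 - the conflicting row is removed

Since this is a "left_anti" join, only the left side's columns appear in the output, so the renamed columns never leak into the result.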
