I have two PySpark DataFrames. I am looking for the records that appear in only one of the two datasets, compared on specific columns.
Sample datasets:
# Prepare data
data_1 = [
    ("A", 1, "data_1"),
    ("A", 1, "data_1"),
    ("A", 1, "data_1"),
    ("A", 2, "data_1"),
]
# Create DataFrame
columns = ["col_1", "col_2", "source"]
df_1 = spark.createDataFrame(data=data_1, schema=columns)
df_1.show(truncate=False)
# Prepare data
data_2 = [
    ("A", 1, "data_2"),
    ("A", 1, "data_2"),
    ("A", 1, "data_2"),
    ("A", 3, "data_2"),
]
# Create DataFrame
columns = ["col_1", "col_2", "source"]
df_2 = spark.createDataFrame(data=data_2, schema=columns)
df_2.show(truncate=False)
I want to compare the DataFrames above on columns col_1 and col_2 and get the records that are in only one of them. The expected results are:
| col_1 | col_2 | source |
|---|---|---|
| "A" | 2 | "data_1" |
| "A" | 3 | "data_2" |
Any idea how to solve it?
CodePudding user response:
You can do a left anti join (join type "left_anti") on the two columns, once in each direction. Each join returns the records of one DataFrame whose key has no match in the other. You can then union the two outputs.
// In PySpark, pass a list of column names such as ["col_1", "col_2"] instead of colList
Dataset<Row> missingInRight = leftDF.join(rightDF, colList, "left_anti");
Dataset<Row> missingInLeft = rightDF.join(leftDF, colList, "left_anti");
missingInRight.union(missingInLeft).show();
Output:
+-----+-----+------+
|col_1|col_2|source|
+-----+-----+------+
|    A|    2|data_1|
|    A|    3|data_2|
+-----+-----+------+
You can also add a column that tells you which DataFrame didn't have the record.
Dataset<Row> missingInRight = leftDF.join(rightDF, colList, "left_anti")
.withColumn("Comment", functions.lit("NOT_IN_RIGHT"));
Dataset<Row> missingInLeft = rightDF.join(leftDF, colList, "left_anti")
.withColumn("Comment", functions.lit("NOT_IN_LEFT"));
missingInRight.union(missingInLeft).show();
Output:
+-----+-----+------+------------+
|col_1|col_2|source|     Comment|
+-----+-----+------+------------+
|    A|    2|data_1|NOT_IN_RIGHT|
|    A|    3|data_2| NOT_IN_LEFT|
+-----+-----+------+------------+
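To compare all the columns instead of a subset, you can use "except", which returns the distinct rows of the left DataFrame that do not appear in the right one: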
leftDF.except(rightDF)