I am trying to verify that the count columns of two data frames match row by row. Any mismatch should be stored in a separate data frame for each side: rows of one data frame that are missing from the other, or rows whose counts differ.
SQL Solution
Data Creation:
CREATE TABLE A (
  a_row VARCHAR(60) NOT NULL,
  counting INT NOT NULL
);

INSERT INTO A (a_row, counting)
VALUES
  ('abc|def|2022-05-27|09:05:17.023|12345', 2),
  ('xyz|def|2022-05-27|09:05:17.023|13432', 2),
  ('lkj|def|2022-05-07|09:05:17.023|14362', 3);
CREATE TABLE B (
  b_row VARCHAR(60) NOT NULL,
  counting INT NOT NULL
);

INSERT INTO B (b_row, counting)
VALUES
  ('abc|def|2022-05-27|09:05:17.023|12345', 2),
  ('xyz|def|2022-05-27|09:05:17.023|13432', 1),
  ('poi|def|2022-05-27|09:50:17.023|450545', 2);
Queries:
-- rows of A that have no exact match in B
SELECT a.a_row, a.counting
FROM A AS a
LEFT OUTER JOIN B AS b
  ON b.b_row = a.a_row
WHERE b.counting != a.counting OR b.counting IS NULL;
-- rows of B that have no exact match in A
SELECT b.b_row, b.counting
FROM B AS b
LEFT OUTER JOIN A AS a
  ON a.a_row = b.b_row
WHERE b.counting != a.counting OR a.counting IS NULL;
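The two queries can be reproduced end to end with Python's built-in sqlite3 module (a sketch for verification only; sqlite3 stands in for whatever database engine you use, with standard single-quoted string literals):

```python
import sqlite3

# In-memory database with the same tables and rows as above.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE A (a_row TEXT NOT NULL, counting INTEGER NOT NULL)")
cur.execute("CREATE TABLE B (b_row TEXT NOT NULL, counting INTEGER NOT NULL)")
cur.executemany("INSERT INTO A VALUES (?, ?)", [
    ("abc|def|2022-05-27|09:05:17.023|12345", 2),
    ("xyz|def|2022-05-27|09:05:17.023|13432", 2),
    ("lkj|def|2022-05-07|09:05:17.023|14362", 3),
])
cur.executemany("INSERT INTO B VALUES (?, ?)", [
    ("abc|def|2022-05-27|09:05:17.023|12345", 2),
    ("xyz|def|2022-05-27|09:05:17.023|13432", 1),
    ("poi|def|2022-05-27|09:50:17.023|450545", 2),
])

# Rows of A with no exact match in B (key missing, or count differs).
a_diff = cur.execute("""
    SELECT a.a_row, a.counting FROM A AS a
    LEFT OUTER JOIN B AS b ON b.b_row = a.a_row
    WHERE b.counting != a.counting OR b.counting IS NULL
""").fetchall()

# Rows of B with no exact match in A.
b_diff = cur.execute("""
    SELECT b.b_row, b.counting FROM B AS b
    LEFT OUTER JOIN A AS a ON a.a_row = b.b_row
    WHERE b.counting != a.counting OR a.counting IS NULL
""").fetchall()

print(a_diff)
print(b_diff)
```

Note the `OR ... IS NULL` part: `b.counting != a.counting` evaluates to NULL (not true) when the left join produced no match, so unmatched rows would be filtered out without it.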
Results:
a_row                                   counting
xyz|def|2022-05-27|09:05:17.023|13432   2
lkj|def|2022-05-07|09:05:17.023|14362   3

b_row                                   counting
xyz|def|2022-05-27|09:05:17.023|13432   1
poi|def|2022-05-27|09:50:17.023|450545  2
These are the results I am looking to store in separate data frames.
Now creating data frames in PySpark
Data frame: A
---------------------------------------------
|a_row                                |count|
---------------------------------------------
|abc|def|2022-05-27|09:05:17.023|12345|  2  |
|xyz|def|2022-05-27|09:05:17.023|13432|  2  |
|lkj|def|2022-05-07|09:05:17.023|14362|  3  |
---------------------------------------------
Data frame: B
----------------------------------------------
|b_row                                 |count|
----------------------------------------------
|abc|def|2022-05-27|09:05:17.023|12345 |  2  |
|xyz|def|2022-05-27|09:05:17.023|13432 |  1  |
|poi|def|2022-05-27|09:50:17.023|450545|  2  |
----------------------------------------------
What I tried so far in Python 3.7:
# finding the rows of A not found in B or count is not equal
A_difference = A.join(B, how='left', on='A.a_row = B.b_row').where('A.counting != A.counting or B.counting is Null')
# finding the rows of B not found in A or count is not equal
B_difference = B.join(A, how='left', on='A.a_row = B.b_row').where('A.counting != A.counting or B.counting is Null')
Error:
pyspark.sql.utils.AnalysisException: USING column A.a_row = B.b_row cannot be resolved on the left side of the join. The left-side columns: [a_row, count]
I am not sure how to get the same output in PySpark that I got from the SQL queries.
CodePudding user response:
The main thing is that you referenced A.col, but Spark didn't know what that A was. You have to create an alias for your dataframes first. Also, the on clause had the wrong syntax: it takes either a column name shared by both sides or a column expression, not a SQL string like 'A.a_row = B.b_row'.
A_difference = (A.alias('A')
.join(
B.alias('B'),
how='left',
on=A.a_row == B.b_row)
.where('B.counting != A.counting or B.counting is Null')
.select('A.*')
)
B_difference = (B.alias('B')
.join(
A.alias('A'),
how='left',
on=A.a_row == B.b_row)
.where('B.counting != A.counting or A.counting is Null')
.select('B.*')
)
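Without a Spark session at hand, the same left-join-plus-filter logic can be sanity-checked with plain Python dicts (a sketch; the mismatched helper and the rows_a/rows_b literals are made up for illustration, not part of the PySpark answer):

```python
def mismatched(left: dict, right: dict) -> dict:
    """Return entries of `left` whose key is missing from `right`
    or whose count differs -- the left join + filter above."""
    # dict.get returns None for missing keys, so a missing row
    # compares unequal to any count, just like the IS NULL branch.
    return {k: v for k, v in left.items() if right.get(k) != v}

rows_a = {
    "abc|def|2022-05-27|09:05:17.023|12345": 2,
    "xyz|def|2022-05-27|09:05:17.023|13432": 2,
    "lkj|def|2022-05-07|09:05:17.023|14362": 3,
}
rows_b = {
    "abc|def|2022-05-27|09:05:17.023|12345": 2,
    "xyz|def|2022-05-27|09:05:17.023|13432": 1,
    "poi|def|2022-05-27|09:50:17.023|450545": 2,
}

a_difference = mismatched(rows_a, rows_b)  # A rows absent from B or with a different count
b_difference = mismatched(rows_b, rows_a)  # B rows absent from A or with a different count
print(a_difference)
print(b_difference)
```

This mirrors the two SQL result sets exactly, which makes it a cheap regression check for the PySpark version.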