Storing dataframe differences when dataframes have similar column names

Time:06-27

I am trying to verify that the count column matches between two data frames. Where there is a mismatch, either because a row of one data frame is missing from the other or because the counts differ, the mismatched rows should be stored in a separate data frame, one for each source data frame.

SQL Solution

Data Creation:

CREATE TABLE A (
  a_row VARCHAR(60) NOT NULL,
   counting INT NOT NULL
);

INSERT INTO A 
    (a_row, counting) 
VALUES 
    ("abc|def|2022-05-27|09:05:17.023|12345",2),
    ("xyz|def|2022-05-27|09:05:17.023|13432",2),
    ("lkj|def|2022-05-07|09:05:17.023|14362",3);
    
CREATE TABLE B (
  b_row VARCHAR(60) NOT NULL,
   counting INT NOT NULL
);

INSERT INTO B 
    (b_row, counting) 
VALUES 
    ("abc|def|2022-05-27|09:05:17.023|12345",2),
    ("xyz|def|2022-05-27|09:05:17.023|13432",1),
    ("poi|def|2022-05-27|09:50:17.023|450545",2);

Queries:

-- rows of A with no exact match (same row and same count) in B
select a.a_row, a.counting from A as a
left outer join B as b 
on b.b_row = a.a_row
where b.counting != a.counting or b.counting is Null;

-- rows of B with no exact match (same row and same count) in A
select b.b_row, b.counting
 from B as b
 left outer join A as a
 on a.a_row = b.b_row
 where b.counting  != a.counting or a.counting is Null;
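
The two queries can be tried without a database server. Below is a minimal sketch, assuming Python's built-in sqlite3 module as a stand-in for the SQL engine (the question does not say which engine was used). The names A_ROWS, B_ROWS, a_diff and b_diff are illustrative only.

```python
import sqlite3

# Sample rows copied from the Data Creation section.
A_ROWS = [
    ("abc|def|2022-05-27|09:05:17.023|12345", 2),
    ("xyz|def|2022-05-27|09:05:17.023|13432", 2),
    ("lkj|def|2022-05-07|09:05:17.023|14362", 3),
]
B_ROWS = [
    ("abc|def|2022-05-27|09:05:17.023|12345", 2),
    ("xyz|def|2022-05-27|09:05:17.023|13432", 1),
    ("poi|def|2022-05-27|09:50:17.023|450545", 2),
]

conn = sqlite3.connect(":memory:")  # throwaway in-memory database
conn.execute("CREATE TABLE A (a_row VARCHAR(60) NOT NULL, counting INT NOT NULL)")
conn.execute("CREATE TABLE B (b_row VARCHAR(60) NOT NULL, counting INT NOT NULL)")
conn.executemany("INSERT INTO A (a_row, counting) VALUES (?, ?)", A_ROWS)
conn.executemany("INSERT INTO B (b_row, counting) VALUES (?, ?)", B_ROWS)

# Rows of A that are missing from B or whose count differs.
a_diff = conn.execute("""
    SELECT a.a_row, a.counting FROM A AS a
    LEFT OUTER JOIN B AS b ON b.b_row = a.a_row
    WHERE b.counting != a.counting OR b.counting IS NULL
""").fetchall()

# Rows of B that are missing from A or whose count differs.
b_diff = conn.execute("""
    SELECT b.b_row, b.counting FROM B AS b
    LEFT OUTER JOIN A AS a ON a.a_row = b.b_row
    WHERE b.counting != a.counting OR a.counting IS NULL
""").fetchall()

print(a_diff)
print(b_diff)
conn.close()
```

The `IS NULL` branch matters: for a row with no match, the right-hand side of the left join is NULL, and a `!=` comparison against NULL is never true, so without that branch the unmatched rows would be dropped.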

Results:

        a_row                          counting
xyz|def|2022-05-27|09:05:17.023|13432     2
lkj|def|2022-05-07|09:05:17.023|14362     3

        b_row                           counting
xyz|def|2022-05-27|09:05:17.023|13432     1
poi|def|2022-05-27|09:50:17.023|450545    2

These are the results I want to store in separate data frames.

Now creating the data frames in PySpark:

Data frame: A

+--------------------------------------------+-----+
|a_row                                       |count|
+--------------------------------------------+-----+
|abc|def|2022-05-27|09:05:17.023|12345       | 2   |
|xyz|def|2022-05-27|09:05:17.023|13432       | 2   |
|lkj|def|2022-05-07|09:05:17.023|14362       | 3   |
+--------------------------------------------+-----+

Data frame: B

+--------------------------------------------+-----+
|b_row                                       |count|
+--------------------------------------------+-----+
|abc|def|2022-05-27|09:05:17.023|12345       | 2   |
|xyz|def|2022-05-27|09:05:17.023|13432       | 1   |
|lkj|def|2022-05-07|09:05:17.023|14362       | 3   |
+--------------------------------------------+-----+

What I have tried so far in Python 3.7:

# finding the rows of A not found in B or count is not equal
A_difference = A.join(B, how='left', on='A.a_row = B.b_row').where('A.counting  != A.counting or B.counting is Null')

# finding the rows of B not found in A or count is not equal
B_difference = B.join(A, how='left', on='A.a_row = B.b_row').where('A.counting  != A.counting or B.counting is Null')

Error:

pyspark.sql.utils.AnalysisException: USING column A.a_row = B.b_row cannot be resolved on the left side of the join. The left-side columns: [a_row, count]

I am not sure how to get the same output in PySpark that I got from the SQL queries.

CodePudding user response:

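The main problem is that you referenced A.a_row and B.b_row inside strings, but Spark does not know what A and B are until you give the data frames aliases with alias(). The on argument also had the wrong syntax: a string passed to on is treated as the name of a column that must exist on both sides (a USING column), which is what the error message is complaining about. Pass a column expression instead.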

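# Rows of A that are missing from B or whose count differs.
# A.a_row and B.b_row in `on` are attributes of the Python DataFrame variables;
# the aliases 'A' and 'B' are what the strings in where() and select() refer to.
A_difference = (A.alias('A')
    .join(
        B.alias('B'),
        how='left',
        on=A.a_row == B.b_row)
    .where('B.counting != A.counting or B.counting is Null')
    .select('A.*')
)
# Rows of B that are missing from A or whose count differs.
B_difference = (B.alias('B')
    .join(
        A.alias('A'),
        how='left',
        on=A.a_row == B.b_row)
    .where('B.counting != A.counting or A.counting is Null')
    .select('B.*')
)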
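
Both differences are the same operation with the arguments swapped. The following is a plain-Python sketch of that logic, not PySpark, which may help when checking by hand what A_difference and B_difference should contain. It assumes each row string is unique within its data frame and uses the sample rows from the SQL section of the question. `difference` is a hypothetical helper name.

```python
def difference(left, right):
    """Rows of `left` that are missing from `right` or carry a different count.

    Hypothetical helper: models LEFT JOIN + WHERE on plain dicts {row: count}.
    """
    result = {}
    for row, count in left.items():
        other = right.get(row)  # None plays the role of SQL NULL (no match)
        if other is None or other != count:
            result[row] = count
    return result


# Sample data from the SQL section, keyed by the row string.
A = {
    "abc|def|2022-05-27|09:05:17.023|12345": 2,
    "xyz|def|2022-05-27|09:05:17.023|13432": 2,
    "lkj|def|2022-05-07|09:05:17.023|14362": 3,
}
B = {
    "abc|def|2022-05-27|09:05:17.023|12345": 2,
    "xyz|def|2022-05-27|09:05:17.023|13432": 1,
    "poi|def|2022-05-27|09:50:17.023|450545": 2,
}

A_difference = difference(A, B)  # same call, arguments swapped below
B_difference = difference(B, A)

print(A_difference)
print(B_difference)
```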