Anti join followed by union in Spark SQL


I am running a PySpark script in which I do an anti join and a union of two dataframes, but I want to do the same in Spark SQL.

df_src:

+-------+-------+
|call_id|call_nm|
+-------+-------+
|    100|     QC|
|    105|     XY|
|    110|     NM|
|    115|     AB|
+-------+-------+

df_lkp:

+-------+-------+
|call_id|call_nm|
+-------+-------+
|    100|     QC|
|    105|     XY|
|    106|     XZ|
+-------+-------+

We have two dataframes: df_src & df_lkp. I am extracting unmatched records from df_src:

# column_nm holds the name of the join column, e.g. 'call_id'
df_unmatched = df_src.join(df_lkp, on=column_nm, how='left_anti')

It is giving this result:

df_unmatched

+-------+-------+
|call_id|call_nm|
+-------+-------+
|    110|     NM|
|    115|     AB|
+-------+-------+
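For reference, a left anti join keeps only the rows of the left table whose join key has no match in the right table. Here is a minimal plain-Python sketch of that semantics (no Spark required; the tuples mirror df_src and df_lkp above):

```python
# Rows mirroring df_src and df_lkp as (call_id, call_nm) tuples.
df_src_rows = [(100, 'QC'), (105, 'XY'), (110, 'NM'), (115, 'AB')]
df_lkp_rows = [(100, 'QC'), (105, 'XY'), (106, 'XZ')]

# Left anti join on call_id: keep src rows whose key is absent from lkp.
lkp_keys = {call_id for call_id, _ in df_lkp_rows}
unmatched = [row for row in df_src_rows if row[0] not in lkp_keys]
print(unmatched)  # [(110, 'NM'), (115, 'AB')]
```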

But I want to do this part in Spark SQL. I have created the temporary views vw_df_src and vw_df_lkp and am trying to run the following query, but it is not giving the expected result.

unmatched_sql = "SELECT * from vw_df_src where {0} in (select {0} from vw_df_src minus select {0} from vw_df_lkp)".format('call_id')
df_unmatched = sqlContext.sql(unmatched_sql)

I am also doing a union of both dataframes and dropping duplicates, using the code below:

df_src1 = df_lkp.union(df_src)
df_src1.show(10)
df_src2 = df_src1.dropDuplicates(['call_id'])

df_src2:

+-------+-------+
|call_id|call_nm|
+-------+-------+
|    110|     NM|
|    100|     QC|
|    115|     AB|
|    106|     XZ|
|    105|     XY|
+-------+-------+
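Note that dropDuplicates(['call_id']) deduplicates by the key column only and keeps one row per key (Spark gives no guarantee about which one), whereas SQL UNION deduplicates on whole rows. A plain-Python sketch of the key-based behaviour, where the first occurrence wins:

```python
# Rows mirroring df_lkp and df_src as (call_id, call_nm) tuples.
df_lkp_rows = [(100, 'QC'), (105, 'XY'), (106, 'XZ')]
df_src_rows = [(100, 'QC'), (105, 'XY'), (110, 'NM'), (115, 'AB')]

# DataFrame union() just concatenates: duplicates are kept.
combined = df_lkp_rows + df_src_rows  # 7 rows

# dropDuplicates(['call_id']): keep one row per call_id value.
seen, deduped = set(), []
for row in combined:
    if row[0] not in seen:
        seen.add(row[0])
        deduped.append(row)
print(sorted(deduped))
# [(100, 'QC'), (105, 'XY'), (106, 'XZ'), (110, 'NM'), (115, 'AB')]
```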

I want this to be done in Spark SQL too.

I am using the following code to create temp views:

df_src = sqlContext.read.format('com.databricks.spark.csv') \
    .option('delimiter', '\001') \
    .options(header='true', inferSchema='false') \
    .load(src_file_nm)
df_src.createOrReplaceTempView('vw_df_src')

df_lkp = sqlContext.read.format('com.databricks.spark.csv') \
    .option('delimiter', '\001') \
    .options(header='true', inferSchema='false') \
    .load(lkp_file)
df_lkp.createOrReplaceTempView('vw_df_lkp')

CodePudding user response:

ANTI JOIN

spark.sql(
    """select * from vw_df_src
       LEFT ANTI JOIN vw_df_lkp
       ON vw_df_src.call_nm = vw_df_lkp.call_nm"""
).show()


+-------+-------+
|call_id|call_nm|
+-------+-------+
|    115|     AB|
|    110|     NM|
+-------+-------+

If you are running this in a notebook cell that is not initialized as SQL, try:

%sql
select * from vw_df_src
LEFT ANTI JOIN vw_df_lkp
ON vw_df_src.call_nm = vw_df_lkp.call_nm

UNION

In PySpark, DataFrame.union keeps duplicates, so you have to call dropDuplicates() or distinct() afterwards. In SQL, UNION eliminates duplicates (use UNION ALL to keep them), so the following will do. Since Spark 2.0.0, unionAll (which also kept duplicates) is deprecated in favour of union, which behaves the same way.

spark.sql(
    """select * from vw_df_src
       union
       select * from vw_df_lkp"""
).show()
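The distinction can be illustrated without Spark: UNION ALL is plain concatenation, while UNION is concatenation followed by a distinct over whole rows. A plain-Python sketch using the question's data:

```python
# Rows mirroring df_src and df_lkp as (call_id, call_nm) tuples.
src = [(100, 'QC'), (105, 'XY'), (110, 'NM'), (115, 'AB')]
lkp = [(100, 'QC'), (105, 'XY'), (106, 'XZ')]

union_all = src + lkp            # SQL UNION ALL / DataFrame union(): 7 rows
union_distinct = set(union_all)  # SQL UNION: whole-row duplicates removed, 5 rows
print(len(union_all), len(union_distinct))  # 7 5
```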

CodePudding user response:

Setup:

df_src = spark.createDataFrame(
    [(100, 'QC'),
     (105, 'XY'),
     (110, 'NM'),
     (115, 'AB')],
    ['call_id', 'call_nm']
)
df_lkp = spark.createDataFrame(
    [(100, 'QC'),
     (105, 'XY'),
     (105, 'XY'),
     (106, 'XZ')],
    ['call_id', 'call_nm']
)
df_src.createOrReplaceTempView('vw_df_src')
df_lkp.createOrReplaceTempView('vw_df_lkp')

According to your requirements, the anti join followed by a union can be done like this:

spark.sql(
    """
    select *
    from vw_df_src as a
    anti join vw_df_lkp b on a.call_nm=b.call_nm
    union (select * from vw_df_lkp)
    """
).show()
# +-------+-------+
# |call_id|call_nm|
# +-------+-------+
# |    110|     NM|
# |    115|     AB|
# |    100|     QC|
# |    105|     XY|
# |    106|     XZ|
# +-------+-------+

However, it seems that anti join is not needed:

spark.sql(
    """
    select * from vw_df_src
    union
    select * from vw_df_lkp
    """
).show()
# +-------+-------+
# |call_id|call_nm|
# +-------+-------+
# |    100|     QC|
# |    105|     XY|
# |    115|     AB|
# |    110|     NM|
# |    106|     XZ|
# +-------+-------+