I am running a PySpark script in which I do an anti join and a union of two dataframes, but I want to do the same in Spark SQL.
df_src:
+-------+-------+
|call_id|call_nm|
+-------+-------+
|    100|     QC|
|    105|     XY|
|    110|     NM|
|    115|     AB|
+-------+-------+
df_lkp:
+-------+-------+
|call_id|call_nm|
+-------+-------+
|    100|     QC|
|    105|     XY|
|    106|     XZ|
+-------+-------+
We have two dataframes, df_src and df_lkp. I am extracting the unmatched records from df_src:
df_unmatched = df_src.join(df_lkp, on='call_id', how='left_anti')
It gives this result:
df_unmatched
+-------+-------+
|call_id|call_nm|
+-------+-------+
|    110|     NM|
|    115|     AB|
+-------+-------+
But I want to do this part using Spark SQL. I have created temporary views vw_df_src and vw_df_lkp and am trying to run the following query, but I am not getting the result:
unmatched_sql = "SELECT * FROM vw_df_src WHERE {0} IN (SELECT {0} FROM vw_df_src MINUS SELECT {0} FROM vw_df_lkp)".format('call_id')
df_unmatched = sqlContext.sql(unmatched_sql)
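(For reference, MINUS is an alias for EXCEPT in Spark SQL, so the inner subquery can be run on its own as a minimal check that it returns 110 and 115, assuming the temp views described below are already registered:)
# Run the MINUS subquery alone; it should return call_id 110 and 115
sqlContext.sql("SELECT call_id FROM vw_df_src MINUS SELECT call_id FROM vw_df_lkp").show()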
I am also doing a union of both dataframes and dropping duplicates, using the code below:
df_src1 = df_lkp.union(df_src)
df_src1.show(10)
df_src2 = df_src1.dropDuplicates(['call_id'])
df_src2:
+-------+-------+
|call_id|call_nm|
+-------+-------+
|    110|     NM|
|    100|     QC|
|    115|     AB|
|    106|     XZ|
|    105|     XY|
+-------+-------+
I want this to be done in Spark SQL too.
I am using the following code to create temp views:
df_src = sqlContext.read.format('com.databricks.spark.csv') \
    .option("delimiter", '\001') \
    .options(header='true', inferSchema='false') \
    .load(src_file_nm)
df_src.createOrReplaceTempView('vw_df_src')
df_lkp = sqlContext.read.format('com.databricks.spark.csv') \
    .option("delimiter", '\001') \
    .options(header='true', inferSchema='false') \
    .load(lkp_file)
df_lkp.createOrReplaceTempView('vw_df_lkp')
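(For reference, on Spark 2.x+ the external com.databricks.spark.csv package is no longer needed; a sketch of the equivalent using the built-in CSV reader, assuming the same file paths:)
# Built-in CSV source (Spark 2.x+); same delimiter and header settings
df_src = spark.read.csv(src_file_nm, sep='\001', header=True, inferSchema=False)
df_src.createOrReplaceTempView('vw_df_src')
df_lkp = spark.read.csv(lkp_file, sep='\001', header=True, inferSchema=False)
df_lkp.createOrReplaceTempView('vw_df_lkp')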
CodePudding user response:
ANTI JOIN
spark.sql(
    """select * from vw_df_src LEFT ANTI JOIN
       vw_df_lkp ON
       vw_df_src.call_nm = vw_df_lkp.call_nm""").show()
+-------+-------+
|call_id|call_nm|
+-------+-------+
|    115|     AB|
|    110|     NM|
+-------+-------+
If running in a notebook cell not initialized as SQL, try:
%sql
select * from vw_df_src LEFT ANTI JOIN
vw_df_lkp ON
vw_df_src.call_nm = vw_df_lkp.call_nm
UNION
In PySpark, union() returns duplicates, and you have to call dropDuplicates() or distinct() afterwards. In SQL, UNION eliminates duplicates, so the following will do. (Since Spark 2.0.0, unionAll() is deprecated; union() is the method to use, and it keeps duplicates just like UNION ALL.)
spark.sql(
    """select * from vw_df_src
       union
       select * from vw_df_lkp""").show()
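Note one difference: SQL UNION deduplicates whole rows, while dropDuplicates(['call_id']) deduplicates by the key only. They agree here because duplicate call_ids always carry the same call_nm; if they could differ, deduping by key in SQL would need something like this row_number() sketch (which duplicate survives is arbitrary, as with dropDuplicates):
spark.sql(
    """select call_id, call_nm
       from (select *,
                    row_number() over (partition by call_id order by call_id) as rn
             from (select * from vw_df_src
                   union all
                   select * from vw_df_lkp) u) t
       where rn = 1""").show()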
CodePudding user response:
Preset:
df_src = spark.createDataFrame(
    [(100, 'QC'),
     (105, 'XY'),
     (110, 'NM'),
     (115, 'AB')],
    ['call_id', 'call_nm']
)
df_lkp = spark.createDataFrame(
    [(100, 'QC'),
     (105, 'XY'),
     (105, 'XY'),
     (106, 'XZ')],
    ['call_id', 'call_nm']
)
df_src.createOrReplaceTempView('vw_df_src')
df_lkp.createOrReplaceTempView('vw_df_lkp')
According to your requirements, (anti join + union) can be done like this:
spark.sql(
    """
    select *
    from vw_df_src as a
    anti join vw_df_lkp b on a.call_nm = b.call_nm
    union (select * from vw_df_lkp)
    """
).show()
# +-------+-------+
# |call_id|call_nm|
# +-------+-------+
# |    110|     NM|
# |    115|     AB|
# |    100|     QC|
# |    105|     XY|
# |    106|     XZ|
# +-------+-------+
However, it seems that anti join is not needed:
spark.sql(
    """
    select * from vw_df_src
    union
    select * from vw_df_lkp
    """
).show()
# +-------+-------+
# |call_id|call_nm|
# +-------+-------+
# |    100|     QC|
# |    105|     XY|
# |    115|     AB|
# |    110|     NM|
# |    106|     XZ|
# +-------+-------+
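Note that the preset df_lkp above contains the (105, XY) row twice; plain UNION removes that duplicate, whereas UNION ALL would keep it:
spark.sql(
    """
    select * from vw_df_src
    union all
    select * from vw_df_lkp
    """
).show()
# returns all 8 rows, including the duplicated (105, XY) from vw_df_lkp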