I am running the Scala Spark code below on Databricks. The last few tasks run very slowly, while the first tasks complete within seconds. I tried repartitioning the DataFrame, but it didn't help. I also don't understand on what basis the tasks are distributed to the worker nodes.
// Read the Delta tables from DBFS and create temp views on top of them.
// (The "header" option only applies to CSV sources, so it is not set here.)
val cdf_hsc_interim_nobed_main = spark.read.format("delta").load(s"${Interimpath}/cdf_hsc_interim_nobed")
cdf_hsc_interim_nobed_main.createOrReplaceTempView("cdf_hsc_interim_nobed")
spark.sql("REFRESH TABLE cdf_hsc_interim_nobed")
val cdf_hsc_facl_decn_bed_interim_main = spark.read.format("delta").load(s"${Interimpath}/cdf_hsc_facl_decn_bed_interim")
cdf_hsc_facl_decn_bed_interim_main.createOrReplaceTempView("cdf_hsc_facl_decn_bed_interim")
spark.sql("REFRESH TABLE cdf_hsc_facl_decn_bed_interim")
spark.sql(
s"""
select
nb.*,
facl.id as facl_decn_id,
facl.seq_nbr as facl_decn_seq_nbr,
case when facl.id is not null then concat(substr(nb.cse_dttm, 1, 10), ' 00:00:00.000') else cast(null as string) end as eng_dt
from cdf_hsc_interim_nobed nb
left outer join cdf_hsc_facl_decn_bed_interim facl on
(nb.id=facl.hsc_id and nb.facl_decn_id=facl.hsc_id)
where nb.facl_id is not null
union all
select
nb.*,
cast(null as int) as facl_decn_id,
cast(null as int) as facl_decn_seq_nbr,
cast(null as string) as eng_dt
from cdf_hsc_interim_nobed nb
where nb.facl_id is null
""").write.mode("overwrite").parquet(s"${Interimpath}/set1_interimdelete")
Answer:
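Enabling Adaptive Query Execution (AQE) is the recommended way to speed up this kind of query: a few tasks that run much longer than the rest usually point to skewed join keys, which AQE can handle at runtime.
In Spark 3.0 (where AQE is disabled by default; it is enabled by default from Spark 3.2), the AQE framework ships with three features: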
- coalescing shuffle partitions
- switching join strategies
- optimizing skew joins
All three are applied at runtime, using statistics from the query stages that have already finished.
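A minimal sketch of turning these features on from a notebook or job, assuming Spark 3.0 or later. The object and helper names (`AqeConfig`, `enableAqe`, `heaviestKeys`) are hypothetical; the configuration keys are standard Spark SQL settings, and the two skew thresholds shown are the Spark defaults, not tuned values. `heaviestKeys` is an illustrative helper for checking whether a join key really is skewed before tuning anything.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, desc}

object AqeConfig {
  // Standard AQE settings. The skew thresholds are the Spark defaults:
  // a partition is treated as skewed if it is larger than
  // factor * median partition size AND larger than the byte threshold.
  val aqeSettings: Map[String, String] = Map(
    "spark.sql.adaptive.enabled" -> "true",
    "spark.sql.adaptive.coalescePartitions.enabled" -> "true",
    "spark.sql.adaptive.skewJoin.enabled" -> "true",
    "spark.sql.adaptive.skewJoin.skewedPartitionFactor" -> "5",
    "spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes" -> "256MB"
  )

  // Apply every setting to the current session (these are runtime SQL confs).
  def enableAqe(spark: SparkSession): Unit =
    aqeSettings.foreach { case (k, v) => spark.conf.set(k, v) }

  // Hypothetical helper: row counts for the heaviest values of a join key.
  // A few keys with far more rows than the rest usually explain a handful
  // of slow tasks, because all rows for one key land in the same partition.
  def heaviestKeys(df: DataFrame, key: String, n: Int = 10): DataFrame =
    df.groupBy(col(key)).count().orderBy(desc("count")).limit(n)

  // Local demo only; on Databricks use the existing `spark` session
  // and do not call spark.stop().
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("aqe-example")
      .master("local[*]")
      .getOrCreate()
    enableAqe(spark)
    println(spark.conf.get("spark.sql.adaptive.enabled"))
    spark.stop()
  }
}
```

On Databricks you would call something like `AqeConfig.enableAqe(spark)` before running the query, and `AqeConfig.heaviestKeys(spark.table("cdf_hsc_facl_decn_bed_interim"), "hsc_id").show()` to see whether a few `hsc_id` values dominate.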
Answer:
From the query side, try filtering before joining. Even though Spark is usually smart enough to do this on its own, sometimes a nudge from an engineer helps.
select
nb.*
,facl.id as facl_decn_id
,facl.seq_nbr as facl_decn_seq_nbr
,case when facl.id is not null then concat(substr(nb.cse_dttm, 1, 10), ' 00:00:00.000') else cast(null as string) end as eng_dt
from
(select
*
from
cdf_hsc_interim_nobed
where
facl_id is not null
) nb
left outer join
cdf_hsc_facl_decn_bed_interim facl
on
nb.id=facl.hsc_id and nb.facl_decn_id=facl.hsc_id
UNION ALL
select
nb.*
,cast(null as int) as facl_decn_id
,cast(null as int) as facl_decn_seq_nbr
,cast(null as string) as eng_dt
from
cdf_hsc_interim_nobed nb
where
nb.facl_id is null
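The same "filter first, then join" idea can be sketched with the DataFrame API. This is a simplified illustration, not a drop-in replacement: the sample data, the single-column join key, and the `decn_id` column are hypothetical stand-ins for the real tables. For a left outer join, a filter on the left (preserved) side can usually be pushed below the join by the optimizer anyway, so comparing the `explain()` output of both versions is a quick way to see whether the rewrite changes the physical plan at all.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit}

object FilterBeforeJoin {
  def run(spark: SparkSession): DataFrame = {
    import spark.implicits._

    // Hypothetical sample data standing in for the two interim tables.
    val nobed = Seq[(Int, Option[Int])]((1, Some(10)), (2, None), (3, Some(30)))
      .toDF("id", "facl_id")
    val facl = Seq((100, 1), (300, 3)).toDF("decn_id", "hsc_id")

    // Split the left side first: only rows with a facility id take part in the join.
    val withFacl = nobed.filter(col("facl_id").isNotNull)
    val noFacl = nobed.filter(col("facl_id").isNull)

    val joined = withFacl
      .join(facl, withFacl("id") === facl("hsc_id"), "left_outer")
      .select(withFacl("id"), withFacl("facl_id"), facl("decn_id"))

    // Rows without a facility id skip the join and get a typed null column.
    val unjoined = noFacl.select(col("id"), col("facl_id"), lit(null).cast("int").as("decn_id"))

    joined.unionByName(unjoined)
  }

  // Local demo only; on Databricks reuse the existing `spark` session.
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("filter-before-join").master("local[*]").getOrCreate()
    val out = run(spark)
    out.explain()
    out.show()
    spark.stop()
  }
}
```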