Home > Software engineering >  Last few tasks in spark job are running very slow
Last few tasks in spark job are running very slow

Time:05-30

I was running below scala spark code on databricks. I am facing an issue like last few task are running very slowly. where 1st few tasks are completing with in seconds. I was tried with repartation of dataframe. But didn't helped. I am also not understanding what basics the tasks are divided to drocket node.

//#reading files from DBFS. And creating temp view on top of that.

val cdf_hsc_interim_nobed_main=spark.read.format("delta").option("header","true").load(s"${Interimpath}/cdf_hsc_interim_nobed")
cdf_hsc_interim_nobed_main.createOrReplaceTempView("cdf_hsc_interim_nobed")
spark.sql(s"REFRESH TABLE cdf_hsc_interim_nobed")

val cdf_hsc_facl_decn_bed_interim_main=spark.read.format("delta").option("header","true").load(s"${Interimpath}/cdf_hsc_facl_decn_bed_interim")
cdf_hsc_facl_decn_bed_interim_main.createOrReplaceTempView("cdf_hsc_facl_decn_bed_interim")
spark.sql(s"REFRESH TABLE cdf_hsc_facl_decn_bed_interim")
spark.sql(
          s"""
select
nb.*,
facl.id as facl_decn_id,
facl.seq_nbr as facl_decn_seq_nbr,
case when facl.id is not null then concat(substr(nb.cse_dttm, 1, 10), ' 00:00:00.000') else cast(null as string) end as eng_dt
from interim_nobed nb
left outer join decn_bed_interim facl on
(nb.id=facl.hsc_id and nb.facl_decn_id=facl.hsc_id)
where nb.facl_id is not null
union all
select
nb.*,
cast(null as int) as facl_bed_id,
cast(null as int) as facl_bed_seq_nbr,
cast(null as string) as engg_dt
from interim_nobed nb
where nb.facl_id is null
""").write.mode("overwrite").option("header", "true").parquet(s"${Interimpath}/set1_interimdelete")

CodePudding user response:

Adaptive Query Execution (AQE) is the best practice for speeding up the query.

In Spark 3.0, the AQE framework is shipped with three features:

  • coalescing shuffle partitions
  • switching join strategies
  • optimizing skew joins

All the above operations are dynamically implemented.

Check the below link for better implementation.

https://databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html

CodePudding user response:

From the query side, try filtering before joining. Even though the spark is smart enough sometimes a nudge from an Engineer helps.

select
    nb.*
    ,facl.id as facl_decn_id
    ,facl.seq_nbr as facl_decn_seq_nbr
    ,case when facl.id is not null then concat(substr(nb.cse_dttm, 1, 10), ' 00:00:00.000') else cast(null as string) end as eng_dt
from
    (select 
        *    
    from 
        interim_nobed
    where 
        facl_id is not null
    ) nb
    
left outer join 
    decn_bed_interim facl
on 
    nb.id=facl.hsc_id and nb.facl_decn_id=facl.hsc_id

UNION ALL

select
    nb.*
    ,cast(null as int) as facl_bed_id
    ,cast(null as int) as facl_bed_seq_nbr
    ,cast(null as string) as engg_dt
from 
    interim_nobed nb
where 
    nb.facl_id is null
  • Related