Optimizing a Spark DataFrame query

Time:12-24

I'm trying to create a DataFrame from a Hive table, but I'm not very familiar with the Spark API.

I need help optimizing the query in the getLastSession method. I'd like to combine its two separate Spark queries into one:

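import java.io.File
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions.{col, max}

// onlyPartition and processName come from elsewhere in my code; spark is the active SparkSession
val pathTable = new File("/src/test/spark-warehouse/test_db.db/test_table").getAbsolutePath
val path      = new Path(s"$pathTable${if (onlyPartition) s"/name_process=$processName" else ""}").toString
val df        = spark.read.parquet(path)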


def getLastSession: Dataset[Row] = {
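  // First collect(): scan the table for the latest time_write
  val lastTime        = df.select(max(col("time_write"))).collect()(0)(0).toString
  // Second collect(): scan it again for the id_session written at that time
  val lastSession     = df.select(col("id_session")).where(col("time_write") === lastTime).collect()(0)(0).toString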
  val dfByLastSession = df.filter(col("id_session") === lastSession)

  dfByLastSession.show()
  /*
  +----------+----------------+------------------+-------+
  |id_session|      time_write|               key|  value|
  +----------+----------------+------------------+-------+
  |alskdfksjd|1639950466414000|schema2.table2.csv|Failure|

  */
  dfByLastSession
}

P.S. My source table (for example):

name_process   id_session  time_write   key                 value
OtherClass     jsdfsadfsf  43434883477  schema0.table0.csv  Success
OtherClass     jksdfkjhka  23212123323  schema1.table1.csv  Success
OtherClass     alskdfksjd  23343212234  schema2.table2.csv  Failure
ExternalClass  sdfjkhsdfd  34455453434  schema3.table3.csv  Success

Answer:

You can use row_number over a Window like this:

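import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{desc, row_number}

// Number the rows by time_write, newest first, and keep only the top one
val dfByLastSession = df.withColumn(
  "rn",
  row_number().over(Window.orderBy(desc("time_write")))
).filter("rn = 1").drop("rn")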
    
dfByLastSession.show()

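However, because this window is not partitioned by any column, Spark has to move all rows into a single partition to sort them, which can seriously degrade performance on a large table. Note also that it keeps only the single most recent row, not every row of the most recent session.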
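If what you actually need is the latest row per process rather than one global row (an assumption; the question asks for a single session), partitioning the window by name_process avoids the single-partition sort, because each process is ranked independently. This is a minimal sketch for spark-shell or a Scala script, assuming a local SparkSession and the sample rows from the question, with time_write read as a Long:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{desc, row_number}

// Local session for experimenting only (assumption: not your production setup)
val spark = SparkSession.builder().master("local[*]").appName("latest-per-process").getOrCreate()
import spark.implicits._

// Sample rows copied from the question
val df = Seq(
  ("OtherClass",    "jsdfsadfsf", 43434883477L, "schema0.table0.csv", "Success"),
  ("OtherClass",    "jksdfkjhka", 23212123323L, "schema1.table1.csv", "Success"),
  ("OtherClass",    "alskdfksjd", 23343212234L, "schema2.table2.csv", "Failure"),
  ("ExternalClass", "sdfjkhsdfd", 34455453434L, "schema3.table3.csv", "Success")
).toDF("name_process", "id_session", "time_write", "key", "value")

// Rows are grouped by name_process, so different processes can be sorted in different tasks
val byProcess = Window.partitionBy("name_process").orderBy(desc("time_write"))

// Keep the newest row within each name_process
val latestPerProcess = df
  .withColumn("rn", row_number().over(byProcess))
  .filter($"rn" === 1)
  .drop("rn")

latestPerProcess.show()
```

With the sample data this yields one row for OtherClass (jsdfsadfsf) and one for ExternalClass (sdfjkhsdfd).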

Alternatively, you can keep your filter and use struct ordering to get the id_session associated with the most recent time_write in a single query:

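import org.apache.spark.sql.functions.{col, max, struct}

// Structs compare field by field, so max returns the id_session with the latest time_write
val lastSession = df.select(max(struct(col("time_write"), col("id_session")))("id_session")).first.getString(0)

val dfByLastSession = df.filter(col("id_session") === lastSession)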
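If you want to avoid bringing lastSession back to the driver at all, one option is to join the table against the one-row struct-max aggregate, so the lookup and the filter stay in a single query plan and every row of that session is kept. This is a sketch for spark-shell or a Scala script, assuming a local SparkSession and the sample rows from the question (time_write read as a Long); with that data the newest time_write, 43434883477, belongs to jsdfsadfsf:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, max, struct}

// Local session for experimenting only (assumption: not your production setup)
val spark = SparkSession.builder().master("local[*]").appName("last-session-join").getOrCreate()
import spark.implicits._

// Sample rows copied from the question
val df = Seq(
  ("OtherClass",    "jsdfsadfsf", 43434883477L, "schema0.table0.csv", "Success"),
  ("OtherClass",    "jksdfkjhka", 23212123323L, "schema1.table1.csv", "Success"),
  ("OtherClass",    "alskdfksjd", 23343212234L, "schema2.table2.csv", "Failure"),
  ("ExternalClass", "sdfjkhsdfd", 34455453434L, "schema3.table3.csv", "Success")
).toDF("name_process", "id_session", "time_write", "key", "value")

// One-row DataFrame holding the id_session of the newest time_write
val lastSessionId = df.agg(
  max(struct(col("time_write"), col("id_session")))("id_session").as("id_session")
)

// Inner join on id_session keeps every row of that session; no collect() in between
val dfByLastSession = df.join(lastSessionId, Seq("id_session"))

dfByLastSession.show()
```

This still reads the table twice (once for the aggregate, once for the join side), so it mainly saves the driver round trip; whether it is faster than the two-step version on your data is worth measuring.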