Home > Enterprise >  Spark driver synchronous execution
Spark driver synchronous execution

Time:10-17

From my understanding, Spark uses lazy evaluation to perform the actual computation. A transformation chain is executed on the executors only when an action is called on that chain. All of this happens on the driver, which is a single thread process. By default, the order of actions declared in the code is always preserved.

My question is: Does the driver perform a synchronous call by waiting the triggered execution completes before proceeding with the next code instruction? In other terms: are driver scheduling instructions blocking?

Here is an example with just Spark API calls:

spark
    .read
    .schema(mySchema)
    .json(myFilePath)
    .withColumn("a", col("b") * 2)
    .filter(col("c") > 300)
    .count()

spark
    .read
    .schema(mySchema2)
    .json(myFilePath2)
    .filter(col("d") < 100)
    .count()

Here, two actions are scheduled and executed, by default, ensuring the same order as they're declared.

Another example with a Scala statement:

val df1 = spark
    .read
    .schema(mySchema)
    .json(myFilePath)
    .withColumn("a", col("b") * 2)
    .filter(col("c") > 300)

// no execution happened until here

val df1Count = df1.count()    // "count" action triggers the execution

println(s"df1 contains ${df1Count} rows.").  // rows are logged correctly

Given the fact that df1Count contains the result of an execution on a cluster, does the driver wait to complete the execution before calling the println statement?

Am I missing something? I'd like to know more about it, so some official documentation or blog post would be helpful.

CodePudding user response:

In your example, both the withColumn and filter operations are transformations so they are evaluated lazily. The count operation is an action that triggers the execution of those transformations.

This distinction is discussed in the spark documentation (this part of the documentation discusses RDDs but the same applies to dataframes).

CodePudding user response:

Yes, the DRIVER is responsible for the generation of the Logical and Physical Plan.

So the driver is "waiting" for an action (here, count), to create logicial plan, and send work to executors.

Then, your scala code is waiting for executor responses end execute other lines.

BTW, this is possible to execute code in parrallal: see https://medium.com/analytics-vidhya/boosting-apache-spark-application-by-running-multiple-parallel-jobs-25d13ee7d2a6:

object ParallelProcessing {

  val queries: List[(String, String)] = List(
    ("SELECT * FROM ABC", "output1"), 
    ("SELECT * FROM XYZ", "output2")
  )

  // Just use parallel collection instead of futures, that's it
  queries.par foreach { 
    case (query, path) =>
      val dataPath = s"${pathPrefix}/{path}"
      executeAndSave(query, dataPath)
  }
  
  def executeAndSave(query: String, dataPath: String)(implicit context: Context): Unit = {
    println(s"$query starts")
    context.spark.sql(query).write.mode("overwrite").parquet(dataPath)
    println(s"$query completes")
  }
  
}
  • Related