Can I write multiple DataFrames in parallel in Spark?


I want to write many DataFrames in Avro format, one after another, and I run the code below inside a for loop.

// .avro(...) is provided by the external spark-avro package
import com.databricks.spark.avro._

df
  .repartition(<number-of-partitions>)
  .write
  .mode(<write-mode>)
  .avro(<file-path>)

The problem is that when I run my Spark job, only one task executes at a time, so only one DataFrame is being written. When I check the number of active executors in the Spark UI, I also see that only one executor is in use.

Is it possible to write DataFrames in parallel in Spark? If so, am I doing it the right way?
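
For reference, a minimal sketch of the sequential loop described above (df1, df2, the paths, and the partition count are illustrative, not from the question):

// Each write is a blocking Spark action, so the second iteration
// only starts after the first one has finished.
import com.databricks.spark.avro._

for ((d, path) <- Seq(df1 -> "/out/a", df2 -> "/out/b")) {
  d.repartition(10).write.mode("overwrite").avro(path)
}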

CodePudding user response:

To run multiple jobs in parallel, you need to submit them from separate threads. From the Spark job scheduling documentation:

Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads. By “job”, in this section, we mean a Spark action (e.g. save, collect) and any tasks that need to run to evaluate that action.

To see the difference, time sequential writing against parallel writing; a minimal sketch of the parallel version follows below.

EDIT: I'm using the Parquet format rather than Avro, but Avro should behave the same.
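
As a minimal sketch of the separate-threads approach (the DataFrame names and output paths are illustrative assumptions), Scala parallel collections give each write its own thread:

// Submit one save job per thread via a parallel collection.
// Note: .par is built in up to Scala 2.12; on 2.13 it needs the
// scala-parallel-collections module.
val toWrite = Seq(df1 -> "/tmp/out/a", df2 -> "/tmp/out/b")

toWrite.par.foreach { case (d, path) =>
  // Each element runs on its own thread, so Spark receives the
  // save jobs concurrently and can schedule their tasks in parallel.
  d.repartition(5).write.mode("overwrite").parquet(path)
}

Each thread blocks on its own save action, so all the jobs reach the Spark scheduler at roughly the same time instead of one after another.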

CodePudding user response:

The other two answers are correct. However, here's another approach using Scala Futures.

You can find an elaborate explanation of this approach here: http://www.russellspitzer.com/2017/02/27/Concurrency-In-Spark/

You will see that the respective output directories are indeed written at around the same time, rather than sequentially.

import org.apache.spark.sql.DataFrame
import scala.concurrent.duration.Duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
// .avro(...) is provided by the external spark-avro package
import com.databricks.spark.avro._

// A blocking save; wrapping the call in a Future (below) runs it on
// a separate thread, so Spark receives the jobs concurrently.
def write(df: DataFrame, i: Int) =
  df.repartition(5).write.mode("overwrite").avro(s"/path/to/output_$i")

val dataframes = Iterator(df, df2) // replace with a list of your data frames

// This is the line that "executes" the writes in parallel
// Use dataframes.zipWithIndex.map to create an iterator/list of futures
// Use Future.sequence to compose together all of these futures, into one future
// Use Await.result to wait until this "composite" future completes
Await.result(Future.sequence(dataframes.zipWithIndex.map { case (d, i) => Future(write(d, i)) }), Inf)

You can set a finite timeout (instead of Inf), and you can also batch the DataFrames into sublists, if needed, to limit the parallelism; a sketch of that follows.
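
For example, here is a minimal batching sketch (the batch size of 4 is an arbitrary assumption), reusing the write function from above:

// Cap concurrency by running at most 4 writes at a time; each batch
// must finish before the next one starts.
Seq(df, df2).zipWithIndex.grouped(4).foreach { batch =>
  Await.result(Future.sequence(batch.map { case (d, i) => Future(write(d, i)) }), Inf)
}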
