I have a question: I want to write many DataFrames in Avro format, and I currently use the code below inside a for loop, so they are written sequentially.
df
  .repartition(<number-of-partitions>)
  .write
  .mode(<write-mode>)
  .avro(<file-path>)
The problem is that when I run my Spark job, only one task executes at a time, so only one DataFrame is being written. When I check the number of active executors in the Spark UI, I also see only one executor being used.
Is it possible to write DataFrames in parallel in Spark? If so, am I doing it the right way?
CodePudding user response:
To run multiple jobs in parallel, you need to submit them from separate threads. As the Spark job-scheduling documentation puts it:
Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads. By “job”, in this section, we mean a Spark action (e.g. save, collect) and any tasks that need to run to evaluate that action.
EDIT: I'm using the Parquet format, but it should work the same way with Avro.
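A minimal sketch of this thread-based approach, using Parquet as in the edit above (the helper name, paths, and partition count are illustrative, not from the original answer):

import org.apache.spark.sql.DataFrame

// Each thread submits its own write action (a separate Spark job),
// so the scheduler can run the jobs concurrently, given enough
// executor cores.
def writeAllInThreads(dataframes: Seq[(DataFrame, String)]): Unit = {
  val threads = dataframes.map { case (df, path) =>
    new Thread(new Runnable {
      def run(): Unit =
        df.repartition(5).write.mode("overwrite").parquet(path)
    })
  }
  threads.foreach(_.start()) // start all writes
  threads.foreach(_.join())  // block until every write has finished
}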
CodePudding user response:
The other two answers are correct. However, here's another approach using Scala Futures.
You can find an elaborate explanation of this approach here: http://www.russellspitzer.com/2017/02/27/Concurrency-In-Spark/
You will see that the respective output directories are indeed written at around the same time, rather than sequentially.
import org.apache.spark.sql.DataFrame
import scala.concurrent.duration.Duration.Inf
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}

// Note: the .avro(...) method requires the spark-avro package
// (import com.databricks.spark.avro._); on Spark 2.4+ you can use
// .format("avro").save(...) instead.
def write(df: DataFrame, i: Int) =
  df.repartition(5).write.mode("overwrite").avro(s"/path/to/output_$i")

val dataframes = Seq(df, df2) // replace with a list of your DataFrames

// dataframes.zipWithIndex.map creates a list of futures, one per write.
// Future.sequence composes all of these futures into a single future.
// Await.result waits until this "composite" future completes.
// This is the line that "executes" the writes in parallel:
Await.result(Future.sequence(dataframes.zipWithIndex.map { case (d, i) => Future(write(d, i)) }), Inf)
You can set a timeout (other than Inf), and you can also batch sublists of DataFrames together, if needed, to limit the parallelism, as sketched below.
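That batching might look like the following; it reuses the imports, the write helper, and the dataframes list from the snippet above, and the batch size of 4 is just an illustrative choice:

// Run at most 4 writes concurrently: wait for each batch of futures
// to complete before launching the next batch.
dataframes.zipWithIndex.grouped(4).foreach { batch =>
  Await.result(Future.sequence(batch.map { case (d, i) => Future(write(d, i)) }), Inf)
}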