I have millions of Gzipped CSV files to process and converting to Parquet. I'm running a simple Spark batch job on EMR to do the conversion, and giving it a couple million files at a time to convert.
However, I've noticed that there is a big delay from when the job starts to when the files are listed and split up into a batch for the executors to do the conversion. From what I have read and understood, the scheduler has to get the metadata for those files, and schedule those tasks. However, I've noticed that this step is taking ~17 minutes for a million files to split up into tasks for a batch. Even though the actual task of listing the files and doing the conversion only takes 15 minutes with my cluster of instances, the overall job takes over 30 minutes. It appears that it takes a lot of time for the driver to index all the files to split up into tasks. Is there any way to increase parallelism for this initial stage of indexing files and splitting up tasks for a batch?
I've tried tinkering with and increasing spark.driver.cores
thinking that it would increase parallelism, but it doesn't seem to have an effect.
CodePudding user response:
you can try by setting below config
spark.conf.set("spark.default.parallelism",x)
where x = total_nodes_in_cluster * (total_core_in_node - 1 ) * 5
CodePudding user response:
to support Rishav's answer with some explanation:
spark.conf.set("spark.default.parallelism", x)
This sets the default level of parallelism in Apache Spark. It defines the number of parallel tasks that can run simultaneously in a Spark application.
Here, spark.conf
is an instance of the SparkConf class and set is a method used to set the configuration property. The first argument to the set method, "spark.default.parallelism", is the property name, and the second argument, x, is its value.
Setting the default parallelism to a value of x means that Spark will use x as the default number of parallel tasks for operations that can be executed in parallel, such as data shuffling or aggregations. The default parallelism is used as the number of partitions for RDDs and DataFrames.