I know there's a way to configure a Spark application based on your cluster resources (executor memory, number of executors, and executor cores). Is there a way to configure it that also takes the input data size into account?
What happens if the input data does not fit into all the partitions?
Example:
Data input size = 200GB
Number of partitions in cluster = 100
Size of partitions = 128MB
Total size that partitions could handle = 100 * 128MB = 12.8GB
What about the rest of the data (about 187GB)?
I guess Spark will wait for resources to free up, since it is designed to process data in batches. Is this a correct assumption?
Thanks in advance
CodePudding user response:
For best performance, I recommend not setting spark.executor.cores: you want one executor per worker. Also, set spark.executor.memory to roughly 70% of the memory available for the executor on each worker. Finally, if you want runtime statistics to influence the number of partitions, use Spark 3, which comes with Adaptive Query Execution (AQE). With AQE, Spark dynamically coalesces shuffle partitions, so you can set the number of shuffle partitions to an arbitrarily large value, such as:
spark.sql.shuffle.partitions=<number of cores * 50>
Then just let AQE do its thing. You can read more about it here: https://www.databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html
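As a rough sketch of the settings this answer describes, the helper below builds the relevant configuration keys for a given core count. The function name aqe_shuffle_conf and the 16-core figure are hypothetical, chosen only for illustration; the multiplier of 50 is the one suggested above, and the three keys are standard Spark SQL settings.

```python
def aqe_shuffle_conf(total_cores, factor=50):
    """Return Spark SQL settings that oversize the shuffle partition count
    and let AQE coalesce the partitions at runtime.

    total_cores and factor are illustrative inputs, not values from the question.
    """
    if total_cores <= 0:
        raise ValueError("total_cores must be positive")
    return {
        # AQE itself, plus the feature that merges small shuffle partitions.
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true",
        # Deliberately large starting point; AQE shrinks it as needed.
        "spark.sql.shuffle.partitions": str(total_cores * factor),
    }


# Hypothetical cluster with 16 cores in total.
conf = aqe_shuffle_conf(total_cores=16)
for key, value in sorted(conf.items()):
    print(f"--conf {key}={value}")
```

The printed lines can be appended to a spark-submit command, or each key/value pair can be passed to SparkSession.builder.config(key, value) before creating the session.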
CodePudding user response:
There are two aspects to your question. The first concerns how this data is stored, and the second concerns how it is processed.
With regard to storage, when you say Size of partitions = 128MB, I assume you use HDFS to store this data and 128M is your default block size. HDFS itself decides how to split this 200GB file and stores it in blocks no larger than 128M. Your HDFS cluster also needs more than 200GB * replication factor of combined storage to persist this data.
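To make the storage arithmetic concrete, here is a small sketch that computes how many blocks the file occupies and how much raw storage it needs. The helper name hdfs_footprint is hypothetical, and the replication factor of 3 is an assumption (it is the usual HDFS default, but the question does not state it).

```python
GIB = 1024 ** 3
MIB = 1024 ** 2


def hdfs_footprint(file_bytes, block_bytes=128 * MIB, replication=3):
    """Return (number of blocks, raw bytes stored across the cluster).

    replication=3 is an assumed value; check dfs.replication on your cluster.
    """
    # Integer ceiling division: a partial last block still counts as a block.
    blocks = -(-file_bytes // block_bytes)
    raw_bytes = file_bytes * replication
    return blocks, raw_bytes


blocks, raw_bytes = hdfs_footprint(200 * GIB)
print(f"blocks: {blocks}")
print(f"raw storage needed: {raw_bytes / GIB:.0f} GiB")
```

So the 200GB file is spread over many more than 100 blocks; the block size limits the size of each stored chunk, not the total amount of data the cluster can hold.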
Coming to the Spark execution part of the question, once you define spark.default.parallelism=100, Spark will use this value as the default level of parallelism for certain operations (such as joins). Note that for these operations the amount of data each task processes is determined by the number of partitions, not by the block size (128M); the block size only affects how the input is split when it is first read. This means each task will work on about 200G/100 = 2G of data (provided the executor memory is sufficient for the operation being performed). If there isn't enough capacity in the Spark cluster to run all 100 tasks in parallel, Spark runs as many as it can at once and schedules the rest in batches as resources become available.
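The numbers in this answer can be sketched as follows. The function names are hypothetical, and the figure of 40 concurrent slots (for example 10 executors with 4 cores each) is an assumption made only to show what happens when the cluster cannot run all 100 partitions at once.

```python
import math


def data_per_task_gb(total_gb, num_partitions):
    """Average amount of data each of the num_partitions tasks handles."""
    return total_gb / num_partitions


def task_waves(num_tasks, concurrent_slots):
    """Number of batches needed when only concurrent_slots tasks run at once."""
    if concurrent_slots <= 0:
        raise ValueError("concurrent_slots must be positive")
    return math.ceil(num_tasks / concurrent_slots)


per_task = data_per_task_gb(200, 100)
waves = task_waves(100, 40)  # 40 slots is an assumed cluster capacity
print(f"data per task: {per_task} GB")
print(f"waves needed with 40 slots: {waves}")
```

Nothing is dropped when the data exceeds what can be processed at once: the remaining work simply waits for a free slot, at the cost of a longer total run time.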