Spark SQL JDBC numberOfPartitions calculation from huge vs small data load


I have a use case where, depending on the argument passed in, I may have to fetch and process 1) millions of records from a database (read the RDBMS over JDBC, decode, convert to XML, convert to CSV, etc., a very time-consuming process), or 2) only a few hundred or even a handful of records. Please note that in this multi-tenant Spark app I do not know the volume of the data until runtime, when I calculate the total number of records I need to process. So I have two questions here:

  1. How do I know how many executors or cores I need to request for this Spark job without knowing the data volume when I kick off the run?
  2. Because I am making JDBC calls against a DB table, I am using numPartitions, lowerBound (0), upperBound (total number of records), and partitionColumn (ROW_NUM) to partition the Spark SQL read. How do I calculate numPartitions? When I am fetching millions of records I want more partitions, and fewer for a handful. How do I decide this number? What is the logic? Would numPartitions be 10-20 for 100-200 records? We don't want to affect transactional applications by hogging DB resources. How do people typically decide on numPartitions? Appreciate your help, thank you.

Need help deciding database numOfPartitions

CodePudding user response:

It can be challenging to determine the optimal number of executors and cores for a Spark job without knowing the volume of data that needs to be processed. In general, you will want to use as many executors and cores as possible to maximize the parallelism of the job and reduce the overall processing time.

However, it's important to consider the following factors when determining the number of executors and cores to use:

- The size and complexity of the data: If the data is large and complex, you may need more executors and cores to process it effectively.
- The available resources: The number of executors and cores you can use will depend on the resources available on the cluster. If the cluster is already heavily utilized, you may need to use fewer executors and cores to avoid overloading the system.
- The overall performance of the job: You can use Spark's built-in performance metrics to monitor the performance of the job and adjust the number of executors and cores as needed to optimize the processing time.

One approach you could take is to start with a small number of executors and cores and gradually increase them as needed based on the performance of the job and the available resources. You can also use Spark's dynamic allocation feature to automatically adjust the number of executors based on the workload and available resources. This can help ensure that your Spark job is able to effectively process the data without overloading the system.

Spark's dynamic allocation feature allows the Spark application to automatically request additional executors or release unused executors based on the workload and available resources in the cluster. This can help improve the overall performance and efficiency of the Spark application by ensuring that the right amount of resources are available to process the data.

Dynamic allocation is disabled by default in Spark; you can enable it by setting the spark.dynamicAllocation.enabled property to true in the Spark configuration. Note that it also requires either an external shuffle service (spark.shuffle.service.enabled) or shuffle tracking (spark.dynamicAllocation.shuffleTracking.enabled) so that executors can be removed without losing shuffle data.

You can also adjust the default behavior of dynamic allocation using the following properties:

- spark.dynamicAllocation.minExecutors: The minimum number of executors to use for the application.
- spark.dynamicAllocation.maxExecutors: The maximum number of executors to use for the application.
- spark.dynamicAllocation.initialExecutors: The initial number of executors to use for the application.
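For example (a minimal sketch; the app name and the specific values are illustrative, and an external shuffle service is assumed so executors can be released safely), these properties can be set when building the SparkSession:

import org.apache.spark.sql.SparkSession

// Illustrative values only; tune min/max to what the cluster and the source DB can tolerate.
val spark = SparkSession.builder()
  .appName("multi-tenant-batch")                       // hypothetical app name
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "2")
  .config("spark.dynamicAllocation.initialExecutors", "4")
  .config("spark.dynamicAllocation.maxExecutors", "20")
  .config("spark.shuffle.service.enabled", "true")     // external shuffle service, needed for dynamic allocation on YARN/standalone
  .getOrCreate()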

By default, dynamic allocation requests additional executors when tasks have been queued in the scheduler for a while and releases executors that have been idle. You can tune how quickly it scales up or down using properties such as spark.dynamicAllocation.schedulerBacklogTimeout, spark.dynamicAllocation.sustainedSchedulerBacklogTimeout, and spark.dynamicAllocation.executorIdleTimeout.

Overall, using Spark's dynamic allocation feature can help improve the performance and efficiency of your Spark application by automatically allocating the right amount of resources for the data being processed.

CodePudding user response:

The number of partitions to use when reading data from a database using the JDBC connector can have a significant impact on the performance and efficiency of the Spark job. In general, a larger number of partitions will allow the data to be processed in parallel across multiple nodes in the cluster, which can improve the overall processing time. However, using too many partitions can also cause performance issues, such as overwhelming the database with too many concurrent connections.

When you use the numPartitions parameter in a JDBC query in Spark, it will create one database connection for each partition, which can potentially overwhelm the source database if the number of partitions is too large. To avoid this issue, it's important to carefully consider the number of partitions you use in your query.

One approach you could take is to use a smaller number of partitions, such as 10-20, and ensure that each partition processes a reasonable amount of data. For example, you could use the partitionColumn, lowerBound, and upperBound parameters to specify a range of values for the partition column, and then set the numPartitions parameter to a value that creates partitions of approximately 128 MB each. This helps ensure that the number of database connections used by the query stays manageable and does not overwhelm the source database.
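As a rough sketch (the connection URL, table subquery, credentials, and the totalRecords / numPartitions values are placeholders you would compute or choose at runtime; 128 MB per partition is only a rule of thumb), a partitioned JDBC read looks like this:

// All connection details below are placeholders.
val totalRecords: Long = 1000000L   // e.g. computed at runtime from a COUNT query
val numPartitions: Int = 20         // e.g. estimated size / 128 MB, capped to protect the DB

val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://dbhost:5432/mydb")   // placeholder URL
  .option("dbtable", "(SELECT t.*, ROW_NUMBER() OVER (ORDER BY id) AS ROW_NUM FROM my_table t) AS src") // placeholder subquery
  .option("user", "user")
  .option("password", "password")
  .option("partitionColumn", "ROW_NUM")
  .option("lowerBound", "0")
  .option("upperBound", totalRecords.toString)
  .option("numPartitions", numPartitions.toString)
  .load()

Spark splits the [lowerBound, upperBound] range of the partition column evenly across numPartitions and opens up to that many concurrent connections, so capping numPartitions is also how you keep the load on the transactional database in check.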

After the query, you can repartition the resulting DataFrame with repartition, for example:

val repartitionedDF = df.repartition(idealNumPartitions)

To calculate the optimal number of partitions for the repartition, we first need to estimate the size of the DataFrame, which can be done with:

val sizeInBytes = df.queryExecution.optimizedPlan.stats.sizeInBytes

Then we can calculate the optimal number of partitions with:

val sizeInMB: Double = sizeInBytes.toDouble / 1024.0 / 1024.0
// Aim for roughly 128 MB per partition, as suggested above
val idealNumPartitions: Int = math.max(1, math.ceil(sizeInMB / 128.0).toInt)
println(f"DataFrame size: $sizeInMB%.2f MB, ideal number of partitions: $idealNumPartitions")