How many partitions will pyspark-sql create while reading a .csv?
My understanding is that:
number of partitions = math.ceil(file_size/spark.conf.get('spark.sql.files.maxPartitionBytes'))
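By that naive formula, a single 96 MB file should fit comfortably inside one 128 MB partition (a quick sanity check, assuming the file is exactly 96 MiB):

```python
import math

file_size = 96 * 1024 * 1024       # 100,663,296 bytes on disk
max_partition_bytes = 134_217_728  # 128 MB default for spark.sql.files.maxPartitionBytes
print(math.ceil(file_size / max_partition_bytes))  # 1
```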
On my machine:
spark.conf.get('spark.sql.files.maxPartitionBytes')
output:
'134217728b'  # 128 MB
However, I am not observing this behaviour. I create a file that takes up 96 MB on disk and run Spark in local mode on an 8-core laptop. I would expect it to be read as 1 partition, but the file is being read as 8 partitions. Below is the code I used:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# create a small DataFrame; the resulting CSV occupies ~96 MB on disk
pd.DataFrame({'id': np.arange(10000000), 'b': np.random.choice(['a', 'b', 'c', 'd'], size=(10000000,), p=[0.25, 0.25, 0.25, 0.25])}).to_csv('df_s.csv', index=False)
sd = spark.read.csv('df_s.csv', schema="id int, b string")
sd.rdd.getNumPartitions()
output: 8
Can you please help me understand why I am seeing 8 partitions regardless of the file size?
CodePudding user response:
The actual formula is a bit more complicated than that; check the calculation below. You can find the source code here.
This is your configuration and your file
| Spark Configuration | Value | Default |
|---|---|---|
| spark.sql.files.maxPartitionBytes | 128M | 128M |
| spark.sql.files.openCostInBytes | 4M | 4M |
| spark.executor.instances | 1 | local |
| spark.executor.cores | 8 | your cores |
| spark.default.parallelism | 8 | spark.executor.instances * spark.executor.cores |
| data file size | 96M | |
| data file count | 1 | |
This is the actual formula
| Quantity | Formula | Bytes |
|---|---|---|
| DefaultMaxSplitBytes | spark.sql.files.maxPartitionBytes | 134,217,728 |
| OpenCostInBytes | spark.sql.files.openCostInBytes | 4,194,304 |
| DefaultParallelism | spark.default.parallelism | 8 |
| TotalBytes | DataBytes + (# files * OpenCostInBytes) | 104,857,600 |
| BytesPerCore | TotalBytes / DefaultParallelism | 13,107,200 |
| MaxSplitBytes | MIN(DefaultMaxSplitBytes, MAX(OpenCostInBytes, BytesPerCore)) | 13,107,200 |
| Estimated number of partitions | TotalBytes / MaxSplitBytes | 8 |
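The calculation above can be sketched in plain Python. This is an estimate of Spark's split-sizing logic, not the exact source; the function name and defaults are my own, with the 96 MB file and 8-core parallelism taken from the question:

```python
import math

def estimate_partitions(file_sizes, max_partition_bytes=134_217_728,
                        open_cost=4_194_304, parallelism=8):
    """Rough sketch of Spark's maxSplitBytes-based partition sizing."""
    # each file is padded with an "open cost" before sizing the splits
    total = sum(file_sizes) + len(file_sizes) * open_cost
    bytes_per_core = total // parallelism
    # split size is capped by maxPartitionBytes, floored by openCostInBytes
    max_split = min(max_partition_bytes, max(open_cost, bytes_per_core))
    return math.ceil(total / max_split)

# one ~96 MB CSV read on an 8-core local session
print(estimate_partitions([96 * 1024 * 1024]))  # 8
```

Note that for any single file smaller than ~1 GB, BytesPerCore (not maxPartitionBytes) is the binding term, so the estimate comes out equal to the parallelism; the 128 MB cap only starts to matter for larger inputs.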