During a shuffle, the mappers write their outputs to the local disk, from where they are picked up by the reducers. Where exactly on the disk are those files written? I am running a PySpark cluster on YARN.
What I have tried so far:
I think the possible locations for the intermediate files are (in decreasing order of likelihood):
1. `hadoop/spark/tmp`. As per the documentation, this should come from the `LOCAL_DIRS` env variable that gets defined by YARN. However, after starting the cluster (I am passing `--master yarn`) I couldn't find any `LOCAL_DIRS` env variable using `os.environ`, but I can see `SPARK_LOCAL_DIRS`, which should happen only in the case of Mesos or standalone as per the documentation (any idea why that might be the case?). Anyhow, my `SPARK_LOCAL_DIRS` is `hadoop/spark/tmp`.
2. `tmp`. The default value of `spark.local.dir`.
3. `/home/username`. I have tried passing a custom value to `spark.local.dir` while starting PySpark using `--conf spark.local.dir=/home/username`.
4. `hadoop/yarn/nm-local-dir`. This is the value of the `yarn.nodemanager.local-dirs` property in `yarn-site.xml`.
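For reference, this is roughly how I am checking the env variables and config from the driver (assuming an active `spark` session; the driver's environment may of course differ from the executors'):

import os

# which local-dir related env variables are visible on the driver?
for var in ("LOCAL_DIRS", "SPARK_LOCAL_DIRS", "SPARK_EXECUTOR_DIRS"):
    print(var, "=", os.environ.get(var))

# effective spark.local.dir on the driver (None if unset)
print("spark.local.dir =", spark.conf.get("spark.local.dir", None))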
I am running the following code and checking whether any intermediate files get created at the above 4 locations, by navigating to each location on a worker node.
The code I am running:
from pyspark import StorageLevel

# read the two datasets, join them, and persist the joined result to disk only
df_sales = spark.read.load("gs://monsoon-credittech.appspot.com/spark_datasets/sales_parquet")
df_products = spark.read.load("gs://monsoon-credittech.appspot.com/spark_datasets/products_parquet")
df_merged = df_sales.join(df_products, df_sales.product_id == df_products.product_id, 'inner')
df_merged.persist(StorageLevel.DISK_ONLY)
df_merged.count()  # action to trigger the shuffle and the persist
No files are being created at any of the 4 locations listed above.
Where are the Spark intermediate files (output of mappers, persisted data, etc.) stored?
CodePudding user response:
This is going to depend on your cluster setup and your Spark version, but you're more or less looking at the correct places.
For this explanation, I'll be talking about Spark v3.3.1, which is the latest version as of the time of this post.
There is an interesting method in `org.apache.spark.util.Utils` called `getConfiguredLocalDirs`, and it looks like this:
/**
 * Return the configured local directories where Spark can write files. This
 * method does not create any directories on its own, it only encapsulates the
 * logic of locating the local directories according to deployment mode.
 */
def getConfiguredLocalDirs(conf: SparkConf): Array[String] = {
  val shuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED)
  if (isRunningInYarnContainer(conf)) {
    // If we are in yarn mode, systems can have different disk layouts so we must set it
    // to what Yarn on this system said was available. Note this assumes that Yarn has
    // created the directories already, and that they are secured so that only the
    // user has access to them.
    randomizeInPlace(getYarnLocalDirs(conf).split(","))
  } else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) {
    conf.getenv("SPARK_EXECUTOR_DIRS").split(File.pathSeparator)
  } else if (conf.getenv("SPARK_LOCAL_DIRS") != null) {
    conf.getenv("SPARK_LOCAL_DIRS").split(",")
  } else if (conf.getenv("MESOS_SANDBOX") != null && !shuffleServiceEnabled) {
    // Mesos already creates a directory per Mesos task. Spark should use that directory
    // instead so all temporary files are automatically cleaned up when the Mesos task ends.
    // Note that we don't want this if the shuffle service is enabled because we want to
    // continue to serve shuffle files after the executors that wrote them have already exited.
    Array(conf.getenv("MESOS_SANDBOX"))
  } else {
    if (conf.getenv("MESOS_SANDBOX") != null && shuffleServiceEnabled) {
      logInfo("MESOS_SANDBOX available but not using provided Mesos sandbox because " +
        s"${config.SHUFFLE_SERVICE_ENABLED.key} is enabled.")
    }
    // In non-Yarn mode (or for the driver in yarn-client mode), we cannot trust the user
    // configuration to point to a secure directory. So create a subdirectory with restricted
    // permissions under each listed directory.
    conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")).split(",")
  }
}
This is interesting, because it shows the order of precedence among these settings. The order is (see also the Python sketch after this list):
- if running in YARN, `getYarnLocalDirs` should give you your local dir, which depends on the `LOCAL_DIRS` environment variable
- if `SPARK_EXECUTOR_DIRS` is set, it's going to be one of those
- if `SPARK_LOCAL_DIRS` is set, it's going to be one of those
- if `MESOS_SANDBOX` is set and `!shuffleServiceEnabled`, it's going to be `MESOS_SANDBOX`
- if `spark.local.dir` is set, it's going to be that
- ELSE (catch-all) it's going to be `java.io.tmpdir`
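For readers more comfortable in Python, here is a rough sketch of that same decision logic (purely illustrative, not Spark's actual code; `conf` stands in for a resolved Spark config, and the helper name is made up):

import os
import tempfile

def configured_local_dirs(conf, running_on_yarn, shuffle_service_enabled):
    # illustrative mirror of getConfiguredLocalDirs' precedence order
    if running_on_yarn:
        # YARN wins: use whatever the container's LOCAL_DIRS env variable says
        return os.environ["LOCAL_DIRS"].split(",")
    if os.environ.get("SPARK_EXECUTOR_DIRS"):
        return os.environ["SPARK_EXECUTOR_DIRS"].split(os.pathsep)
    if os.environ.get("SPARK_LOCAL_DIRS"):
        return os.environ["SPARK_LOCAL_DIRS"].split(",")
    if os.environ.get("MESOS_SANDBOX") and not shuffle_service_enabled:
        return [os.environ["MESOS_SANDBOX"]]
    # fall back to spark.local.dir, else the JVM temp dir (java.io.tmpdir)
    return conf.get("spark.local.dir", tempfile.gettempdir()).split(",")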
IMPORTANT: In case you're using Kubernetes, all of this is disregarded and separate, Kubernetes-specific logic is used instead.
Now, how do we find this directory?
Luckily, there is a nicely placed logging line in `DiskBlockManager.createLocalDirs` which prints out this directory if your logging level is INFO.
So, set your default logging level to INFO in `log4j.properties`, restart your Spark application, and you should get a line saying something like
Created local directory at YOUR-DIR-HERE
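If you only need the driver-side line (e.g. in local or client mode), a lighter-weight option is to raise the log level programmatically; note that this only affects the driver JVM, so for executor logs you still need the `log4j.properties` route:

spark.sparkContext.setLogLevel("INFO")  # driver-side only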
CodePudding user response:
Without getting into the weeds of Spark source, perhaps you can quickly check it live. Something like this:
>>> irdd = spark.sparkContext.range(0,100,1,10)
>>> def wherearemydirs(p):
... import os
... print("Your local dirs are: " os.getenv('LOCAL_DIRS'))
...
>>>
>>> irdd.foreachPartition(wherearemydirs)
>>>
...will show local dirs in executor logs (stderr):
Your local dirs are: /data/1/yarn/nm/usercache/<your-user-id>/appcache/<application_xxxxxxxxxxx_xxxxxxx>,/data/10/yarn/nm/usercache/<your-user-id>/appcache/<application_xxxxxxxxxxx_xxxxxxx>,/data/11/yarn/nm/usercache/<your-user-id>/appcache/<application_xxxxxxxxxxx_xxxxxxx>,...
But yes, it will basically point to the parent dir (created by YARN) of the UUID-randomized subdirs created by `DiskBlockManager`, as @KoedIt mentioned:
:
23/01/05 10:15:37 INFO storage.DiskBlockManager: Created local directory at /data/1/yarn/nm/usercache/<your-user-id>/appcache/application_xxxxxxxxx_xxxxxxx/blockmgr-d4df4512-d18b-4dcf-8197-4dfe781b526a
:
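To take this one step further, the same trick can list the `blockmgr-*` subdirectories themselves (a sketch along the same lines; it assumes, as above, that `LOCAL_DIRS` is set inside the YARN executor):

import glob
import os

def wherearemyblockmgrdirs(p):
    # each executor creates a blockmgr-<UUID> directory under every local dir
    for d in os.getenv('LOCAL_DIRS', '').split(','):
        print(d, '->', glob.glob(os.path.join(d, 'blockmgr-*')))

irdd = spark.sparkContext.range(0, 100, 1, 10)
irdd.foreachPartition(wherearemyblockmgrdirs)
# output appears in the executors' stderr logs, as before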