Home > Software engineering >  Where are the spark intermediate files stored on the disk?
Where are the spark intermediate files stored on the disk?

Time:01-06

During a shuffle, the mappers dump their outputs to the local disk from where it gets picked up by the reducers. Where exactly on the disk are those files dumped? I am running pyspark cluster on YARN.

What I have tried so far:

I think the possible locations where the intermediate files could be are (In the decreasing order of likelihood):

  1. hadoop/spark/tmp. As per the documentation at the LOCAL_DIRS env variable that gets defined by the yarn. However, post starting the cluster (I am passing master --yarn) I couldn't find any LOCAL_DIRS env variable using os.environ but, I can see SPARK_LOCAL_DIRS which should happen only in case of mesos or standalone as per the documentation (Any idea why that might be the case?). Anyhow, my SPARK_LOCAL_DIRS is hadoop/spark/tmp
  2. tmp. Default value of spark.local.dir
  3. /home/username. I have tried sending custom value to spark.local.dir while starting the pyspark using --conf spark.local.dir=/home/username
  4. hadoop/yarn/nm-local-dir. This is the value of yarn.nodemanager.local-dirs property in yarn-site.xml

I am running the following code and checking for any intermediate files being created at the above 4 locations by navigating to each location on a worker node.

The code I am running:

from pyspark import storagelevel
df_sales = spark.read.load("gs://monsoon-credittech.appspot.com/spark_datasets/sales_parquet")
df_products = spark.read.load("gs://monsoon-credittech.appspot.com/spark_datasets/products_parquet")
df_merged = df_sales.join(df_products,df_sales.product_id==df_products.product_id,'inner')
df_merged.persist(storagelevel.StorageLevel.DISK_ONLY)
df_merged.count()

There are no files that are being created at any of the 4 locations that I have listed above

Where are the spark intermediate files (output of mappers, persist etc) stored?

CodePudding user response:

This is going to depend on what your cluster setup is and your Spark version, but you're more or less looking at the correct places.

For this explanation, I'll be talking about Spark v3.3.1. which is the latest version as of the time of this post.

There is an interesting method in org.apache.spark.util.Utils called getConfiguredLocalDirs and it looks like this:

  /**
   * Return the configured local directories where Spark can write files. This
   * method does not create any directories on its own, it only encapsulates the
   * logic of locating the local directories according to deployment mode.
   */
  def getConfiguredLocalDirs(conf: SparkConf): Array[String] = {
    val shuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED)
    if (isRunningInYarnContainer(conf)) {
      // If we are in yarn mode, systems can have different disk layouts so we must set it
      // to what Yarn on this system said was available. Note this assumes that Yarn has
      // created the directories already, and that they are secured so that only the
      // user has access to them.
      randomizeInPlace(getYarnLocalDirs(conf).split(","))
    } else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) {
      conf.getenv("SPARK_EXECUTOR_DIRS").split(File.pathSeparator)
    } else if (conf.getenv("SPARK_LOCAL_DIRS") != null) {
      conf.getenv("SPARK_LOCAL_DIRS").split(",")
    } else if (conf.getenv("MESOS_SANDBOX") != null && !shuffleServiceEnabled) {
      // Mesos already creates a directory per Mesos task. Spark should use that directory
      // instead so all temporary files are automatically cleaned up when the Mesos task ends.
      // Note that we don't want this if the shuffle service is enabled because we want to
      // continue to serve shuffle files after the executors that wrote them have already exited.
      Array(conf.getenv("MESOS_SANDBOX"))
    } else {
      if (conf.getenv("MESOS_SANDBOX") != null && shuffleServiceEnabled) {
        logInfo("MESOS_SANDBOX available but not using provided Mesos sandbox because "  
          s"${config.SHUFFLE_SERVICE_ENABLED.key} is enabled.")
      }
      // In non-Yarn mode (or for the driver in yarn-client mode), we cannot trust the user
      // configuration to point to a secure directory. So create a subdirectory with restricted
      // permissions under each listed directory.
      conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")).split(",")
    }
  }

This is interesting, because it makes us understand the order of precedence each config setting has. The order is:

  • if running in Yarn, getYarnLocalDirs should give you your local dir, which depends on the LOCAL_DIRS environment variable
  • if SPARK_EXECUTOR_DIRS is set, it's going to be one of those
  • if SPARK_LOCAL_DIRS is set, it's going to be one of those
  • if MESOS_SANDBOX and !shuffleServiceEnabled, it's going to be MESOS_SANDBOX
  • if spark.local.dir is set, it's going to be that
  • ELSE (catch-all) it's going to be java.io.tmpdir

IMPORTANT: In case you're using Kubernetes, all of this is disregarded and this logic is used.

Now, how do we find this directory?

Luckily, there is a nicely placed logging line in DiskBlockManager.createLocalDirs which prints out this directory if your logging level is INFO.

So, set your default logging level to INFO in log4j.properties (like so), restart your spark application and you should be getting a line saying something like

Created local directory at YOUR-DIR-HERE

CodePudding user response:

Without getting into the weeds of Spark source, perhaps you can quickly check it live. Something like this:

>>> irdd = spark.sparkContext.range(0,100,1,10)                                                                                                          
>>> def wherearemydirs(p):
...   import os
...   print("Your local dirs are: "   os.getenv('LOCAL_DIRS'))                                                                                                 
... 
>>> 
>>> irdd.foreachPartition(wherearemydirs)
>>>

...will show local dirs in executor logs (stderr):

Your local dirs are: /data/1/yarn/nm/usercache/<your-user-id>/appcache/<application_xxxxxxxxxxx_xxxxxxx>,/data/10/yarn/nm/usercache/<your-user-id>/appcache/<application_xxxxxxxxxxx_xxxxxxx>,/data/11/yarn/nm/usercache/<your-user-id>/appcache/<application_xxxxxxxxxxx_xxxxxxx>,...

But yes, it will basically point to the parent dir (created by YARN) of UUID-randomized subdirs created by DiskBlockManager, as @KoedIt mentioned:

:
23/01/05 10:15:37 INFO storage.DiskBlockManager: Created local directory at /data/1/yarn/nm/usercache/<your-user-id>/appcache/application_xxxxxxxxx_xxxxxxx/blockmgr-d4df4512-d18b-4dcf-8197-4dfe781b526a
:
  • Related