Spark - using off-heap memory

Time:10-17

When spark.memory.offHeap.enabled=true, Spark can use off-heap memory for shuffles and caching (StorageLevel.OFF_HEAP). Can off-heap memory also be used to store broadcast variables? If so, how?

CodePudding user response:

In short, no, you cannot use StorageLevel.OFF_HEAP for broadcast variables.

To understand why, let's look at the source code for the SparkContext.broadcast(...) method.

/**
  * Broadcast a read-only variable to the cluster, returning a
  * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions ... 
  */
 def broadcast[T: ClassTag](value: T): Broadcast[T] = {
   :
   val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
   :
   bc
 }

In the code above, broadcastManager.newBroadcast(...) creates the Broadcast object that this method returns.

Now, let's dig deeper and inspect newBroadcast().

def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
  broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
}

Here, broadcastManager holds a broadcastFactory and delegates creation of the broadcast variable to it, following the abstract factory design pattern.

Also note that the BroadcastManager keeps track of a unique id for each broadcast variable, which is incremented for each new broadcast variable.
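The delegation and id counter described above can be sketched in plain Scala without any Spark dependency. This is only an illustration: SimpleBroadcast, SimpleBroadcastFactory, InMemoryBroadcastFactory and SimpleBroadcastManager are hypothetical names, not Spark classes, and the real implementation does considerably more.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical, simplified stand-ins for Spark's internal broadcast types.
trait SimpleBroadcast[T] {
  def id: Long
  def value: T
}

// The factory interface the manager delegates to.
trait SimpleBroadcastFactory {
  def newBroadcast[T](value: T, id: Long): SimpleBroadcast[T]
}

// A trivial broadcast that just holds the value in memory.
final class InMemoryBroadcast[T](val id: Long, val value: T) extends SimpleBroadcast[T]

// The single concrete factory wired into the manager in this sketch.
class InMemoryBroadcastFactory extends SimpleBroadcastFactory {
  override def newBroadcast[T](value: T, id: Long): SimpleBroadcast[T] =
    new InMemoryBroadcast[T](id, value)
}

class SimpleBroadcastManager(factory: SimpleBroadcastFactory) {
  // Thread-safe counter: every new broadcast gets the next id.
  private val nextBroadcastId = new AtomicLong(0)

  def newBroadcast[T](value: T): SimpleBroadcast[T] =
    factory.newBroadcast(value, nextBroadcastId.getAndIncrement())
}
```

Note that the caller of newBroadcast only supplies the value. Everything else, including how the value is stored, is decided by whichever factory the manager was built with.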

Currently, only one kind of BroadcastFactory can be initialized in Spark: the TorrentBroadcastFactory. This can be seen in the initialization code of the BroadcastManager.

// Called by SparkContext or Executor before using Broadcast
private def initialize(): Unit = {
  :
  broadcastFactory = new TorrentBroadcastFactory
  :
}

Quoting the source code of the TorrentBroadcastFactory:

Broadcast implementation that uses a BitTorrent-like protocol to do a distributed transfer of the broadcasted data to the executors

This factory creates TorrentBroadcast instances. The description of that class is very informative:

The driver divides the serialized object into small chunks and stores those chunks in the BlockManager of the driver.

On each executor, the executor first attempts to fetch the object from its BlockManager. If it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or other executors if available. Once it gets the chunks, it puts the chunks in its own BlockManager, ready for other executors to fetch from. This prevents the driver from being the bottleneck in sending out multiple copies of the broadcast data (one per executor).
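The "divide into chunks, then reassemble" step from this description can be illustrated with a small standalone sketch. It assumes plain Java serialization and a caller-supplied chunk size. ChunkingSketch and its methods are hypothetical names for illustration, and the block-manager storage and remote fetching described above are left out entirely.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object ChunkingSketch {
  // Serialize with plain Java serialization (an assumption made for this sketch).
  def serialize(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  // Rebuild the object from its serialized bytes.
  def deserialize[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Split the serialized bytes into pieces of at most chunkSize bytes.
  def split(bytes: Array[Byte], chunkSize: Int): Vector[Array[Byte]] = {
    require(chunkSize > 0, "chunkSize must be positive")
    bytes.grouped(chunkSize).toVector
  }

  // Concatenate the pieces, in order, back into one byte array.
  def reassemble(chunks: Seq[Array[Byte]]): Array[Byte] =
    Array.concat(chunks: _*)
}
```

A receiver that has collected every piece, in order, can call reassemble followed by deserialize to recover the original object. In this simplified model, each piece would be what gets stored in and fetched from a block manager.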

Reading the writeBlocks method of the TorrentBroadcast class, we can see that StorageLevel.MEMORY_AND_DISK_SER is hard-coded as the storage level for the broadcast pieces.

  /**
   * Divide the object into multiple blocks and put those blocks in the block manager.
   *
   * @param value the object to divide
   * @return number of blocks this broadcast variable is divided into
   */
  private def writeBlocks(value: T): Int = {
    import StorageLevel._
    :
    :
    if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
      throw new SparkException(s"Failed to store $pieceId of $broadcastId " + s"in local BlockManager")
    }
    :
    :
  }

Because StorageLevel.MEMORY_AND_DISK_SER is hard-coded here and SparkContext.broadcast(...) takes no storage-level argument, we cannot use StorageLevel.OFF_HEAP for broadcast variables.
