When spark.memory.offHeap.enabled=true, Spark can use off-heap memory for shuffles and caching (StorageLevel.OFF_HEAP). Can off-heap memory also be used to store broadcast variables? If so, how?
CodePudding user response:
In short, no: you cannot use StorageLevel.OFF_HEAP for broadcast variables.
To understand why, let's look at the source code of the SparkContext.broadcast(...) method:
    /**
     * Broadcast a read-only variable to the cluster, returning a
     * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions ...
     */
    def broadcast[T: ClassTag](value: T): Broadcast[T] = {
      // ...
      val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
      // ...
      bc
    }
In the code above, broadcastManager.newBroadcast(...) creates the Broadcast object that this method returns.
Now let's dig deeper and inspect newBroadcast():
    def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
      broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
    }
Here, the BroadcastManager holds a component called broadcastFactory and uses the abstract factory design pattern to delegate creation of the broadcast variable to that factory.
Also note that the BroadcastManager assigns each broadcast variable a unique id, taken from a counter that is incremented every time a new broadcast variable is created.
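The delegation described above can be sketched in a few lines of plain Scala. This is only an illustration of the pattern, not Spark's code: ToyBroadcast, ToyBroadcastFactory, ToyTorrentFactory and ToyBroadcastManager are hypothetical names, and the only detail borrowed from the excerpt is the getAndIncrement() call on an id counter.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical, simplified stand-ins; none of these are Spark's real classes.
trait ToyBroadcast[T] {
  def id: Long
  def value: T
}

// Abstract factory: the manager only knows this interface.
trait ToyBroadcastFactory {
  def newBroadcast[T](v: T, isLocal: Boolean, bid: Long): ToyBroadcast[T]
}

// The single concrete factory, playing the role TorrentBroadcastFactory plays in Spark.
class ToyTorrentFactory extends ToyBroadcastFactory {
  override def newBroadcast[T](v: T, isLocal: Boolean, bid: Long): ToyBroadcast[T] =
    new ToyBroadcast[T] {
      val id: Long = bid
      val value: T = v
    }
}

// The manager owns the id counter and delegates construction to the factory.
class ToyBroadcastManager(factory: ToyBroadcastFactory) {
  private val nextBroadcastId = new AtomicLong(0)

  def newBroadcast[T](value: T, isLocal: Boolean): ToyBroadcast[T] =
    factory.newBroadcast(value, isLocal, nextBroadcastId.getAndIncrement())
}

object FactoryDemo {
  def main(args: Array[String]): Unit = {
    val manager = new ToyBroadcastManager(new ToyTorrentFactory)
    val first = manager.newBroadcast("lookup table", isLocal = true)
    val second = manager.newBroadcast(42, isLocal = true)
    // Each broadcast gets the next id from the shared counter.
    println(s"first.id=${first.id}, second.id=${second.id}")
  }
}
```

The point to notice is that the caller never chooses an implementation or any storage option: whatever the factory decides is what the caller gets.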
Currently, Spark can initialize only one kind of BroadcastFactory, the TorrentBroadcastFactory. This can be seen in the initialization code of the BroadcastManager:
    // Called by SparkContext or Executor before using Broadcast
    private def initialize() {
      // ...
      broadcastFactory = new TorrentBroadcastFactory
      // ...
    }
Quoting the source code of the TorrentBroadcastFactory:
A Broadcast implementation that uses a BitTorrent-like protocol to do a distributed transfer of the broadcasted data to the executors.
This factory creates TorrentBroadcast instances. The description of that class is very informative:
The driver divides the serialized object into small chunks and stores those chunks in the BlockManager of the driver.
On each executor, the executor first attempts to fetch the object from its BlockManager. If it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or other executors if available. Once it gets the chunks, it puts the chunks in its own BlockManager, ready for other executors to fetch from. This prevents the driver from being the bottleneck in sending out multiple copies of the broadcast data (one per executor).
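To make the quoted description concrete, here is a toy, in-memory model of the chunk-and-fetch idea. It is a sketch under loud assumptions: ToyNode and ToyTorrent are hypothetical names, the "block manager" is just a mutable map, and everything else the real TorrentBroadcast does is left out.

```scala
import scala.collection.mutable

// Hypothetical toy node: its "block manager" is a map from piece index to bytes.
class ToyNode(val name: String) {
  val blocks: mutable.Map[Int, Array[Byte]] = mutable.Map.empty
}

object ToyTorrent {
  // Driver side: split the serialized bytes into chunks and store them locally.
  // Returns the number of chunks.
  def writeBlocks(driver: ToyNode, serialized: Array[Byte], chunkSize: Int): Int = {
    val chunks = serialized.grouped(chunkSize).toVector
    chunks.zipWithIndex.foreach { case (chunk, i) => driver.blocks(i) = chunk }
    chunks.size
  }

  // Executor side: use the local copy of a piece if present; otherwise fetch it
  // from any peer that has it and keep a copy so other nodes can fetch from here.
  def readBlocks(node: ToyNode, peers: Seq[ToyNode], numBlocks: Int): Array[Byte] = {
    val pieces = (0 until numBlocks).map { i =>
      node.blocks.getOrElse(i, {
        val piece = peers.flatMap(_.blocks.get(i)).headOption
          .getOrElse(throw new IllegalStateException(s"piece $i not found on any peer"))
        node.blocks(i) = piece
        piece
      })
    }
    Array.concat(pieces: _*)
  }
}

object TorrentDemo {
  def main(args: Array[String]): Unit = {
    val driver = new ToyNode("driver")
    val exec1 = new ToyNode("exec-1")
    val exec2 = new ToyNode("exec-2")

    val payload = "broadcast me to every executor".getBytes("UTF-8")
    val numBlocks = ToyTorrent.writeBlocks(driver, payload, chunkSize = 8)

    // exec-1 fetches from the driver; exec-2 then fetches from exec-1 only.
    val got1 = ToyTorrent.readBlocks(exec1, Seq(driver), numBlocks)
    val got2 = ToyTorrent.readBlocks(exec2, Seq(exec1), numBlocks)
    println(new String(got1, "UTF-8"))
    println(new String(got2, "UTF-8"))
  }
}
```

In the demo, exec-2 never contacts the driver at all, which is the property the quoted description highlights: once an executor holds the chunks, it can serve them to others.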
Reading the writeBlocks function of the TorrentBroadcast class, we can see that StorageLevel.MEMORY_AND_DISK_SER is hard-coded as the storage level for the broadcast pieces:
    /**
     * Divide the object into multiple blocks and put those blocks in the block manager.
     *
     * @param value the object to divide
     * @return number of blocks this broadcast variable is divided into
     */
    private def writeBlocks(value: T): Int = {
      import StorageLevel._
      // ...
      // ...
      if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
        throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
      }
      // ...
      // ...
Because this code hard-codes StorageLevel.MEMORY_AND_DISK_SER and the broadcast API exposes no way to override it, we cannot use StorageLevel.OFF_HEAP for broadcast variables.
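The contrast shows up directly in the public API. The following is a minimal sketch, assuming Spark is on the classpath and a local master is acceptable; the object name, app name and the 64m off-heap size are arbitrary choices for illustration. persist(...) accepts a StorageLevel chosen by the caller, whereas broadcast(...) accepts only the value.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object OffHeapVsBroadcast {
  // Returns (does the cached RDD report an off-heap level?, a value read from the broadcast).
  def run(): (Boolean, Int) = {
    val conf = new SparkConf()
      .setMaster("local[2]")                        // assumption: local run for illustration
      .setAppName("offheap-vs-broadcast")
      .set("spark.memory.offHeap.enabled", "true")
      .set("spark.memory.offHeap.size", "64m")      // assumption: arbitrary size
    val sc = new SparkContext(conf)
    try {
      // Caching: the caller picks the storage level, so OFF_HEAP is available.
      val cached = sc.parallelize(1 to 1000).persist(StorageLevel.OFF_HEAP)
      cached.count()

      // Broadcast: the method takes only the value; no storage level can be passed.
      val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))

      (cached.getStorageLevel.useOffHeap, lookup.value("a"))
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```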