How to configure a custom Spark Plugin in Databricks?

How do I properly configure a Spark plugin, and the jar containing the Spark plugin class, in Databricks?

I created the following Spark 3 Plugin class in Scala, CustomExecSparkPlugin.scala:

package com.example

import org.apache.spark.api.plugin.{SparkPlugin, DriverPlugin, ExecutorPlugin}

class CustomExecSparkPlugin extends SparkPlugin {
 
  override def driverPlugin(): DriverPlugin = {
    new DriverPlugin() {
      override def shutdown(): Unit = {
        // custom code        
      }
    }
  }

  override def executorPlugin(): ExecutorPlugin = {
    new ExecutorPlugin() {
      override def shutdown(): Unit = {
        // custom code  
      }
    }
  }
}

I packaged it into a jar and uploaded it to DBFS. During DBR 7.3 (Spark 3.0.1, Scala 2.12) cluster creation, I set the following Spark configs (Advanced Options):

spark.plugins com.example.CustomExecSparkPlugin
spark.driver.extraClassPath /dbfs/path/to/jar
spark.executor.extraClassPath /dbfs/path/to/jar
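
For context, a minimal sbt build for packaging a plugin jar like this might look as follows (a sketch only; the actual build definition isn't shown here, and Spark is marked provided because the Databricks Runtime supplies it at runtime):

// build.sbt (assumed layout, for illustration)
name := "custom-spark-plugin"
version := "0.1.0"
scalaVersion := "2.12.10"

// The plugin API (org.apache.spark.api.plugin) lives in spark-core; "provided" keeps it out of the jar
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.1" % "provided"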

However, cluster creation fails with the exception: com.example.CustomExecSparkPlugin not found in com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader@622d7e4

Driver log4j logs:

21/11/01 13:33:01 ERROR SparkContext: Error initializing SparkContext.
java.lang.ClassNotFoundException: com.example.CustomExecSparkPlugin not found in com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader@622d7e4
    at com.databricks.backend.daemon.driver.ClassLoaders$MultiReplClassLoader.loadClass(ClassLoaders.scala:115)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:226)
    at org.apache.spark.util.Utils$.$anonfun$loadExtensions$1(Utils.scala:3006)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
    at org.apache.spark.util.Utils$.loadExtensions(Utils.scala:3004)
    at org.apache.spark.internal.plugin.PluginContainer$.apply(PluginContainer.scala:160)
    at org.apache.spark.internal.plugin.PluginContainer$.apply(PluginContainer.scala:146)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:591)
    at com.databricks.backend.daemon.driver.DatabricksILoop$.$anonfun$initializeSharedDriverContext$1(DatabricksILoop.scala:347)
    at com.databricks.backend.daemon.driver.ClassLoaders$.withContextClassLoader(ClassLoaders.scala:29)
    at com.databricks.backend.daemon.driver.DatabricksILoop$.initializeSharedDriverContext(DatabricksILoop.scala:347)
    at com.databricks.backend.daemon.driver.DatabricksILoop$.getOrCreateSharedDriverContext(DatabricksILoop.scala:277)
    at com.databricks.backend.daemon.driver.DriverCorral.com$databricks$backend$daemon$driver$DriverCorral$$driverContext(DriverCorral.scala:179)
    at com.databricks.backend.daemon.driver.DriverCorral.<init>(DriverCorral.scala:216)
    at com.databricks.backend.daemon.driver.DriverDaemon.<init>(DriverDaemon.scala:39)
    at com.databricks.backend.daemon.driver.DriverDaemon$.create(DriverDaemon.scala:211)
    at com.databricks.backend.daemon.driver.DriverDaemon$.wrappedMain(DriverDaemon.scala:216)
    at com.databricks.DatabricksMain.$anonfun$main$1(DatabricksMain.scala:106)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.DatabricksMain.$anonfun$withStartupProfilingData$1(DatabricksMain.scala:321)
    at com.databricks.logging.UsageLogging.$anonfun$recordOperation$4(UsageLogging.scala:431)
    at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:239)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:234)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:231)
    at com.databricks.DatabricksMain.withAttributionContext(DatabricksMain.scala:74)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:276)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:269)
    at com.databricks.DatabricksMain.withAttributionTags(DatabricksMain.scala:74)
    at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:412)
    at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:338)
    at com.databricks.DatabricksMain.recordOperation(DatabricksMain.scala:74)
    at com.databricks.DatabricksMain.withStartupProfilingData(DatabricksMain.scala:321)
    at com.databricks.DatabricksMain.main(DatabricksMain.scala:105)
    at com.databricks.backend.daemon.driver.DriverDaemon.main(DriverDaemon.scala)
Caused by: java.lang.ClassNotFoundException: com.example.CustomExecSparkPlugin
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
    at com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader.loadClass(ClassLoaders.scala:151)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
    at com.databricks.backend.daemon.driver.ClassLoaders$MultiReplClassLoader.loadClass(ClassLoaders.scala:112)
    ... 43 more
21/11/01 13:33:02 INFO AbstractConnector: Stopped Spark@b6bccb4{HTTP/1.1,[http/1.1]}{10.88.234.70:40001}
21/11/01 13:33:02 INFO SparkUI: Stopped Spark web UI at http://10.88.234.70:40001
21/11/01 13:33:02 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/11/01 13:33:02 INFO MemoryStore: MemoryStore cleared
21/11/01 13:33:02 INFO BlockManager: BlockManager stopped
21/11/01 13:33:02 INFO BlockManagerMaster: BlockManagerMaster stopped
21/11/01 13:33:02 WARN MetricsSystem: Stopping a MetricsSystem that is not running
21/11/01 13:33:02 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/11/01 13:33:02 INFO SparkContext: Successfully stopped SparkContext

CodePudding user response:

You might consider adding this via a cluster init script instead. Init scripts give you a chance to add jars to the cluster before Spark even starts, which is probably what the Spark plugin mechanism expects.

  • Upload your jar to DBFS, somewhere like dbfs:/databricks/plugins.
  • Create and upload a bash script like the one below to the same location.
  • Create or edit a cluster with the init script specified.

#!/bin/bash

STAGE_DIR="/dbfs/databricks/plugins"

echo "BEGIN: Upload Spark Plugins"
cp -f $STAGE_DIR/*.jar /mnt/driver-daemon/jars || { echo "Error copying Spark Plugin library file"; exit 1;}
echo "END: Upload Spark Plugin JARs"

echo "BEGIN: Modify Spark config settings"
cat << 'EOF' > /databricks/driver/conf/spark-plugin-driver-defaults.conf
[driver] {
   "spark.plugins" = "com.example.CustomExecSparkPlugin"
}
EOF
echo "END: Modify Spark config settings"

I believe copying the jar to /mnt/driver-daemon/jars will make Spark aware of the jar before Spark fully initializes (doc). I'm less certain it will make it to the executors though :(
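
As a quick sanity check (a sketch, assuming the class name above and a Scala notebook where sc is predefined), you could try loading the plugin class on both the driver and the executors once the cluster is up:

// Driver side: throws ClassNotFoundException if the jar is not on the driver classpath
Class.forName("com.example.CustomExecSparkPlugin")

// Executor side: run the same check inside tasks so it executes on the executors
sc.parallelize(1 to sc.defaultParallelism).foreach { _ =>
  Class.forName("com.example.CustomExecSparkPlugin")
}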
