Home > database >  When using a broadcasted spark variable in a udf, how do I ensure the variable is not used before it
When using a broadcasted spark variable in a udf, how do I ensure the variable is not used before it

Time:09-09

I have a table of data that I want to reference in a UDF. My UDF and broadcast variable both belong to a serializable helper object, and I initialize the broadcast variable at the top of the class and call the UDF in a def of that class. During evaluation however I get a null pointer exception when trying to access the broadcast variable. Clearly the order of operations here is not happening as expected (the UDF gets executed before the relevant data for the broadcast variable is loaded), so I assume I need some way of enforcing some sort of dependency/order.

For the record I'm not attached to doing it this particular way with the object and class separation, it was just the best way I could think to get around the issue of serializing my UDF and making sure the UDF had access to the broadcast variable (which relied on data only available in the instance of my class).

class MyClass() {
    Helper.MyBroadcastVariable = spark.sparkContext.broadcast(convertToHashMap(super.referenceTable))

    def doThing(dataFrame: DataFrame): DataFrame{
        return dataFrame.withColumn("newColumn", Helper.MyUDF(col("inputColumn")))
    }
}

object Helper extends Serializable {
    var MyBroadcastVariable: Broadcast[Map[String, scala.Seq[String]]] = null

    def MyFunc(key: String): String = {
        println(MyBroadcastVariable.value(key))
    {

    val MyUDF: UserDefinedFunction = udf(MyFunc _)

}

CodePudding user response:

Don't use vars in Scala and even more in Spark, and even more with Broadcast!

In your case I would write it like this:

class MyClass() {
    val MyBroadcastVariable = spark.sparkContext.broadcast(convertToHashMap(super.referenceTable))

    def doThing(dataFrame: DataFrame): DataFrame{
        return dataFrame.withColumn("newColumn", Helper.MyUDF(col("inputColumn")))
    }
}

object Helper {

    def MyFunc(MyBroadcastVariable: Broadcast[Map[String, scala.Seq[String]]])(key...): String = {
        println(MyBroadcastVariable.value(key))
        ...
    }

    def MyUDF(MyBroadcastVariable: Broadcast[Map[String, scala.Seq[String]]]): UserDefinedFunction = udf(MyFunc(MyBroadcastVariable) _)

}
  • Related