I am trying to write a simple Spark UDF. When I test it in a Databricks notebook on Spark version 10.4.x-scala2.12, the code works just fine. When I run the same code from a packaged jar and submit it to Databricks on the same Spark version, it fails with an exception like this:
Exception at spark.sql(sql_stat).show(false):
Job aborted due to stage failure.
Caused by: java.lang.NoClassDefFoundError: Could not initialize class com.test.TestClass$
    :
    :
    at com.test.TestClass$.$anonfun$main$5(TestClass.scala:13)
Code:
object Test {
  def main(args: Array[String]): Unit = {
    val udf_lambda = (id: Int) => {
      if (id % 2 == 0)
        "group A"
      else
        "group B"
    }
    spark.udf.register("udf_lambda", udf_lambda)
    val sql_stat = "select udf_lambda(id) as idv2 from hive_table"
    spark.sql(sql_stat).show(false)
  }
}
Any ideas on why this happens or how to troubleshoot it? The jar works just fine when I change the query to one without the UDF; a simple query like select id as idv2 from hive_table just displays the data from the table.
CodePudding user response:
The function udf_lambda
needs to be defined somewhere it can be serialized and shipped to the executors. The easiest way to do that is to make it a member of the object itself, rather than a local value inside main:
object Test {
  val udf_lambda = (id: Int) => {
    if (id % 2 == 0)
      "group A"
    else
      "group B"
  }

  def main(args: Array[String]): Unit = {
    spark.udf.register("udf_lambda", udf_lambda)
    val sql_stat = "select udf_lambda(id) as idv2 from hive_table"
    spark.sql(sql_stat).show(false)
  }
}
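If you prefer the DataFrame API over SQL registration, the same fix can be sketched with `org.apache.spark.sql.functions.udf`. This is a sketch, not your exact code: it assumes an explicitly built `SparkSession` named `spark` and the same `hive_table`, and the function name `groupOf` is my own placeholder:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Test {
  // Defined at object level so the closure serializes cleanly to executors,
  // avoiding the NoClassDefFoundError seen with a local lambda in main.
  val groupOf: Int => String = id => if (id % 2 == 0) "group A" else "group B"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().getOrCreate()
    val groupUdf = udf(groupOf)

    spark.table("hive_table")
      .select(groupUdf(col("id")).as("idv2"))
      .show(false)
  }
}
```

Either way, the key point is the same: the UDF body must live on the object, not inside `main`, so Spark can serialize it and initialize the enclosing class on the executors.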