Return Row with schema defined at runtime in Spark UDF


I've dulled my sword on this one; some help would be greatly appreciated!

Background

I am building an ETL pipeline that takes GNMI Protobuf update messages off of a Kafka queue and eventually breaks them out into a number of Delta tables based on the prefix and parameters of the paths to values (running on the Databricks runtime).

Without going into the gory details, each prefix corresponds roughly to a schema for a table, with the caveat that the paths can change (usually new subtrees) upstream, so the schema is not fixed. This is similar to a nested JSON structure.

I first break out the updates by prefix, so all of the updates have roughly the same schema. I defined some transformations so that when the schema does not match exactly, I can coerce them into a common schema.
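To give a flavor of what I mean, the coercion is roughly along these lines (just a sketch; coerceToSchema is a made-up name, not my actual pipeline code):

import org.apache.spark.sql.{DataFrame, functions => F, types => T}

// Cast columns that already exist to the target type, and fill in missing ones with nulls.
def coerceToSchema(df: DataFrame, schema: T.StructType): DataFrame = {
  val cols = schema.fields.map { field =>
    if (df.columns.contains(field.name))
      F.col(field.name).cast(field.dataType).as(field.name)
    else
      F.lit(null).cast(field.dataType).as(field.name)
  }
  df.select(cols: _*)
}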

I'm running into trouble when I try to create a struct column with the common schema.

Attempt 1

I first tried just returning an Array[Any] from my udf, and providing a schema in the UDF definition (I know this is deprecated):

import org.apache.spark.sql.{functions => F, Row, types => T}

// `columns` maps field names to their types, and account.sparkSchemas(prefix)
// holds the inferred StructType; both are defined elsewhere in the pipeline.
def mapToRow(deserialized: Map[String, ParsedValueV2]): Array[Any] = {
  def getValue(key: String): Any =
    deserialized.get(key) match {
      case Some(value) => value.asType(columns(key))
      case None        => None
    }

  columns.keys.toArray.map(getValue)
}

spark.conf.set("spark.sql.legacy.allowUntypedScalaUDF", "true")
def mapToStructUdf = F.udf(mapToRow _, account.sparkSchemas(prefix))

This snippet creates an Array object with the typed values that I need. Unfortunately when I try to use the UDF, I get this error:

java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to $line8760b7c10da04d2489451bb90ca42c6535.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$ParsedValueV2

I'm not sure what's not matching, but I did notice that the types of the values are Java types, not Scala ones, so perhaps that is related?

Attempt 2

Maybe I can use the typed UDF interface after all? Can I create a case class at runtime for each schema, and then use that as the return value from my UDF?
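For context, this is the sort of thing that works when the case class is known at compile time (GnmiRecord is just a made-up example here; Spark derives the struct schema from the case class), but in my case the schema only exists at runtime:

case class GnmiRecord(a_string: String, another_string: String, an_int: Long)

// Typed UDF: the return schema is inferred from the case class.
val typedUdf = F.udf((input: String) => GnmiRecord("hello world", input, 42L))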

I've tried to get this to work at runtime using various snippets I found, like this:

import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox
val tb = universe.runtimeMirror(getClass.getClassLoader).mkToolBox()
val test = tb.eval(tb.parse("object Test; Test"))

but I can't even get a usable instance of Test, and can't figure out how to use it as the return type of a UDF. I presume I need to use a generic type somehow, but my Scala-fu is too weak to figure this one out.

Finally, the question

Can someone help me figure out which approach to take, and how to proceed with that approach?

Thanks in advance for your help!!!

Update - is this a Spark bug?

I've distilled the problem down to this code:

import org.apache.spark.sql.{functions => F, Row, SparkSession, types => T}

// thanks @Dmytro Mitin
val spark = SparkSession.builder
  .master("local")
  .appName("Spark app")
  .getOrCreate()

import spark.implicits._ // needed for .toDF and $"..." below

spark.conf.set("spark.sql.legacy.allowUntypedScalaUDF", "true")

def simpleFn(foo: Any): Seq[Any] = List("hello world", "Another String", 42L)
// def simpleFn(foo: Any): Seq[Any] = List("hello world", "Another String")

def simpleUdf = F.udf(
  simpleFn(_),
  dataType = T.StructType(
    List(
      T.StructField("a_string", T.StringType),
      T.StructField("another_string", T.StringType),
      T.StructField("an_int", T.IntegerType),
    )
  )
)

Seq(("bar", "foo"))
  .toDF("column", "input")
  .withColumn(
    "array_data",
    simpleUdf($"input")
  )
  .show(truncate=false)

which results in this error message

IllegalArgumentException: The value (List(Another String, 42)) of the type (scala.collection.immutable.$colon$colon) cannot be converted to the string type

Hmm... that's odd. Where does that list come from, missing the first element of the row?

The two-valued version ("hello world", "Another String") has the same problem, but if I only have one value in my struct, then it's happy:

// def simpleFn(foo: Any): Seq[Any] = List("hello world", "Another String")
def simpleFn(foo: Any): Seq[Any] = List("hello world")

def simpleUdf = F.udf(
  simpleFn(_),
  dataType = T.StructType(
    List(
      T.StructField("a_string", T.StringType),
      // T.StructField("another_string", T.StringType),
      // T.StructField("an_int", T.IntegerType),
    )
  )
)

and my query gives me

+------+-----+-------------+
|column|input|array_data   |
+------+-----+-------------+
|bar   |foo  |{hello world}|
+------+-----+-------------+

It looks like it's giving me the first element of my sequence as the first field of the struct, the rest of the sequence as the second field, and then the third field is null (seen in other cases), which causes an exception.

This looks like a bug to me. Anyone else have any experience with UDFs with schemas built on the fly like this?

Spark 3.3.1, Scala 2.12, DBR 12.0

Reflection struggles

A stupid way to accomplish what I want to do would be to take the schemas I've inferred, generate a bunch of Scala code that implements case classes that I can use as return types from my UDFs, then compile the code, package up a JAR, load it into my Databricks runtime, and then use the case classes as return results from the UDFs.

This seems like a very convoluted way to do things. It would be great if I could just generate the case classes, and then do something like

def myUdf[CaseClass](input: SomeInputType): CaseClass =
  CaseClass(input.giveMeResults: _*)

The problem is that I can't figure out how to get the type I've created using eval into the current "context" (I don't know the right word here).

This code:

import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox
val tb = universe.runtimeMirror(getClass.getClassLoader).mkToolBox()
val test = tb.eval(tb.parse("object Test; Test"))

gives me this:

...
test: Any = __wrapper$1$bb89c0cde37c48929fa9d8cdabeeb0f8.__wrapper$1$bb89c0cde37c48929fa9d8cdabeeb0f8$Test$1$@492531c0

test is, I think, an instance of Test, but the type system in the REPL doesn't know about any type named Test, so I can't use test.asInstanceOf[Test] or anything like that.

I know this is a frequently asked question, but I can't seem to find an answer anywhere about how to actually accomplish what I described above.

CodePudding user response:

Regarding "Reflection struggles". It's not clear for me whether: 1) you already have def myUdf[T] = ... from somewhere and you're trying just to call it for generated case class: myUdf[GeneratedClass] or 2) you're trying to define def myUdf[T] = ... based on the generated class.

  1. In the former case you should use:
  • tb.define to generate an object (or case class), it returns a class symbol (or module symbol), you can use it further (e.g. in a type position)

  • tb.eval to call the method (myUdf)

object Main extends App {
  def myUdf[T](): Unit = println("myUdf")

  import scala.reflect.runtime.universe
  import universe.Quasiquote
  import scala.tools.reflect.ToolBox

  val tb = universe.runtimeMirror(getClass.getClassLoader).mkToolBox()
  val testSymbol = tb.define(q"object Test")
  val test = tb.eval(q"$testSymbol")

  tb.eval(q"Main.myUdf[$testSymbol]()") // myUdf
}

In this example I changed the signature (and body) of myUdf; you should use your actual ones.

  2. In the latter case you can define myUdf at runtime too:
object Main extends App {
  import scala.reflect.runtime.universe
  import universe.Quasiquote
  import scala.tools.reflect.ToolBox

  val tb = universe.runtimeMirror(getClass.getClassLoader).mkToolBox()
  val testSymbol = tb.define(q"object Test")
  val test = tb.eval(q"$testSymbol")

  val xSymbol = tb.define(
    q"""
      object X {
        def myUdf[T](): Unit = println("myUdf")
      }
    """
  )

  tb.eval(q"$xSymbol.myUdf[$testSymbol]()") //myUdf
}

You should try to write myUdf for the ordinary case and we'll translate it for the runtime-generated one.

so I can't use test.asInstanceOf[Test] or something like that

Yeah, the type Test doesn't exist at compile time, so you can't use it like that. It does exist at runtime, so you should use it inside quasiquotes q"..." (or tb.parse("...")).

object Main extends App {
  import scala.reflect.runtime.universe
  import universe.Quasiquote
  import scala.tools.reflect.ToolBox

  val tb = universe.runtimeMirror(getClass.getClassLoader).mkToolBox()
  val testSymbol = tb.define(q"object Test")
  val test = tb.eval(q"$testSymbol")

  tb.eval(q"Main.test.asInstanceOf[${testSymbol.asModule.moduleClass.asClass.toType}]") // no exception, so test is an instance of Test
  tb.eval(q"Main.test.asInstanceOf[$testSymbol.type]") // no exception, so test is an instance of Test
  println(
    tb.eval(q"Main.test.getClass").asInstanceOf[Class[_]]
  ) // class __wrapper$1$0bbb246b633b472e8df54efc3e9ff9d9.Test$
  println(
    tb.eval(q"scala.reflect.runtime.universe.typeOf[$testSymbol.type]").asInstanceOf[universe.Type]
  ) // __wrapper$1$0bbb246b633b472e8df54efc3e9ff9d9.Test.type
}

Regarding ClassCastException or IllegalArgumentException: I noticed that the exception disappears if you change the UDF return type

def simpleUdf = F.udf (
  simpleFn (_),
  dataType = T.StructType (
    List (
      T.StructField ("a_string", T.StringType),
      T.StructField ("tail1", T.StructType (
        List (
          T.StructField ("another_string", T.StringType),
          T.StructField ("tail2", T.StructType (
            List (
              T.StructField ("an_int", T.IntegerType),
            )
          )),
        )
      )),
    )
  )
)

// +------+-----+-------------------------------------+
// |column|input|array_data                           |
// +------+-----+-------------------------------------+
// |bar   |foo  |{hello world, {Another String, {42}}}|
// +------+-----+-------------------------------------+

I guess this makes sense because a List is :: (aka $colon$colon) of its head and tail, then the tail is :: of its head and tail, and so on.
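A tiny illustration of that guess (plain Scala, nothing Spark-specific):

val xs: List[Any] = List("hello world", "Another String", 42L)
// xs is really ::("hello world", ::("Another String", ::(42L, Nil)))
println(xs.head) // hello world              -> lines up with the first struct field
println(xs.tail) // List(Another String, 42) -> what then gets forced into the second field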

CodePudding user response:

@Dmytro Mitin gets the majority of the credit for this answer. Thanks a ton for your help!

The solution I came to uses approach 1), the untyped API. The key is to do two things:

  1. Return a Row (i.e. untyped) from the unwrapped UDF
  2. Create the UDF using the untyped API

Here is the toy example

spark.conf.set("spark.sql.legacy.allowUntypedScalaUDF", "true")

def simpleFn(foo: Any): Row = Row("a_string", "hello world", 42L)

def simpleUdf = F.udf(
  simpleFn(_),
  dataType = T.StructType(
    List(
      T.StructField("a_string", T.StringType),
      T.StructField("another_string", T.StringType),
      T.StructField("an_int", T.LongType),
    )
  )
)

Now I can use it like this:

Seq(("bar", "foo"))
  .toDF("column", "input")
  .withColumn(
    "struct_data",
    simpleUdf($"input")
  )
  .withColumn(
    "field_data",
    $"struct_data.a_string"
  )
  .show(truncate=false)

Output:

+------+-----+---------------------------+----------+
|column|input|struct_data                |field_data|
+------+-----+---------------------------+----------+
|bar   |foo  |{a_string, hello world, 42}|a_string  |
+------+-----+---------------------------+----------+
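For completeness, here is roughly how that translates back to the mapToRow UDF from the original question (a sketch only; ParsedValueV2, columns, and account.sparkSchemas(prefix) are my own definitions from earlier, and I return null instead of None so missing keys become SQL NULLs):

// Same pattern applied to the original mapToRow.
// Still requires spark.sql.legacy.allowUntypedScalaUDF = true, as above.
def mapToRow(deserialized: Map[String, ParsedValueV2]): Row = {
  def getValue(key: String): Any =
    deserialized.get(key) match {
      case Some(value) => value.asType(columns(key))
      case None        => null // SQL NULL instead of Scala None
    }

  Row(columns.keys.toSeq.map(getValue): _*)
}

def mapToStructUdf = F.udf(mapToRow _, account.sparkSchemas(prefix))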