Pass an ArrayType column to a UDF in Spark Scala

Time:10-20

I have a column in my Spark DataFrame in Scala that was generated by aggregating multiple columns using

 agg(collect_list(struct(col("abc"), col("aaa"))).as("def"))

I want to pass this column to a UDF for further processing, so that I can work on one of the elements of this aggregated column.

When I pass the arguments to my UDF as:

.withColumn("def", remove(col("xyz"), col("def")))

with the UDF taking the aggregated column as Seq[Row]:

val removeUnstableActivations: UserDefinedFunction = udf((xyz: java.util.Date, def: Seq[Row])

I get the error:

Exception encountered when invoking run on a nested suite - Schema for type org.apache.spark.sql.Row is not supported

How should I pass this column, and what should its data type be in the UDF?
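The error can be reproduced in isolation. The following is a minimal sketch, assuming Spark 2.x or 3.x (spark-sql) on the classpath and running as a Scala script or in spark-shell; Element, returningRows and returningCaseClass are hypothetical names used only for illustration. It suggests the problem lies in the UDF's return type rather than its input: udf(...) derives the return schema at the moment it is called, that derivation fails for Row, and the same input type is accepted once the result is a case class.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf
import scala.util.Try

// Hypothetical element type, used only for this illustration
case class Element(word: String, number: Int)

// Return type Seq[Row]: Spark cannot derive a schema for Row,
// so the udf(...) call itself throws (captured here by Try)
val returningRows = Try(udf((items: Seq[Row]) => items.filter(_.getAs[Int]("number") > 1)))

// Same Seq[Row] input, but each element is converted to a case class,
// from which Spark can derive an array<struct<word, number>> return schema
val returningCaseClass = Try(udf((items: Seq[Row]) =>
  items
    .filter(_.getAs[Int]("number") > 1)
    .map(r => Element(r.getAs[String]("word"), r.getAs[Int]("number")))))

// Prints the exception message (its exact wording depends on the Spark version)
println(returningRows.failed.map(_.getMessage).getOrElse("no error"))
```

No SparkSession is needed for this check, because the failure happens when the UDF is defined, not when it runs.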

Answer:

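Indeed, Spark cannot derive a schema for Row, so a typed Scala UDF cannot return Row (or Seq[Row]). The UDF can still take the array-of-structs column as Seq[Row], though, and return a case class instead: Spark treats a returned case class as a StructType. For example: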

import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.Row

val df = Seq(
  (1, "a"),
  (2, "b"),
  (3, "c")
).toDF("number", "word")

val aggDf = df.agg(
  collect_list(struct(col("number"), col("word"))) as "aggColumn"
)

aggDf.printSchema()
// root
//  |-- aggColumn: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- number: integer (nullable = false)
//  |    |    |-- word: string (nullable = true)

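// Spark derives the UDF's return schema from this case class
case class ReturnSchema(word: String, number: Int)

// Input: the array<struct<number, word>> column arrives as Seq[Row].
// Output: Seq[ReturnSchema], which Spark maps to array<struct<word, number>>.
val myUdf: UserDefinedFunction =
  udf((collection: Seq[Row]) => {
    collection.map(r => {
      val word      = r.getAs[String]("word")
      val newNumber = r.getAs[Int]("number") * 100

      ReturnSchema(word, newNumber)
    })
  })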
  
val finalDf = aggDf.select(myUdf(col("aggColumn")).as("udfTransformedColumn"))

finalDf.printSchema
// root
//  |-- udfTransformedColumn: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- word: string (nullable = true)
//  |    |    |-- number: integer (nullable = false)

finalDf.show(false)
// +------------------------------+
// |udfTransformedColumn          |
// +------------------------------+
// |[[a, 100], [b, 200], [c, 300]]|
// +------------------------------+
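The question's call passes two columns, a scalar one and the aggregated one. The same pattern extends to that shape; below is a minimal sketch, assuming Spark 2.x or 3.x with a local SparkSession (in spark-shell, reuse the existing spark instead). The names keepUpTo, Kept and the limit column are hypothetical, and the filtering rule is only a stand-in for whatever the real UDF does. Each extra column becomes one more lambda parameter of the matching Scala type; a date or timestamp column would by default be declared as java.sql.Date or java.sql.Timestamp.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, lit, struct, udf}

// Local session for the sketch; in spark-shell the existing `spark` can be used
val spark = SparkSession.builder().master("local[1]").appName("array-struct-udf").getOrCreate()
import spark.implicits._

// Hypothetical result type for the elements that are kept
case class Kept(word: String, number: Int)

// Hypothetical two-argument UDF: the first argument is a plain Int column,
// the second is the array<struct> column (structs arrive as Row, the array as Seq).
// The result is a Seq of case classes, so Spark can derive its schema.
val keepUpTo = udf((limit: Int, items: Seq[Row]) =>
  items
    .filter(r => r.getAs[Int]("number") <= limit)
    .map(r => Kept(r.getAs[String]("word"), r.getAs[Int]("number"))))

val aggDf = Seq((1, "a"), (2, "b"), (3, "c"))
  .toDF("number", "word")
  .agg(collect_list(struct(col("number"), col("word"))).as("aggColumn"))
  .withColumn("limit", lit(2))

// Same call shape as .withColumn("def", remove(col("xyz"), col("def")))
val filtered = aggDf.withColumn("aggColumn", keepUpTo(col("limit"), col("aggColumn")))

// Read the result back as Rows; collect_list gives no ordering guarantee
val keptWords = filtered.select("aggColumn").head().getSeq[Row](0).map(_.getAs[String]("word"))
println(keptWords)
```

Overwriting the input column with withColumn, as the question does, is fine here: the new array simply replaces the old one, with the element schema taken from Kept.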