I have a column in my Spark DataFrame (Scala) that was generated by aggregating multiple columns with:
agg(collect_list(struct(col("abc"), col("aaa"))).as("def"))
I want to pass this column to a UDF for further processing, so that I can work on one of the elements (by index) of this aggregated column.
When I pass the column to my UDF like this:
.withColumn("def", remove(col("xyz"), col("def")))
where the UDF takes the aggregated column as Seq[Row]:
val removeUnstableActivations: UserDefinedFunction = udf((xyz: java.util.Date, def: Seq[Row])
I get the error:
Exception encountered when invoking run on a nested suite - Schema for type org.apache.spark.sql.Row is not supported
How should I pass these columns, and what should the column's data type be in the UDF?
Answer:
Indeed, Spark cannot derive a schema for type Row, so a UDF cannot return Row (or Seq[Row]). It can, however, accept Seq[Row] as input. Return a case class instead: Spark treats a returned case class as a StructType. For example:
import org.apache.spark.sql.functions._
import spark.implicits._ // assumes a SparkSession named spark is in scope, e.g. in spark-shell
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.Row
val df = Seq(
  (1, "a"),
  (2, "b"),
  (3, "c")
).toDF("number", "word")
val aggDf = df.agg(
  collect_list(struct(col("number"), col("word"))) as "aggColumn"
)
aggDf.printSchema()
// root
//  |-- aggColumn: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- number: integer (nullable = false)
//  |    |    |-- word: string (nullable = true)
case class ReturnSchema(word: String, number: Int)
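val myUdf: UserDefinedFunction =
  // Seq[Row] is fine as the input type; the return type must be something
  // Spark can derive a schema for, so each Row is mapped to a ReturnSchema.
  udf((collection: Seq[Row]) => {
    collection.map(r => {
      // Struct fields are read by name from each Row
      val word = r.getAs[String]("word")
      val newNumber = r.getAs[Int]("number") * 100
      ReturnSchema(word, newNumber)
    })
  })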
val finalDf = aggDf.select(myUdf(col("aggColumn")).as("udfTransformedColumn"))
finalDf.printSchema()
// root
//  |-- udfTransformedColumn: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- word: string (nullable = true)
//  |    |    |-- number: integer (nullable = false)
finalDf.show(false)
// +------------------------------+
// |udfTransformedColumn          |
// +------------------------------+
// |[[a, 100], [b, 200], [c, 300]]|
// +------------------------------+
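Applied to the question's two-argument case, a minimal sketch might look like the following. It assumes Spark 2.4 or 3.x running locally. The column names abc, aaa, xyz and def are the question's placeholders (here assumed to be String, Int, date and the collected array). The case class Activation, the helper dropAt and the index being removed are all hypothetical. The UDF accepts the date and the collected Seq[Row], drops one element by position, and returns Seq[Activation] so Spark can derive the result schema. The date is taken as java.sql.Date, which is what Spark passes for a DateType column by default. Also note that collect_list does not guarantee element order after a shuffle, so removing "the element at index i" is only meaningful if the order is fixed some other way.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._

// Hypothetical element type: field names/types mirror struct(col("abc"), col("aaa"))
// and stand in for your real columns.
case class Activation(abc: String, aaa: Int)

object RemoveAtIndexExample {

  // Hypothetical plain-Scala helper: drop the element at position `index` (0-based).
  // Keeping the list logic outside Spark makes it testable without a SparkSession.
  def dropAt[A](xs: Seq[A], index: Int): Seq[A] =
    xs.zipWithIndex.collect { case (x, i) if i != index => x }

  // Seq[Row] is accepted as an input; the output must have a schema Spark can
  // derive, so each remaining Row is converted into the Activation case class.
  def removeUnstableActivations(indexToRemove: Int): UserDefinedFunction =
    udf((xyz: java.sql.Date, rows: Seq[Row]) => {
      // xyz mirrors the question's signature; the real rule that would use it
      // is unknown, so this sketch ignores it.
      dropAt(rows, indexToRemove).map(r =>
        Activation(r.getAs[String]("abc"), r.getAs[Int]("aaa")))
    })

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("udf-sketch").getOrCreate()
    import spark.implicits._

    // Hypothetical sample data with a DateType column named xyz
    val df = Seq(("2020-01-01", "a", 1), ("2020-01-01", "b", 2), ("2020-01-01", "c", 3))
      .toDF("xyz", "abc", "aaa")
      .withColumn("xyz", to_date(col("xyz")))

    val aggDf = df.groupBy("xyz")
      .agg(collect_list(struct(col("abc"), col("aaa"))).as("def"))

    // Hypothetical: drop the second collected element (index 1)
    val result = aggDf.withColumn("def", removeUnstableActivations(1)(col("xyz"), col("def")))
    result.printSchema()
    result.show(false)
    spark.stop()
  }
}
```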