I'm working on a bigger project where this function is called from within a withColumn. It has a few different operations, but here's an example usage:
case class Employee(id: Int, name: String)
val df = Seq(Employee(1, "Elia"), Employee(2, null), Employee(3, "Fang")).toDF
df.show
+---+----+
| id|name|
+---+----+
|  1|Elia|
|  2|null|
|  3|Fang|
+---+----+
def test1: Column = {
concat(col("id"), col("name"))
}
df.withColumn("concat", test1).show
+---+----+------+
| id|name|concat|
+---+----+------+
|  1|Elia| 1Elia|
|  2|null|  null|
|  3|Fang| 3Fang|
+---+----+------+
So what I want to do is throw an exception if one of the columns has any nulls. Something like this within the test1 function:
if (col("id").isNull.sum > 0) {
  throw new IllegalArgumentException("id can not have any nulls")
}
But apparently columns can't be summed. I also tried sum(col("id").isNull), which is similarly not valid. All the examples I've seen on Stack Overflow use dataframe-level functions, for example df.filter("id is null").count > 0. But in the framework I'm using, that would require a pretty massive refactor just to do a simple QC check that throws a more accurate exception, and the scope of the function I'm modifying doesn't have access to the dataframe. Is what I'm trying to do possible?
Thanks!
CodePudding user response:
You can define a UDF (User Defined Function) for this case. Take a look at this example:
import org.apache.spark.sql.{functions => F}

// Declaring the parameters as Option lets the UDF see SQL nulls as None.
val testUdf = F.udf((id: Option[Int], name: Option[String]) => {
  (id, name) match {
    case (None, _)              => throw new RuntimeException("id can not have any nulls")
    case (_, None)              => throw new RuntimeException("name can not have any nulls")
    case (Some(id), Some(name)) => s"$id$name"
  }
})
df.withColumn("concat", testUdf($"id", $"name")).show
CodePudding user response:
Depending on the Spark version you're using, you have several options.
If your Spark version is < 3.1, you can use a udf like this:
// Assumes import org.apache.spark.sql.functions._ and spark.implicits._
val throwExUdf = udf(
  (d: Option[String]) =>
    d match {
      // None means the incoming value was a SQL null
      case None    => throw new RuntimeException("name can not have any nulls")
      case Some(v) => v
    }
)
df.withColumn(
  "concat",
  when($"name".isNotNull, concat($"id", $"name")).otherwise(throwExUdf($"name"))
).show
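If you need the same guard on both id and name (as in the question), you can also wrap the check in a small Column-level helper, so the calling function never needs the dataframe at all. A sketch, where requireNotNull is a hypothetical helper name:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{udf, concat}

// Wraps any column so a null value fails the job with a column-specific
// message, staying entirely at the Column level (no dataframe access needed).
def requireNotNull(c: Column, colName: String): Column = {
  val guard = udf((v: Option[String]) =>
    v.getOrElse(throw new RuntimeException(s"$colName can not have any nulls"))
  )
  guard(c.cast("string"))
}

df.withColumn("concat", concat(requireNotNull($"id", "id"), requireNotNull($"name", "name"))).show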
With Spark version >= 3.1 you also have the option to use the built-in function raise_error (doc) like this:
df.withColumn(
  "concat",
  when($"name".isNotNull, concat($"id", $"name")).otherwise(raise_error(lit("Name is null")))
).show
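The same idea extends to guarding both columns with chained when clauses, which matches the original question's requirement. A sketch, assuming raise_error and lit are imported from org.apache.spark.sql.functions:

df.withColumn(
  "concat",
  when($"id".isNull, raise_error(lit("id can not have any nulls")))
    .when($"name".isNull, raise_error(lit("name can not have any nulls")))
    .otherwise(concat($"id", $"name"))
).show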