Scala Spark - get number of nulls in column with only column, not the df


I'm working on a bigger project where this function is called from within a withColumn. It has a few different operations, but here's an example usage:

case class Employee(id: Int, name: String)
val df = Seq(new Employee(1, "Elia"), new Employee(2, null), new Employee(3, "Fang")).toDF
df.show
+---+----+
| id|name|
+---+----+
|  1|Elia|
|  2|null|
|  3|Fang|
+---+----+

def test1: Column =  {
  concat(col("id"), col("name"))
}
df.withColumn("concat", test1).show
+---+----+------+
| id|name|concat|
+---+----+------+
|  1|Elia| 1Elia|
|  2|null|  null|
|  3|Fang| 3Fang|
+---+----+------+

So what I want to do is throw an exception if one of the columns has any nulls. Something like this within the test1 function:

if(col("id").isNull.sum > 0){
  throw IllegalArgumentException("id can not have any nulls")
}

But apparently columns can't be summed. I also tried sum(col("id").isNull), which is similarly not valid. All the examples I've seen on Stack Overflow use DataFrame-level functions, for example df.filter("id is null").count > 0. But in the framework I'm using, that would require a pretty massive refactor just to do a simple QC check and throw a more accurate exception. The scope of the function I'm modifying doesn't have access to the DataFrame. Is what I'm trying to do possible?

Thanks!

CodePudding user response:

You can define a UDF (user-defined function) for this case. Take a look at this example:

import org.apache.spark.sql.{functions => F}

val testUdf = F.udf((id: Option[Int], name: Option[String]) => {
  // Null inputs arrive as None; fail fast before concatenating.
  (id, name) match {
    case (None, _) => throw new RuntimeException("id can not have any nulls")
    case (_, None) => throw new RuntimeException("name can not have any nulls")
    case (Some(id), Some(name)) => s"$id$name"
  }
})
df.withColumn("concat", testUdf($"id", $"name")).show
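
If it helps to keep the original shape of the code, test1 can simply return the UDF call, so the null check still lives at the Column level; a minimal sketch, reusing testUdf from above:

import org.apache.spark.sql.Column

// Sketch: test1 delegates to the UDF, so the check stays in the Column
// expression and no DataFrame reference is needed.
def test1: Column = testUdf(F.col("id"), F.col("name"))

// The RuntimeException only surfaces once an action (e.g. show) evaluates
// the rows; Spark typically reports it wrapped in a SparkException.
df.withColumn("concat", test1).show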

CodePudding user response:

Depending on the Spark version you're using, you have a couple of options.
If your Spark version is < 3.1, you can use a UDF like this:


import org.apache.spark.sql.functions._

// UDF that throws as soon as it sees a null (None) value.
val throwExUdf = udf((d: Option[String]) =>
  d match {
    case None    => throw new RuntimeException("message")
    case Some(v) => v
  }
)

df.withColumn(
        "concat",
        when($"name".isNotNull, concat($"id", $"name")).otherwise(throwExUdf($"name"))
)
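
If the goal is to keep the check inside a function that only sees columns (as in the original test1), the same guarded expression can be returned from a helper; a minimal sketch, reusing throwExUdf from above (concatOrFail is just an illustrative name):

import org.apache.spark.sql.Column

// Sketch: package the guarded expression as a Column-returning function,
// mirroring test1 so no DataFrame reference is required.
def concatOrFail: Column =
  when($"name".isNotNull, concat($"id", $"name"))
    .otherwise(throwExUdf($"name"))

df.withColumn("concat", concatOrFail).show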

With Spark version >= 3.1 you also have the option to use the built-in raise_error function (see the docs), like this:

df.withColumn(
        "concat",
        when($"name".isNotNull, concat($"id", $"name")).otherwise(raise_error(lit("Name is null")))
)
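
To guard both columns this way, the raise_error checks can be chained, again as pure Column logic inside a function like the original test1; a minimal sketch, assuming Spark >= 3.1:

import org.apache.spark.sql.Column

// Sketch: both null checks are expressed as Column logic, so the function
// never needs a reference to the DataFrame itself.
def test1: Column =
  when($"id".isNull, raise_error(lit("id can not have any nulls")))
    .when($"name".isNull, raise_error(lit("name can not have any nulls")))
    .otherwise(concat($"id", $"name"))

df.withColumn("concat", test1).show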
