How do I handle a null value in a Scala UDF?


I understand that there are many SO answers related to what I am asking, but since I am very new to Scala, I am not able to understand those answers. I would really appreciate it if someone could help me correct my UDF.

I have this UDF, which is meant to convert timestamps from GMT to MST:

import java.time.{LocalDateTime, ZoneId, ZonedDateTime}
import java.time.format.DateTimeFormatter

val Gmt2Mst = (dtm_str: String, inFmt: String, outFmt: String) => {
  if ("".equals(dtm_str) || dtm_str == null || dtm_str.length() < inFmt.length()) {
    null
  } else {
    val gmtZoneId = ZoneId.of("GMT", ZoneId.SHORT_IDS)
    val mstZoneId = ZoneId.of("MST", ZoneId.SHORT_IDS)

    val inFormatter = DateTimeFormatter.ofPattern(inFmt)
    val outFormatter = DateTimeFormatter.ofPattern(outFmt)
    val dateTime = LocalDateTime.parse(dtm_str, inFormatter)
    val gmt = ZonedDateTime.of(dateTime, gmtZoneId)
    val mst = gmt.withZoneSameInstant(mstZoneId)
    mst.format(outFormatter)
  }
}

spark.udf.register("Gmt2Mst", Gmt2Mst)

But whenever a NULL is encountered, the UDF fails to handle it. I am trying to handle it with dtm_str == null, but it still fails. Can someone please tell me what correction I have to make, instead of dtm_str == null, to achieve my goal?

To give an example, if I run the Spark SQL below:

spark.sql("select null as col1, Gmt2Mst(null,'yyyyMMddHHmm', 'yyyyMMddHHmm') as col2").show()

I am getting this error:

22/09/20 14:10:31 INFO TaskSetManager: Starting task 101.1 in stage 27.0 (TID 1809) (10.243.37.204, executor 18, partition 101, PROCESS_LOCAL, 5648 bytes) taskResourceAssignments Map()
22/09/20 14:10:31 INFO TaskSetManager: Lost task 100.0 in stage 27.0 (TID 1801) on 10.243.37.204, executor 18: org.apache.spark.SparkException (Failed to execute user defined function (anonfun$4: (string, string) => string)) [duplicate 1]
22/09/20 14:10:31 INFO TaskSetManager: Starting task 100.1 in stage 27.0 (TID 1810) (10.243.37.241, executor 1, partition 100, PROCESS_LOCAL, 5648 bytes) taskResourceAssignments Map()
22/09/20 14:10:31 INFO TaskSetManager: Lost task 102.0 in stage 27.0 (TID 1803) on 10.243.37.241, executor 1: org.apache.spark.SparkException (Failed to execute user defined function (anonfun$4: (string, string) => string)) [duplicate 2]
22/09/20 14:10:31 INFO TaskSetManager: Starting task 102.1 in stage 27.0 (TID 1811) (10.243.36.183, executor 22, partition 102, PROCESS_LOCAL, 5648 bytes) taskResourceAssignments Map()
22/09/20 14:10:31 INFO TaskSetManager: Finished task 80.0 in stage 27.0 (TID 1781) in 2301 ms on 10.243.36.183 (executor 22) (81/355)
22/09/20 14:10:31 INFO TaskSetManager: Starting task 108.0 in stage 27.0 (TID 1812) (10.243.36.156, executor 4, partition 108, PROCESS_LOCAL, 5648 bytes) taskResourceAssignments Map()
22/09/20 14:10:31 INFO TaskSetManager: Lost task 103.0 in stage 27.0 (TID 1804) on 10.243.36.156, executor 4: org.apache.spark.SparkException (Failed to execute user defined function (anonfun$4: (string, string) => string)) [duplicate 3]
22/09/20 14:10:31 INFO TaskSetManager: Starting task 103.1 in stage 27.0 (TID 1813) (10.243.36.180, executor 9, partition 103, PROCESS_LOCAL, 5648 bytes) taskResourceAssignments Map()
22/09/20 14:10:31 WARN TaskSetManager: Lost task 105.0 in stage 27.0 (TID 1806) (10.243.36.180 executor 9): org.apache.spark.SparkException: Failed to execute user defined function (anonfun$3: (string, string) => string)
        at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:136)
        at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage29.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NullPointerException
        at org.commonspirit.sepsis_bio.recovery.SepsisRecoveryBundle$$anonfun$3.apply(SepsisRecoveryBundle.scala:123)
        at org.commonspirit.sepsis_bio.recovery.SepsisRecoveryBundle$$anonfun$3.apply(SepsisRecoveryBundle.scala:122)
        ... 15 more


CodePudding user response:

I ran the following test and it seems to work:

Create a dataframe with a null-typed column. The schema would be:

root
 |-- v0: string (nullable = true)
 |-- v1: string (nullable = true)
 |-- null: null (nullable = true)

For example:

+----+-----+----+
|  v0|   v1|null|
+----+-----+----+
|hola|adios|null|
+----+-----+----+
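
For reference, one way such a dataframe could be built is sketched below; the toDF column names and the lit(null) column are assumptions chosen to match the schema above, not code from the original answer, and a SparkSession named spark is assumed to be in scope (as in spark-shell):

import org.apache.spark.sql.functions.lit
import spark.implicits._

// Sketch: two string columns plus a NullType column named "null".
val df2 = Seq(("hola", "adios")).toDF("v0", "v1").withColumn("null", lit(null))
df2.printSchema()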

Create the UDF:

import org.apache.spark.sql.functions.udf

val udf1 = udf { v1: Any => if (v1 != null) s"${v1}_transformed" else null }

Note that working with Any is bad practice in Scala, but this is Spark SQL, and to handle a value that could be one of two different types you need to work with this supertype.
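
If the input is known to always be a string column, a typed variant avoids Any entirely, since Spark passes null through for null string arguments. This is a sketch under that assumption (the name udf1Str is made up here):

import org.apache.spark.sql.functions.udf

// Sketch: String-typed variant. Option(null) is None, so .orNull
// returns null for null inputs without an explicit null check.
val udf1Str = udf { v1: String => Option(v1).map(v => s"${v}_transformed").orNull }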

Register the UDF:

spark.udf.register("udf1", udf1) 

Create the view:

df2.createTempView("df2")

Apply the UDF to the view:

spark.sql("select udf1(null) from df").show()

It shows:

+---------+
|UDF(null)|
+---------+
|     null|
+---------+

Apply it to a column with a non-null value:

 spark.sql("select udf1(v0) from df2").show()

It shows:

+----------------+
|         UDF(v0)|
+----------------+
|hola_transformed|
+----------------+
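
Tying this back to the question, the same check-for-null-first idea applied to Gmt2Mst would look roughly like the sketch below; it guards all three arguments before touching them and keeps the java.time logic from the question unchanged:

import java.time.{LocalDateTime, ZoneId, ZonedDateTime}
import java.time.format.DateTimeFormatter

// Sketch: return null as soon as any argument is null or unusable,
// before calling anything that could dereference a null String.
val Gmt2Mst = (dtm_str: String, inFmt: String, outFmt: String) => {
  if (dtm_str == null || inFmt == null || outFmt == null ||
      dtm_str.isEmpty || dtm_str.length < inFmt.length) {
    null
  } else {
    val gmt = ZonedDateTime.of(
      LocalDateTime.parse(dtm_str, DateTimeFormatter.ofPattern(inFmt)),
      ZoneId.of("GMT", ZoneId.SHORT_IDS))
    gmt.withZoneSameInstant(ZoneId.of("MST", ZoneId.SHORT_IDS))
      .format(DateTimeFormatter.ofPattern(outFmt))
  }
}

spark.udf.register("Gmt2Mst", Gmt2Mst)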
 