I understand that there are many SO answers related to what I am asking, but since I am very new to Scala, I am not able to understand those answers. I would really appreciate it if someone could help me correct my UDF.
I have this UDF, which is meant to convert timestamps from GMT to MST:
import java.time.{LocalDateTime, ZoneId, ZonedDateTime}
import java.time.format.DateTimeFormatter

val Gmt2Mst = (dtm_str: String, inFmt: String, outFmt: String) => {
  // Guard against null, empty, or too-short input before parsing
  if (dtm_str == null || dtm_str.isEmpty || dtm_str.length < inFmt.length) {
    null
  } else {
    val gmtZoneId = ZoneId.of("GMT", ZoneId.SHORT_IDS)
    val mstZoneId = ZoneId.of("MST", ZoneId.SHORT_IDS)
    val inFormatter = DateTimeFormatter.ofPattern(inFmt)
    val outFormatter = DateTimeFormatter.ofPattern(outFmt)
    val dateTime = LocalDateTime.parse(dtm_str, inFormatter)
    val gmt = ZonedDateTime.of(dateTime, gmtZoneId)
    val mst = gmt.withZoneSameInstant(mstZoneId)
    mst.format(outFormatter)
  }
}
spark.udf.register("Gmt2Mst", Gmt2Mst)
But whenever a NULL is encountered, the UDF fails to handle it. I am trying to handle it using dtm_str == null, but it still fails. Can someone please tell me what correction I have to make instead of dtm_str == null to achieve my goal?
To give an example, if I run the spark-sql below:
spark.sql("select null as col1, Gmt2Mst(null,'yyyyMMddHHmm', 'yyyyMMddHHmm') as col2").show()
I am getting this error:
22/09/20 14:10:31 INFO TaskSetManager: Starting task 101.1 in stage 27.0 (TID 1809) (10.243.37.204, executor 18, partition 101, PROCESS_LOCAL, 5648 bytes) taskResourceAssignments Map()
22/09/20 14:10:31 INFO TaskSetManager: Lost task 100.0 in stage 27.0 (TID 1801) on 10.243.37.204, executor 18: org.apache.spark.SparkException (Failed to execute user defined function (anonfun$4: (string, string) => string)) [duplicate 1]
22/09/20 14:10:31 INFO TaskSetManager: Starting task 100.1 in stage 27.0 (TID 1810) (10.243.37.241, executor 1, partition 100, PROCESS_LOCAL, 5648 bytes) taskResourceAssignments Map()
22/09/20 14:10:31 INFO TaskSetManager: Lost task 102.0 in stage 27.0 (TID 1803) on 10.243.37.241, executor 1: org.apache.spark.SparkException (Failed to execute user defined function (anonfun$4: (string, string) => string)) [duplicate 2]
22/09/20 14:10:31 INFO TaskSetManager: Starting task 102.1 in stage 27.0 (TID 1811) (10.243.36.183, executor 22, partition 102, PROCESS_LOCAL, 5648 bytes) taskResourceAssignments Map()
22/09/20 14:10:31 INFO TaskSetManager: Finished task 80.0 in stage 27.0 (TID 1781) in 2301 ms on 10.243.36.183 (executor 22) (81/355)
22/09/20 14:10:31 INFO TaskSetManager: Starting task 108.0 in stage 27.0 (TID 1812) (10.243.36.156, executor 4, partition 108, PROCESS_LOCAL, 5648 bytes) taskResourceAssignments Map()
22/09/20 14:10:31 INFO TaskSetManager: Lost task 103.0 in stage 27.0 (TID 1804) on 10.243.36.156, executor 4: org.apache.spark.SparkException (Failed to execute user defined function (anonfun$4: (string, string) => string)) [duplicate 3]
22/09/20 14:10:31 INFO TaskSetManager: Starting task 103.1 in stage 27.0 (TID 1813) (10.243.36.180, executor 9, partition 103, PROCESS_LOCAL, 5648 bytes) taskResourceAssignments Map()
22/09/20 14:10:31 WARN TaskSetManager: Lost task 105.0 in stage 27.0 (TID 1806) (10.243.36.180 executor 9): org.apache.spark.SparkException: Failed to execute user defined function (anonfun$3: (string, string) => string)
at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:136)
at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage29.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NullPointerException
at org.commonspirit.sepsis_bio.recovery.SepsisRecoveryBundle$$anonfun$3.apply(SepsisRecoveryBundle.scala:123)
at org.commonspirit.sepsis_bio.recovery.SepsisRecoveryBundle$$anonfun$3.apply(SepsisRecoveryBundle.scala:122)
... 15 more
CodePudding user response:
I did the following test and it seems to work:
Create a dataframe with a null-typed column. The schema would be:
root
|-- v0: string (nullable = true)
|-- v1: string (nullable = true)
|-- null: null (nullable = true)
for example:
+----+-----+----+
|  v0|   v1|null|
+----+-----+----+
|hola|adios|null|
+----+-----+----+
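For reference, a minimal sketch of how such a dataframe could be built (assuming spark.implicits._ is available; the column names simply match the schema above):
import org.apache.spark.sql.functions.lit
import spark.implicits._

// lit(null) with no cast yields a column of null type, as in the schema above
val df2 = Seq(("hola", "adios")).toDF("v0", "v1").withColumn("null", lit(null))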
Create the udf:
import org.apache.spark.sql.functions.udf

val udf1 = udf { v1: Any => if (v1 != null) s"${v1}_transformed" else null }
Note that working with Any is bad practice in Scala, but this is Spark SQL, and to handle a value that could be of two different types you need to work with this common supertype.
Register the udf:
spark.udf.register("udf1", udf1)
Create the view:
df2.createTempView("df2")
Apply the udf to the view:
spark.sql("select udf1(null) from df").show()
it shows:
+---------+
|UDF(null)|
+---------+
|     null|
+---------+
Apply to a column with not null value:
spark.sql("select udf1(v0) from df2").show()
it shows:
+----------------+
|         UDF(v0)|
+----------------+
|hola_transformed|
+----------------+
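Carrying the same pattern back to the question, a sketch of the timezone UDF with the null check built in could look like the following (untested here; it keeps the original parsing logic and simply routes nulls and too-short strings to a null result):
import java.time.{LocalDateTime, ZoneId, ZonedDateTime}
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.functions.udf

val Gmt2Mst = udf { (dtm: Any, inFmt: String, outFmt: String) =>
  dtm match {
    // Only parse non-null strings long enough for the input pattern
    case s: String if s.length >= inFmt.length =>
      val gmt = ZonedDateTime.of(
        LocalDateTime.parse(s, DateTimeFormatter.ofPattern(inFmt)),
        ZoneId.of("GMT", ZoneId.SHORT_IDS))
      gmt.withZoneSameInstant(ZoneId.of("MST", ZoneId.SHORT_IDS))
        .format(DateTimeFormatter.ofPattern(outFmt))
    case _ => null // nulls, empty, and too-short strings pass through as null
  }
}
spark.udf.register("Gmt2Mst", Gmt2Mst)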