I am trying to split a column containing data in the format below and convert it into a map inside a UDF.
UDF:
def convertToMapUDF = udf((c: String) => {
  val arr = c.split(",")
  val l = arr.toList
  val regexPattern = ".*(=).*".r
  println(s"column value: $c")
  s"$c" match {
    case regexPattern(a) => Some(l.map(x => x.split("=")).map(a => a(0).toString -> a(1).toString).toMap)
    case "null" => Some(Map[String, String]())
  }
})
val splitColList = List("r_split")
val d = ft.select(splitColList.map(c => convertToMapUDF(col(c))): _*)
The r_split column contains data like:
null
null
As=true, eMsion_New:E=true, HR:E=true, Don:E=true, Hrs=true, PAD:E=true, mog:E=true, N4k:E=true, WY:E=true, AT:E=true, Dt_RFC:E=true, DASH_ALL:E=true, TE:E=true, C_14:E=true, We:E=true, PG:E=true, ZR:E=true, MP:E=true, m2M:E=true, HC:E=true, Nos:E=true,
null
Exception:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3712.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3712.0 (TID 37567, 10.73.35.140, executor 48): org.apache.spark.SparkException: Failed to execute user defined function($read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$7a8ac347ba800d5b55403548fd65e2$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$10845/2054440395: (string) => map<string,string>)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:733)
at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)
at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:187)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:655)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:658)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at line6ec706fd41324bde944dab3ff50b81c6224.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$$$7a8ac347ba800d5b55403548fd65e2$$$$w$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$convertToMapUDF$1(command-1649645762990220:2)
... 14 more
I tried using Option and Some, but the issue is not resolved. The UDF works fine when executed as an independent function on the string above.
CodePudding user response:
The last line is unnecessarily confusing and not relevant to your problem. You basically mean
val d = ft.select(convertToMapUDF(col("r_split")))
right?
Anyway, what behavior do you expect for missing values (null)? I guess you want to skip them, that is, keep null. Spark implements this behavior for UDFs whose parameters are non-nullable types (Float/Int/etc). For nullable types, including String, you need to implement it yourself.
The NullPointerException comes from c.split(",") at the top of the UDF, which runs before the match is reached, so the "null" case never gets a chance to handle the missing value.
Just check for c == null first thing in the UDF and return null in that case. Sorry, I see that your function returns Option[Map[_,_]]. In that case return None.
I think (not sure at the moment) you can also change your parameter to Option[String], but you will still have to handle the None case yourself, and perhaps null as well. I am a bit hazy on that.
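The null check described above can be sketched in plain Scala, outside Spark, so it is easy to try in a REPL. This is only a sketch: the object and method names (KeyValueParser, parse) are hypothetical, and trimming whitespace and skipping tokens without "=" are assumptions about what is wanted for this data, not something stated in the question.

```scala
object KeyValueParser {
  /** Parses "k1=v1, k2=v2," into a Map; returns None when the input is null. */
  def parse(c: String): Option[Map[String, String]] =
    Option(c).map { s => // Option(null) is None, so null never reaches split
      s.split(",").iterator
        .map(_.trim)              // assumption: drop the space after each comma
        .filter(_.contains("="))  // assumption: skip blank or malformed tokens
        .map { kv =>
          val parts = kv.split("=", 2) // limit 2 keeps any '=' inside the value
          parts(0) -> parts(1)
        }
        .toMap
    }
}
```

It could then be wrapped the same way as in the question, for example udf((c: String) => KeyValueParser.parse(c)).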
CodePudding user response:
The nulls are coming from your dataset. You can make your UDF null-safe with scala.util.Try (if you don't want to lose the record). Just wrap the body of convertToMapUDF in a Try. Any non-fatal exception, including the NullPointerException, is then caught as a Failure, which toOption turns into None.
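import scala.util.Try
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

def convertToMapUDF: UserDefinedFunction = udf((c: String) => {
  // Try catches the NullPointerException thrown by c.split when c is null
  Try {
    val arr = c.split(",")
    val l = arr.toList
    val regexPattern = ".*(=).*".r
    println(s"column value: $c")
    s"$c" match {
      case regexPattern(_) => l.map(x => x.split("=")).map(a => a(0) -> a(1)).toMap
      case _ => Map[String, String]()
    }
  }.toOption // Failure becomes None, which Spark stores as null
})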
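One consequence of the Try approach is worth seeing in isolation: Try does not distinguish a null input from a malformed one, so a single token without "=" turns the whole row into None. The sketch below reproduces the parsing steps in plain Scala (no Spark, no println); the object name TryParseSketch is hypothetical, and c.contains("=") stands in for the regex match.

```scala
import scala.util.Try

object TryParseSketch {
  // Same steps as the UDF body above, minus Spark and the println.
  def parse(c: String): Option[Map[String, String]] =
    Try {
      val pairs = c.split(",").toList // throws NullPointerException when c is null
      if (c.contains("=")) pairs.map(_.split("=")).map(a => a(0) -> a(1)).toMap
      else Map.empty[String, String]
    }.toOption // any non-fatal exception ends up as None
}
```

If malformed tokens should be skipped rather than discarding the row, filtering them out before building the map may be a better fit than a blanket Try.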
CodePudding user response:
You don't need to use a UDF here at all: Spark has all the functionality you need. Consider the following:
ft
  .withColumn("id", monotonically_increasing_id())
  .select(col("id"), explode(split(col("r_split"), ",")) as "r")
  .select(col("id"), split(col("r"), "=") as "s")
  .select(col("id"), map_from_arrays(array(col("s")(0)) as "k", array(col("s")(1)) as "v") as "map")
  .groupBy("id").agg(collect_list("map") as "maps")
  .select(
    aggregate(col("maps"), typedLit(Map[String, String]()), (acc, nxt) => map_concat(acc, nxt)) as "r_split"
  )
First we add an ID so we can put everything back together at the end. Then we take the input string r_split and break it on the ",". We then 'explode' the resulting array so that each term has its own Row. Next we split each row on the "=" symbol to create another array, and create a map by selecting the first element of that array as the key and the second element as the value. This gives us a map for each entry, so we gather them together by ID (hence the first step) into a single list. Finally we reduce that list to a single map. Note that explode produces no output for rows where r_split is null, so those rows are dropped from the result.
I feel there are too many steps here, and the various split/explode phases can probably be done more efficiently with regexp_extract, but this demonstrates the idea without having to use a UDF.
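As a shorter variant of the same no-UDF idea, Spark SQL also has a built-in str_to_map function that splits a string into key/value pairs in one step. The sketch below calls it through expr; the local SparkSession, the object name StrToMapSketch and the sample rows are assumptions made purely for illustration. The regexp_replace strips the trailing comma seen in the sample data, and ", *" is used as the pair delimiter because str_to_map treats its delimiters as regular expressions.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.expr

object StrToMapSketch {
  // Local session for illustration only; in a notebook `spark` already exists.
  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("str-to-map-sketch")
    .getOrCreate()
  import spark.implicits._

  // Hypothetical sample shaped like the r_split column in the question.
  val ft: DataFrame = Seq("As=true, HR:E=true, Hrs=true,", null).toDF("r_split")

  // Strip the trailing comma, then split pairs on ", *" and key/value on "=".
  val d: DataFrame = ft.select(
    expr("str_to_map(regexp_replace(r_split, ', *$', ''), ', *', '=')").as("r_split")
  )
}
```

A null input stays null in the output, so no rows are lost and no explicit null handling is needed.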