a Spark Scala String Matching UDF


import org.apache.spark.sql.functions.{lit, udf}

val containsString = (haystack: String, needle: String) =>
    if (haystack.contains(needle)) 1 else 0

val containsStringUDF = udf(containsString)

val new_df = df.withColumn("nameContainsxyz", containsStringUDF($"name", lit("xyz")))

I am new to Spark and Scala. The above code compiles successfully. However, when I try to run

new_df.groupBy("nameContainsxyz").sum().show()

an error is thrown. Could someone help me out? The error message is below.

Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (string, string) => int)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.agg_doAggregateWithKeys_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$15$$anon$2.hasNext(WholeStageCodegenExec.scala:655)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
  at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
  at org.apache.spark.scheduler.Task.run(Task.scala:121)
  at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
  ... 3 more
Caused by: java.lang.NullPointerException
  at $anonfun$1.apply(<console>:41)
  at $anonfun$1.apply(<console>:40)
  ... 15 more

Just an update: the error was thrown because some of the rows in the specified column are null. Adding a null check in the UDF completely solved the issue.
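For reference, a minimal sketch of what that null check can look like. Only the matching function changes; the `udf(...)` wrapper and `withColumn` call from the question stay the same (the column `name` and literal `"xyz"` are just the values used above):

```scala
// Null-safe variant: returns 0 when either argument is null
// instead of throwing a NullPointerException inside the UDF.
val containsString = (haystack: String, needle: String) =>
  if (haystack != null && needle != null && haystack.contains(needle)) 1 else 0
```

Wrapped the same way as before (`val containsStringUDF = udf(containsString)`), null values in the column simply map to 0 instead of failing the whole stage.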

Thanks

CodePudding user response:

If I understand correctly, you want to count the rows in which 'xyz' appears in the name column?

You can do that without using a UDF:

df.filter('name.contains("xyz")).count
