Based on the discussion from How to use Scala UDF in PySpark?, I am able to execute a Scala UDF from PySpark for primitive types, but now I want to call a Scala UDF from PySpark that accepts a Map[String, String].
package com.test

import org.apache.spark.sql.functions.udf

object ScalaPySparkUDFs extends Serializable {
  def testFunction1(x: Int): Int = { x * 2 }
  def testFunction2(x: Map[String, String]): String = {
    // use the Map key and value pairs here (placeholder implementation)
    x.map { case (k, v) => s"$k=$v" }.mkString(",")
  }
  def testUDFFunction1 = udf { x: Int => testFunction1(x) }
  def testUDFFunction2 = udf { x: Map[String, String] => testFunction2(x) }
}
The first UDF (testUDFFunction1) works fine:
_f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1()
Column(_f.apply(_to_seq(sc, [col], _to_java_column)))
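For context, here is a minimal sketch of how that call can be wrapped into a helper and applied with withColumn (the SparkContext name sc, the DataFrame df, the column "value", and the helper name are just examples, not part of my actual code):

from pyspark.sql.column import Column, _to_java_column, _to_seq
from pyspark.sql.functions import col

def scala_double_udf(column):
    # look up the Scala UDF on the JVM side and apply it to a single column
    _f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1()
    return Column(_f.apply(_to_seq(sc, [column], _to_java_column)))

# hypothetical DataFrame df with an integer column "value"
df.withColumn("doubled", scala_double_udf(col("value"))).show()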
But I am not sure how to execute testUDFFunction2 from PySpark:
_f2 = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction2()
Column(_f2.apply(_to_seq(sc, [lit("KEY"), col("FIRSTCOLUMN"), lit("KEY2"), col("SECONDCOLUMN")], _to_java_column)))
This fails and generates the below exception:
Py4JJavaError: An error occurred while calling o430.apply.
: java.lang.ClassCastException: sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction2$$Lambda$3693/1231805146 cannot be cast to scala.Function4
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.<init>(ScalaUDF.scala:241)
at org.apache.spark.sql.expressions.SparkUserDefinedFunction.createScalaUDF(UserDefinedFunction.scala:113)
at org.apache.spark.sql.expressions.SparkUserDefinedFunction.apply(UserDefinedFunction.scala:101)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:748)
I can easily do that from Scala as:
val output = input.withColumn("result", testUDFFunction2(map(
lit("KEY1"), col("FIRSTCOLUMN"),
lit("KEY2"), col("SECONDCOLUMN")
)))
But I want to convert that code to PySpark, and I am not able to find good documentation. As mentioned in https://spark.apache.org/docs/3.2.1/api/java/org/apache/spark/sql/expressions/UserDefinedFunction.html, there are only two apply methods, both of which accept Column arguments. Any recommendations on how I can proceed?
I tried the create_map function in pyspark.sql.functions, but that doesn't work with Column types.
CodePudding user response:
I can see the problem with how you are calling the function.
You need to change the following lines:
_f2 = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction2()
Column(_f2.apply(_to_seq(sc, [lit("KEY"), col("FIRSTCOLUMN"), lit("KEY2"), col("SECONDCOLUMN")], _to_java_column)))
Since the function is called using the map function in Scala, there is an equivalent function, create_map, in PySpark. The only thing you need to do is:
from pyspark.sql.functions import create_map, col, lit
_f2 = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction2()
Column(_f2.apply(_to_seq(sc, [create_map(lit("KEY"), col("FIRSTCOLUMN"), lit("KEY2"), col("SECONDCOLUMN"))], _to_java_column)))
That way, you will be able to call the function and avoid the ClassCastException.
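For completeness, here is a sketch of how the call might be wrapped into a reusable helper and applied with withColumn, mirroring your Scala example (the DataFrame name input comes from your question; the helper name scala_map_udf is just an illustration):

from pyspark.sql.column import Column, _to_java_column, _to_seq
from pyspark.sql.functions import create_map, col, lit

def scala_map_udf(pairs):
    # pairs is a single MapType column built with create_map on the Python side;
    # the Scala UDF receives it as a Map[String, String]
    _f2 = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction2()
    return Column(_f2.apply(_to_seq(sc, [pairs], _to_java_column)))

output = input.withColumn("result", scala_map_udf(create_map(
    lit("KEY1"), col("FIRSTCOLUMN"),
    lit("KEY2"), col("SECONDCOLUMN"))))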