I am trying to do polynomial curve fitting on a Spark DataFrame similar to the one below (Spark version 2.4.0.7.1.5, Scala version 2.11.12, OpenJDK 64-Bit Server VM, Java 1.8.0_232).
I wrote a UDAF for this; it registers fine, but then I get an error at runtime.
I am new to Scala and UDAFs. Could you take a look at my function and help me see what is wrong with it?
Thanks.
Example DataFrame:
val n = 2
val data = Seq(
(1,80.0,-0.361982467), (1,70.0,0.067847447), (1,50.0,-0.196768255),
(1,40.0,-0.135489192), (1,65.0,0.005993648), (1,75.0,0.037561161),
(1,60.0,-0.212658599), (1,55.0,-0.187080872), (1,85.0, 0.382061571),
(2,80.0,-0.301982467), (2,70.0,0.097847447), (2,50.0,-0.186768255),
(2,40.0,-0.105489192), (2,65.0,0.007993648), (2,75.0,0.037561161),
(2,60.0,-0.226528599), (2,55.0,-0.170870872), (2,85.0, 0.320615718)
)
val df = data.toDF("id", "x","y")
UDAF code:
import org.apache.commons.math3.fitting.{PolynomialCurveFitter, WeightedObservedPoints}
import org.apache.spark.sql.functions.{lit, udf}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
class Fitter extends UserDefinedAggregateFunction {
/**
* Polynomial curve fitting
* y = c + a1*x + a2*x^2 + ... + an*x^n
* parameters:
* x: Array[Double]
* y: Array[Double]
* n: Int, polynomial degree
* Return:
* coeff: the fitted parameters [c, a1, a2,...,an]
*/
private def polyCurveFitting = (x: Array[Double], y: Array[Double], n: Int) => {
val obs = new WeightedObservedPoints()
for (i <- 0 until x.size) {
obs.add(x(i), y(i))
}
// Instantiate a polynomial fitter of degree n.
val fitter = PolynomialCurveFitter.create(n)
// Retrieve fitted parameters (coefficients of the polynomial function).
val coeff = fitter.fit(obs.toList())
coeff.mkString("|")
}
override def inputSchema: StructType =
new StructType().add(StructField("x", DoubleType))
.add(StructField("y", DoubleType))
.add(StructField("n", IntegerType))
override def bufferSchema: StructType =
new StructType().add(StructField("x_", ArrayType(DoubleType, false)))
.add(StructField("y_", ArrayType(DoubleType, false)))
.add(StructField("n_", IntegerType))
override def dataType: DataType = StringType
override def deterministic: Boolean = true
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer.update(0, Array[Double]())
buffer.update(1, Array[Double]())
buffer.update(2, 0)
}
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if(!input.isNullAt(0)) {
buffer(0) = buffer.getSeq[Double](0).toArray :+ input.getAs[Double](0)
buffer(1) = buffer.getSeq[Double](1).toArray :+ input.getAs[Double](1)
buffer(2) = input.getAs[Int](2)
}
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getSeq[Double](0).toArray ++ buffer2.getSeq[Double](0)
buffer1(1) = buffer1.getSeq[Double](1).toArray ++ buffer2.getSeq[Double](1)
buffer1(2) = buffer2.getAs[Int](2)
}
def evaluate(buffer: Row): Array[Char] =
polyCurveFitting(buffer.getSeq[Double](0).toArray,
buffer.getSeq[Double](1).toArray,
buffer.getAs[Int](2)).toArray
}
Calling the function:
val fitter_test = new Fitter()
spark.udf.register("fitter", fitter_test)
df.createOrReplaceTempView("test")
spark.sql("select id, fitter(x, y, 2) from test group by id").show()
val df_poly = df.groupBy("id").agg(fitter_test($"x", $"y", lit(n)).as("estimated_parameters"))
df_poly.show()
Expected output (pseudo):
+---+-----------------------------------------------------------------+
| id| estimated_parameters|
+---+-----------------------------------------------------------------+
| 1|"0.5034579587428405|-0.026916449551428016|2.6802822386554184E-4" |
| 2|"0.5344951514280016|-0.020286916457958744|2.6916469164575874E-4" |
+---+-----------------------------------------------------------------+
Error message:
WARN scheduler.TaskSetManager: Lost task 18.0 in stage 7.0 (TID 27, -----.analytics.loc, executor 19): java.lang.IllegalArgumentException: The value ([C@52a57e78) of the type (char[]) cannot be converted to the string type
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:290)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:285)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
at org.apache.spark.sql.execution.aggregate.ScalaUDAF.eval(udaf.scala:444)
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:232)
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:224)
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:150)
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:266)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:257)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1289)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
ERROR scheduler.TaskSetManager: Task 18 in stage 7.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 18 in stage 7.0 failed 4 times, most recent failure: Lost task 18.3 in stage 7.0 (TID 52, --------.analytics.loc, executor 19): java.lang.IllegalArgumentException: The value ([C@4f761fc2) of the type (char[]) cannot be converted to the string type
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:290)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:285)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
at org.apache.spark.sql.execution.aggregate.ScalaUDAF.eval(udaf.scala:444)
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:232)
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:224)
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:150)
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:266)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:257)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1289)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: The value ([C@4f761fc2) of the type (char[]) cannot be converted to the string type
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:290)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:285)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
at org.apache.spark.sql.execution.aggregate.ScalaUDAF.eval(udaf.scala:444)
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:232)
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:224)
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:150)
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:266)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:257)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1289)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
CodePudding user response:
I think the problem is the return type of your evaluate method. You declared the UDAF's output as StringType in dataType, so Spark expects evaluate to return a String, but it actually returns an Array[Char] (the result of the trailing .toArray), and the Catalyst converter refuses to convert char[] to a string, which is exactly the error in your stack trace. If you drop the .toArray and return the String directly, the error should disappear.
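For reference, a minimal sketch of the corrected method, assuming the rest of your class stays exactly as posted (only the declared return type changes and the trailing .toArray is dropped):
// evaluate must return a value that matches dataType (StringType), i.e. a plain String
override def evaluate(buffer: Row): String =
  polyCurveFitting(buffer.getSeq[Double](0).toArray,
                   buffer.getSeq[Double](1).toArray,
                   buffer.getAs[Int](2))
With that change the aggregation returns the pipe-separated coefficient string produced by mkString("|"), which matches the declared StringType, so the Catalyst converter no longer sees a char[].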