Scala: SPARK UDAF for polynomial curve fitting in Scala, got "type (char[]) cannot be converted to the string type"


I am trying to do polynomial curve fitting on a Spark DataFrame similar to the one below (using Spark 2.4.0.7.1.5, Scala 2.11.12, OpenJDK 64-Bit Server VM, Java 1.8.0_232).

I wrote a UDAF for this; it registers fine, but I get an error at runtime.

I am new to Scala and UDAFs. Could you take a look at my function and tell me what is wrong with it?

Thanks,

Example df

val n = 2

val data = Seq(
  (1,80.0,-0.361982467), (1,70.0,0.067847447),  (1,50.0,-0.196768255), 
  (1,40.0,-0.135489192), (1,65.0,0.005993648),  (1,75.0,0.037561161), 
  (1,60.0,-0.212658599), (1,55.0,-0.187080872), (1,85.0, 0.382061571),
  (2,80.0,-0.301982467), (2,70.0,0.097847447),  (2,50.0,-0.186768255), 
  (2,40.0,-0.105489192), (2,65.0,0.007993648),  (2,75.0,0.037561161), 
  (2,60.0,-0.226528599), (2,55.0,-0.170870872), (2,85.0, 0.320615718)
)

val df = data.toDF("id", "x","y")

UDAF code

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.commons.math3.fitting.{PolynomialCurveFitter, WeightedObservedPoints}


class Fitter extends UserDefinedAggregateFunction {
  /**
   * Polynomial curve fitting
   *   y = c + a1*x + a2*x^2 + ... + an*x^n
   * parameters:
   *    x: Array[Double]
   *    y: Array[Double]
   *    n: Int, polynomial degree
   * Return:
   *  coeff: the fitted parameters [c, a1, a2,...,an]
   */

  private def polyCurveFitting = (x: Array[Double], y: Array[Double], n: Int) => {

    val obs = new WeightedObservedPoints()

    for (i <- 0 until x.size) {
      obs.add(x(i), y(i))
    }

    // Instantiate a polynomial fitter of degree n.
    val fitter = PolynomialCurveFitter.create(n)

    // Retrieve fitted parameters (coefficients of the polynomial function).
    val coeff = fitter.fit(obs.toList())

    coeff.mkString("|")
  }

  override def inputSchema: StructType =
    new StructType().add(StructField("x", DoubleType))
                    .add(StructField("y", DoubleType))
                    .add(StructField("n", IntegerType))

  override def bufferSchema: StructType =
    new StructType().add(StructField("x_", ArrayType(DoubleType, false)))
                    .add(StructField("y_", ArrayType(DoubleType, false)))
                    .add(StructField("n_", IntegerType))

  override def dataType: DataType = StringType

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer.update(0, Array[Double]())
    buffer.update(1, Array[Double]())
    buffer.update(2, 0)
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if(!input.isNullAt(0)) {
      buffer(0) = buffer.getSeq[Double](0).toArray :+ input.getAs[Double](0)
      buffer(1) = buffer.getSeq[Double](1).toArray :+ input.getAs[Double](1)
      buffer(2) = input.getAs[Int](2)
    }
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getSeq[Double](0).toArray ++ buffer2.getSeq[Double](0)
    buffer1(1) = buffer1.getSeq[Double](1).toArray ++ buffer2.getSeq[Double](1)
    buffer1(2) = buffer2.getAs[Int](2)
  }

  override def evaluate(buffer: Row): Array[Char] =
    polyCurveFitting(buffer.getSeq[Double](0).toArray,
                     buffer.getSeq[Double](1).toArray,
                     buffer.getAs[Int](2)).toArray
}


Calling the function

val fitter_test = new Fitter()

spark.udf.register("fitter", fitter_test)

df.createOrReplaceTempView("test")

spark.sql("select fitter(x,y,2) from test group by id").show()

val df_poly = df.groupBy("id").agg(fitter($"x",$"y",lit(n)).as("estimated_parameters"))

df_poly.show()

Expected output (pseudo):

+---+-----------------------------------------------------------------+
| id|                                             estimated_parameters|
+---+-----------------------------------------------------------------+
|  1|"0.5034579587428405|-0.026916449551428016|2.6802822386554184E-4" |
|  2|"0.5344951514280016|-0.020286916457958744|2.6916469164575874E-4" |
+---+-----------------------------------------------------------------+

Error message:

WARN scheduler.TaskSetManager: Lost task 18.0 in stage 7.0 (TID 27, -----.analytics.loc, executor 19): java.lang.IllegalArgumentException: The value ([C@52a57e78) of the type (char[]) cannot be converted to the string type
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:290)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:285)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
    at org.apache.spark.sql.execution.aggregate.ScalaUDAF.eval(udaf.scala:444)
    at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:232)
    at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:224)
    at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:150)
    at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:266)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:257)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1289)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
ERROR scheduler.TaskSetManager: Task 18 in stage 7.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 18 in stage 7.0 failed 4 times, most recent failure: Lost task 18.3 in stage 7.0 (TID 52, --------.analytics.loc, executor 19): java.lang.IllegalArgumentException: The value ([C@4f761fc2) of the type (char[]) cannot be converted to the string type
    ...

Caused by: java.lang.IllegalArgumentException: The value ([C@4f761fc2) of the type (char[]) cannot be converted to the string type
  ...

CodePudding user response:

I think the problem is the return type of your evaluate method. You declared dataType as StringType, so when producing the result Spark tries to convert whatever evaluate returns into a string. Your evaluate calls .toArray on the String produced by polyCurveFitting, which yields an Array[Char] (a char[] on the JVM), and that is exactly the type mismatch the error reports. If you drop the .toArray and return the String directly from evaluate, the error should disappear.
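
For reference, a minimal sketch of the evaluate override with that change applied; the rest of the Fitter class stays exactly as posted:

  // Return the pipe-delimited coefficient string produced by polyCurveFitting
  // directly, so it matches dataType = StringType; no conversion to Array[Char].
  override def evaluate(buffer: Row): String =
    polyCurveFitting(buffer.getSeq[Double](0).toArray,
                     buffer.getSeq[Double](1).toArray,
                     buffer.getAs[Int](2))

With that change, df.groupBy("id").agg(fitter($"x", $"y", lit(n))) should return the pipe-delimited coefficient strings shown in the expected output.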
