Spark Scala Convert Sparse to Dense Feature


I have the following output showing the DataFrame in which I'm trying to one-hot encode a String column:

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|      feature|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------+
|  -122.28|   37.81|              52.0|      340.0|          97.0|     200.0|      87.0|       1.5208|          112500.0|     [NEAR BAY]|(5,[3],[1.0])|
|  -122.13|   37.67|              40.0|     1748.0|         318.0|     914.0|     317.0|       3.8676|          184000.0|     [NEAR BAY]|(5,[3],[1.0])|
|  -122.07|   37.67|              27.0|     3239.0|         671.0|    1469.0|     616.0|       3.2465|          230600.0|     [NEAR BAY]|(5,[3],[1.0])|
|  -122.13|   37.66|              19.0|      862.0|         167.0|     407.0|     183.0|       4.3456|          163000.0|     [NEAR BAY]|(5,[3],[1.0])|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------+

As can be seen, the feature column is calculated from the ocean_proximity column. I now want to expand this feature column to a dense vector, and for that I tried something like this:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, OneHotEncoder}
import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.functions._
import spark.implicits._

// Identify how many distinct values are in the ocean_proximity column
val distinctOceanProximities = dfRaw.select(col("ocean_proximity")).distinct().as[String].collect()
  
val oceanProximityAsArrayDF = dfRaw.withColumn("ocean_proximity", array("ocean_proximity"))

val countModel = new CountVectorizer().setInputCol("ocean_proximity").setOutputCol("feature").fit(oceanProximityAsArrayDF)
val transformedDF = countModel.transform(oceanProximityAsArrayDF)
transformedDF.show()

def columnExtractor(idx: Int) = udf((v: Vector) => v(idx))

// One column per distinct category, named after the category value
val featureCols = (0 until distinctOceanProximities.size).map(idx => columnExtractor(idx)($"feature").as(s"${distinctOceanProximities(idx)}"))

val toDense = udf((v:Vector) => v.toDense)
val denseDF = transformedDF.withColumn("feature", toDense($"feature"))
denseDF.show()

This, however, fails with the following message:

org.apache.spark.sql.AnalysisException: Cannot up cast `input` from struct<type:tinyint,size:int,indices:array<int>,values:array<double>> to struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.
The type path of the target object is:
- root class: "org.apache.spark.mllib.linalg.Vector"
You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object
  at org.apache.spark.sql.errors.QueryCompilationErrors$.upCastFailureError(QueryCompilationErrors.scala:137)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveUpCast$$fail(Analyzer.scala:3438)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$36$$anonfun$applyOrElse$198.applyOrElse(Analyzer.scala:3467)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$36$$anonfun$applyOrElse$198.applyOrElse(Analyzer.scala:3445)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:318)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:318)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:323)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChild$2(TreeNode.scala:377)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$4(TreeNode.scala:438)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.immutable.List.map(List.scala:298)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:438)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:244)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:406)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:359)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:323)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsDown$1(QueryPlan.scala:94)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:116)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:116)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:127)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:137)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:244)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:137)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:94)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:85)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$36.applyOrElse(Analyzer.scala:3445)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$36.applyOrElse(Analyzer.scala:3441)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUp$3(AnalysisHelper.scala:90)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUp$1(AnalysisHelper.scala:90)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:221)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:86)

Answer:

This was annoying to track down: the actual error was caused by using the wrong import. Both vector types serialize to the exact same struct schema, which is why the message appears to complain about casting a type to itself; but spark.ml transformers such as CountVectorizer produce org.apache.spark.ml.linalg vectors, while the UDF was trying to deserialize into the old org.apache.spark.mllib type.

Instead of:

import org.apache.spark.mllib.linalg.Vector

use:

import org.apache.spark.ml.linalg.Vector

That solved the issue.
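
For reference, here is a minimal end-to-end sketch of the corrected flow, assuming dfRaw has the schema shown above and spark is an active SparkSession (both taken from the question's context):

import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.ml.linalg.Vector // ml, not mllib
import org.apache.spark.sql.functions._
import spark.implicits._

// Wrap the string column in an array so CountVectorizer can consume it
val asArrayDF = dfRaw.withColumn("ocean_proximity", array($"ocean_proximity"))

// One-hot-style count vectors over the single-element arrays
val countModel = new CountVectorizer()
  .setInputCol("ocean_proximity")
  .setOutputCol("feature")
  .fit(asArrayDF)

val transformedDF = countModel.transform(asArrayDF)

// With the ml.linalg import, the UDF's Vector now matches the VectorUDT
// that spark.ml actually stores in the column, so the upcast succeeds
val toDense = udf((v: Vector) => v.toDense)
val denseDF = transformedDF.withColumn("feature", toDense($"feature"))
denseDF.show()

The fitted model's vocabulary also tells you which category sits behind each vector slot, so the per-category columns attempted in the question can be named directly from it rather than from a separately collected distinct list:

// Expand the vector into one column per category, named from the vocabulary
val catCols = countModel.vocabulary.zipWithIndex.map { case (name, idx) =>
  udf((v: Vector) => v(idx)).apply($"feature").as(name)
}
denseDF.select(denseDF.columns.map(col) ++ catCols: _*).show()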
