I have the following output showing a DataFrame in which I am trying to one-hot encode a String column:
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity| feature|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------+
| -122.28| 37.81| 52.0| 340.0| 97.0| 200.0| 87.0| 1.5208| 112500.0| [NEAR BAY]|(5,[3],[1.0])|
| -122.13| 37.67| 40.0| 1748.0| 318.0| 914.0| 317.0| 3.8676| 184000.0| [NEAR BAY]|(5,[3],[1.0])|
| -122.07| 37.67| 27.0| 3239.0| 671.0| 1469.0| 616.0| 3.2465| 230600.0| [NEAR BAY]|(5,[3],[1.0])|
| -122.13| 37.66| 19.0| 862.0| 167.0| 407.0| 183.0| 4.3456| 163000.0| [NEAR BAY]|(5,[3],[1.0])|
As can be seen, the feature column is calculated from the ocean_proximity column. I now want to expand this feature column into a dense vector, and for that I tried the following:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, OneHotEncoder}
import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.functions.{array, col, udf}
import spark.implicits._

// Identify how many distinct values are in the ocean_proximity column
val distinctOceanProximities = dfRaw.select(col("ocean_proximity")).distinct().as[String].collect()

// CountVectorizer expects an array column, so wrap the string in an array
val oceanProximityAsArrayDF = dfRaw.withColumn("ocean_proximity", array("ocean_proximity"))
val countModel = new CountVectorizer().setInputCol("ocean_proximity").setOutputCol("feature").fit(oceanProximityAsArrayDF)
val transformedDF = countModel.transform(oceanProximityAsArrayDF)
transformedDF.show()

// UDFs to pull individual entries out of the vector and to densify it
def columnExtractor(idx: Int) = udf((v: Vector) => v(idx))
val featureCols = (0 until distinctOceanProximities.size).map(idx => columnExtractor(idx)($"feature").as(s"${distinctOceanProximities(idx)}"))
val toDense = udf((v: Vector) => v.toDense)
val denseDF = transformedDF.withColumn("feature", toDense($"feature"))
denseDF.show()
This, however, fails with the following message:
org.apache.spark.sql.AnalysisException: Cannot up cast `input` from struct<type:tinyint,size:int,indices:array<int>,values:array<double>> to struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.
The type path of the target object is:
- root class: "org.apache.spark.mllib.linalg.Vector"
You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object
at org.apache.spark.sql.errors.QueryCompilationErrors$.upCastFailureError(QueryCompilationErrors.scala:137)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveUpCast$$fail(Analyzer.scala:3438)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$36$$anonfun$applyOrElse$198.applyOrElse(Analyzer.scala:3467)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$36$$anonfun$applyOrElse$198.applyOrElse(Analyzer.scala:3445)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:318)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:318)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:323)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChild$2(TreeNode.scala:377)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$4(TreeNode.scala:438)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.immutable.List.map(List.scala:298)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:438)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:244)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:406)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:359)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:323)
at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsDown$1(QueryPlan.scala:94)
at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:116)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:116)
at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:127)
at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:137)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:244)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:137)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:94)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:85)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$36.applyOrElse(Analyzer.scala:3445)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$36.applyOrElse(Analyzer.scala:3441)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUp$3(AnalysisHelper.scala:90)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUp$1(AnalysisHelper.scala:90)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:221)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:86)
Answer:
The error was caused by using the wrong import. DataFrame-based Spark ML stores vectors with the newer ml.linalg types, so a UDF typed against the old RDD-based mllib Vector cannot be up-cast, even though the two struct layouts in the error message look identical.

Instead of:

import org.apache.spark.mllib.linalg.Vector

use:

import org.apache.spark.ml.linalg.Vector

and the issue is solved.
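For reference, here is a minimal sketch of the densifying step with the corrected import; it assumes dfRaw and the column names from the question:

import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.ml.linalg.Vector // ml, not mllib
import org.apache.spark.sql.functions.{array, udf}
import spark.implicits._

// Wrap the string column in an array so CountVectorizer can consume it
val oceanProximityAsArrayDF =
  dfRaw.withColumn("ocean_proximity", array($"ocean_proximity"))

val countModel = new CountVectorizer()
  .setInputCol("ocean_proximity")
  .setOutputCol("feature")
  .fit(oceanProximityAsArrayDF)

val transformedDF = countModel.transform(oceanProximityAsArrayDF)

// With ml.linalg.Vector the UDF's input type matches the DataFrame's
// vector column, so the up-cast error disappears
val toDense = udf((v: Vector) => v.toDense)
val denseDF = transformedDF.withColumn("feature", toDense($"feature"))
denseDF.show()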