I am trying to transpose a huge dataframe (100M x 20K). Since the dataframe is spread over multiple nodes and is difficult to collect on the driver, I would like to do the transpose by converting it to mllib matrices. The idea seems to have been tested elsewhere, so I opted for the following procedure:
import org.apache.spark.sql.functions._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.parquet("temp/test.parquet").select("H1","H2","H3","H4")
val matrixColumns = df.columns
val rdd = df.select(array(matrixColumns:_*).as("arr")).as[Array[Int]].rdd
.zipWithIndex()
.map{ case(arr, index) => IndexedRow(index, Vectors.dense(arr.map(_.toDouble)))}
val dm = new IndexedRowMatrix(rdd).toBlockMatrix().toLocalMatrix()
I noticed a possible typo and tried a substitution:
orig:
val rdd = df.select(array(matrixColumns:_*).as("arr"))....
modified:
val rdd = df.select(Array(matrixColumns:_*)).as("arr")...
However, neither works for me, and the above change throws this error:
scala> df.select(Array(matrixColumns:_*)).as("arr")
^
error: overloaded method select with alternatives:
[U1](c1: org.apache.spark.sql.TypedColumn[org.apache.spark.sql.Row,U1]): org.apache.spark.sql.Dataset[U1] <and>
(col: String,cols: String*)org.apache.spark.sql.DataFrame <and>
(cols: org.apache.spark.sql.Column*)org.apache.spark.sql.DataFrame
cannot be applied to (Array[String])
I am unsure whether this is a version issue (I am using Spark 3.3.0) or whether the problem lies elsewhere. I would be grateful for any help in fixing the above error.
CodePudding user response:
As the error message lists, select has no overload that accepts an Array[String]; it takes either (col: String, cols: String*) or (cols: Column*). Change the select invocation to:
df.select(matrixColumns.head, matrixColumns.tail: _*)
or
import org.apache.spark.sql.functions.col
df.select(matrixColumns.map(col(_)): _*)
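For completeness, here is a minimal sketch of the whole pipeline with the corrected select, assuming the spark-shell's built-in SparkSession spark and integer-typed columns, as the question's .as[Array[Int]] suggests. BlockMatrix also provides a distributed transpose, so the transpose itself never needs to touch the driver:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
import org.apache.spark.sql.functions.col

val df = spark.read.parquet("temp/test.parquet").select("H1", "H2", "H3", "H4")
val matrixColumns = df.columns

// Pass the columns through the Column* overload of select and build
// one dense mllib vector per row.
val rdd = df.select(matrixColumns.map(col(_)): _*)
  .rdd
  .zipWithIndex()
  .map { case (row, index) =>
    // Assumes the selected columns are Int, as in the question's .as[Array[Int]].
    IndexedRow(index, Vectors.dense(matrixColumns.indices.map(i => row.getInt(i).toDouble).toArray))
  }

// transpose here is a distributed operation on the BlockMatrix.
val transposed = new IndexedRowMatrix(rdd).toBlockMatrix().transpose
Note that the toLocalMatrix() call in the original snippet collects the entire matrix onto the driver, which is exactly what a 100M x 20K matrix cannot afford; keep the result as a distributed BlockMatrix (or write it back out) instead.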