Home > Blockchain >  Spark-scala: Converting dataframe to mllib Matrix
Spark-scala: Converting dataframe to mllib Matrix

Time:09-04

I am trying to transpose a huge dataframe (100Mx20K). As the dataframe is spread over multiple nodes and difficult to collect on the driver, I would like to do the transpose through conversion through mllib matrices. The idea seems to have been tested elsewhere, so the opted procedure was as follows:

import org.apache.spark.sql.functions._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.parquet("temp/test.parquet").select("H1","H2","H3","H4")
val matrixColumns = df.columns

val rdd = df.select(array(matrixColumns:_*).as("arr")).as[Array[Int]].rdd
  .zipWithIndex()
  .map{ case(arr, index) => IndexedRow(index, Vectors.dense(arr.map(_.toDouble)))} 

val dm = new IndexedRowMatrix(rdd).toBlockMatrix().toLocalMatrix()

I noticed a possible type and tried substitution:

orig:
    val rdd = df.select(array(matrixColumns:_*).as("arr"))....

modified:
    val rdd = df.select(Array(matrixColumns:_*)).as("arr")...

However, neither works for me and the above change throws error:

scala> df.select(Array(matrixColumns:_*)).as("arr")
              ^
       error: overloaded method select with alternatives:
         [U1](c1: org.apache.spark.sql.TypedColumn[org.apache.spark.sql.Row,U1]): org.apache.spark.sql.Dataset[U1] <and>
         (col: String,cols: String*)org.apache.spark.sql.DataFrame <and>
         (cols: org.apache.spark.sql.Column*)org.apache.spark.sql.DataFrame
        cannot be applied to (Array[String])

I am unsure if there is a version issue (I am using Spark 3.3.0) or if the problem is elsewhere. I would be grateful for any help in fixing the above error.

CodePudding user response:

Change the select invocation to:

df.select(matrixColumns.head, matrixColumns: _*)

or

import org.apache.spark.sql.functions.col

df.select(matrixColumns.map(col(_)):_*)
  • Related