RDD to DataFrame in Spark and Scala


// (imports assumed by this snippet)
import java.io.File
import java.util

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
// plus the GeoSparkSQLRegistrator import from the GeoSpark SQL dependency

def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local")
      .appName("SparkAndHive")
      .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse 2")
      .enableHiveSupport()
      .getOrCreate()

    GeoSparkSQLRegistrator.registerAll(spark.sqlContext)

      

    val sparkConf: SparkConf = new SparkConf().setAppName("Spark RDD foreach Example").setMaster("local[2]").set("spark.executor.memory", "2g")
   
    // Walk the directory tree recursively, collecting the absolute paths of
    // sub-directories whose names contain "fire" (into a) or "water" (into b).
    def displayFiles(files: Array[File], a: util.List[String], b: util.List[String]): Unit = {
      for (filename <- files) {
        if (filename.isDirectory) {
          if (filename.getName.contains("fire")) {
            a.add(filename.getAbsolutePath)
            println(filename.getAbsolutePath)
          } else if (filename.getName.contains("water")) {
            b.add(filename.getAbsolutePath)
            println(filename.getAbsolutePath)
          } else {
            // keep descending until a "fire"/"water" directory is found
            displayFiles(filename.listFiles, a, b)
          }
        }
      }
    }

    val files = new File("C://folder").listFiles

    val list1 = new util.ArrayList[String]
    val list2 = new util.ArrayList[String]

    displayFiles(files, list1, list2)

    val a = Seq(list1)
    println(a)
    val b = Seq(list2)
    println(b)

    val rdd1 = spark.sparkContext.parallelize(Seq(a))
    rdd1.foreach(println)
    val rdd2 = spark.sparkContext.parallelize(Seq(b))
    rdd2.foreach(println)

    val dfSeq1 = Seq(rdd1)
    println(dfSeq1)
    val mergeSeqDf1 = dfSeq1.reduce(_ union _)
    mergeSeqDf1.show()

    val dfSeq2 = Seq(rdd2)
    println(dfSeq2)
    val mergeSeqDf2 = dfSeq2.reduce(_ union _)
    mergeSeqDf2.show()

I have created a list of the sub-folder paths whose names contain "fire"; it looks like List("C//1_fire", "C//2_fire", "C//3_fire").

I have also created another list of the sub-folder paths whose names contain "water"; it looks like List("C//1_water", "C//2_water", "C//3_water").

I created an RDD for each list and printed them, and they showed List("C//1_fire", "C//2_fire", "C//3_fire") for fire and List("C//1_water", "C//2_water", "C//3_water") for water.

Then I merged all the fire RDDs into rdd1 and all the water RDDs into rdd2, but show fails with the error "value show is not a member of org.apache.spark.rdd.RDD[java.util.ArrayList[String]] mergeSeqDf1.show()".

How do I convert the RDD to a DataFrame so that I can show it?

Structure of data frame

>
  >>person1
    >>>a_fire
       >>>>a_fire
         >>>>>1_fire
         >>>>>2_fire
         >>>>>3_fire
         >>>>>4_fire
     >>>>a_water
         >>>>>1_water
         >>>>>2_water
         >>>>>3_fire
         >>>>>4_fire
  >>person2
    >>>b_fire
       >>>>b_fire
         >>>>>1_fire
         >>>>>2_fire
         >>>>>3_fire
         >>>>>4_fire
     >>>>b_water
         >>>>>1_water
         >>>>>2_water
         >>>>>3_fire
         >>>>>4_fire

CodePudding user response:

Spark has three major data abstractions: RDD, Dataset, and DataFrame.

So let's say you had a simple list of tuples:


// a list of tuples (String, String)
// each tuple contains the id and name of a person

val list: List[(String, String)] =
  List(
    ("1", "abc"),
    ("2", "def")
  )

The RDD API is the simplest to get started with and is available through SparkContext. You just need spark-core as a dependency in your project.
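For reference, the sbt line could look like the following (the version shown is just an assumption; use whichever Spark version you target):

// build.sbt
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.3.0"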

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("appName").setMaster("local[*]")

// people generally use `sc` variable to refer to `SparkContext`
val sc = new SparkContext(conf)

val rdd: RDD[(String, String)] = sc.parallelize(list)

For Dataset and DataFrame you will also need spark-sql as a dependency in your project. And a SparkContext is not enough: you will need a SparkSession.
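Again as a sketch, the extra sbt dependency could look like this (version is an assumption and should match spark-core):

// build.sbt
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.3.0"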

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

// people generally use `spark` variable to refer to `SparkSession`
val spark = SparkSession.builder().master("local[*]").appName("CustomerAddress").getOrCreate()

// you can get the SparkContext from SparkSession
val sc = spark.sparkContext

// then you import the implicits required for working with DataSet API
import spark.implicits._

// rdd of tuple (String, String)
val rdd: RDD[(String, String)] = sc.parallelize(list)

// you can get a DataSet of tuple (String, String)
val ds1: Dataset[(String, String)] = rdd.toDS()

ds1.show()
// +---+---+
// | _1| _2|
// +---+---+
// |  1|abc|
// |  2|def|
// +---+---+

Now, a DataFrame is actually just another name for a Dataset[Row], where Row is another Spark data structure that holds columns.

// convert to a DataFrame without giving specific column names
// the columns will be named after the tuple positions (_1, _2)
val df1: DataFrame = rdd.toDF()

df1.show()
// +---+---+
// | _1| _2|
// +---+---+
// |  1|abc|
// |  2|def|
// +---+---+

// remember, DataFrame is just a name for Dataset[Row]
val df11: Dataset[Row] = rdd.toDF()

df11.show()
// +---+---+
// | _1| _2|
// +---+---+
// |  1|abc|
// |  2|def|
// +---+---+

But you can provide column names as well:

val df2: DataFrame = rdd.toDF("id", "name")

df2.show()
// +---+----+
// | id|name|
// +---+----+
// |  1| abc|
// |  2| def|
// +---+----+
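Another route from an RDD to a DataFrame, besides toDF, is to build Row objects yourself and pass an explicit schema to spark.createDataFrame. Here is a minimal sketch of that approach, reusing the rdd and spark values defined above:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// describe the columns explicitly
val schema = StructType(Seq(
  StructField("id", StringType, nullable = false),
  StructField("name", StringType, nullable = false)
))

// turn each tuple into a Row, then attach the schema
val rowRdd = rdd.map { case (id, name) => Row(id, name) }
val df3: DataFrame = spark.createDataFrame(rowRdd, schema)

df3.show()
// +---+----+
// | id|name|
// +---+----+
// |  1| abc|
// |  2| def|
// +---+----+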

Instead of working with a DataFrame (which is a Dataset[Row]), you can also use your own domain-specific data structure.

case class Person(id: String, name: String)

val ds2: Dataset[Person] = rdd.map(t => Person(t._1, t._2)).toDS()

ds2.show()
// +---+----+
// | id|name|
// +---+----+
// |  1| abc|
// |  2| def|
// +---+----+
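Applied to the question, the same idea turns the collected folder-path lists into DataFrames. A minimal sketch, assuming list1 and list2 are the java.util.ArrayList[String] of "fire" and "water" paths from the question's displayFiles, and that one path per row is the shape you want:

import scala.collection.JavaConverters._   // scala.jdk.CollectionConverters._ on Scala 2.13

// convert the Java lists to Scala sequences, parallelize to RDD[String],
// then use toDF (needs import spark.implicits._ as above)
val fireDf: DataFrame = spark.sparkContext.parallelize(list1.asScala.toSeq).toDF("path")
val waterDf: DataFrame = spark.sparkContext.parallelize(list2.asScala.toSeq).toDF("path")

fireDf.show(false)    // one row per "fire" folder path
waterDf.show(false)   // one row per "water" folder path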