def main(args: Array[String]): Unit = {
  val spark = SparkSession
    .builder()
    .master("local")
    .appName("SparkAndHive")
    .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse 2")
    .enableHiveSupport()
    .getOrCreate()

  GeoSparkSQLRegistrator.registerAll(spark.sqlContext)

  val sparkConf: SparkConf = new SparkConf().setAppName("Spark RDD foreach Example").setMaster("local[2]").set("spark.executor.memory", "2g")

  // walk the directory tree, collecting "fire" paths into `a` and "water" paths into `b`
  def displayFiles(files: Array[File], a: util.List[String], b: util.List[String]): Unit = {
    for (filename <- files) {
      if (filename.isDirectory) {
        if (filename.getName.contains("fire")) {
          a.add(filename.getAbsolutePath)
          println(filename.getAbsolutePath)
        } else if (filename.getName.contains("water")) {
          b.add(filename.getAbsolutePath)
          println(filename.getAbsolutePath)
        } else {
          // if another sub-directory is found, recurse into it
          displayFiles(filename.listFiles, a, b)
        }
      }
    }
  }

  val files = new File("C://folder").listFiles
  val list1 = new util.ArrayList[String]
  val list2 = new util.ArrayList[String]
  displayFiles(files, list1, list2)

  val a = Seq(list1)
  println(a)
  val b = Seq(list2)
  println(b)

  val rdd1 = spark.sparkContext.parallelize(a)
  rdd1.foreach(println)
  val rdd2 = spark.sparkContext.parallelize(b)
  rdd2.foreach(println)

  val dfSeq1 = Seq(rdd1)
  println(dfSeq1)
  val mergeSeqDf1 = dfSeq1.reduce(_ union _)
  mergeSeqDf1.show()

  val dfSeq2 = Seq(rdd2)
  println(dfSeq2)
  val mergeSeqDf2 = dfSeq2.reduce(_ union _)
  mergeSeqDf2.show()
I have created a list of the sub-folder paths that contain "fire"; it looks like List("C//1_fire", "C//2_fire", "C//3_fire").
I also created another list of the sub-folder paths that contain "water"; it looks like List("C//1_water", "C//2_water", "C//3_water").
I created an RDD for each list and printed them; they showed List("C//1_fire", "C//2_fire", "C//3_fire") for fire and List("C//1_water", "C//2_water", "C//3_water") for water.
Then I merged all the fire RDDs into rdd1 and all the water RDDs into rdd2, but I get an error on show: "value show is not a member of org.apache.spark.rdd.RDD[java.util.ArrayList[String]] mergeSeqDf1.show()".
How do I convert the RDD to a DataFrame so that I can show it?
Structure of the data frame:
>
>>person1
>>>a_fire
>>>>a_fire
>>>>>1_fire
>>>>>2_fire
>>>>>3_fire
>>>>>4_fire
>>>>a_water
>>>>>1_water
>>>>>2_water
>>>>>3_fire
>>>>>4_fire
>>person2
>>>b_fire
>>>>b_fire
>>>>>1_fire
>>>>>2_fire
>>>>>3_fire
>>>>>4_fire
>>>>b_water
>>>>>1_water
>>>>>2_water
>>>>>3_fire
>>>>>4_fire
CodePudding user response:
Spark has 3 major concepts: RDD, Dataset, and DataFrame.
So let's say you had a simple list of tuples:
// list of tuples (String, String)
// these tuples contain the id and name of people
val list: List[(String, String)] =
  List(
    ("1", "abc"),
    ("2", "def")
  )
The RDD API is the simplest to get started with and is available from the SparkContext. You just need to have spark-core as a dependency in your project.
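For example, with sbt that dependency could look like the line below (the version is only a placeholder; use whichever Spark version you actually run):
// build.sbt - version is illustrative
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.3.0"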
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("appName").setMaster("local[*]")
// people generally use the `sc` variable to refer to the `SparkContext`
val sc = new SparkContext(conf)

val rdd: RDD[(String, String)] = sc.parallelize(list)
For Dataset and DataFrame you will also need to add spark-sql as a dependency in your project. And a SparkContext is not enough; you will need a SparkSession.
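Again as an sbt sketch (version is just a placeholder):
// build.sbt - version is illustrative
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.3.0"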
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
// people generally use `spark` variable to refer to `SparkSession`
val spark = SparkSession.builder().master("local[*]").appName("CustomerAddress").getOrCreate()
// you can get the SparkContext from SparkSession
val sc = spark.sparkContext
// then you import the implicits required for working with DataSet API
import spark.implicits._
// rdd of tuple (String, String)
val rdd: RDD[(String, String)] = sc.parallelize(list)
// you can get a DataSet of tuple (String, String)
val ds1: Dataset[(String, String)] = rdd.toDS()
ds1.show()
//+---+---+
//| _1| _2|
//+---+---+
//|  1|abc|
//|  2|def|
//+---+---+
Now, DataFrame is actually just another name for Dataset[Row], where Row is another Spark data structure that holds the columns.
// convert to df without giving specific column names
// the Rows will use the tuple index as column names
val df1: DataFrame = rdd.toDF()
df1.show()
//+---+---+
//| _1| _2|
//+---+---+
//|  1|abc|
//|  2|def|
//+---+---+
// remember, DataFrame is just a name for Dataset[Row]
val df11: Dataset[Row] = rdd.toDF()
df11.show()
//+---+---+
//| _1| _2|
//+---+---+
//|  1|abc|
//|  2|def|
//+---+---+
But you can provide column names as well:
val df2: DataFrame = rdd.toDF("id", "name")
df2.show()
//+---+----+
//| id|name|
//+---+----+
//|  1| abc|
//|  2| def|
//+---+----+
Instead of working with a DataFrame (which is a Dataset[Row]), you can also use your own domain-specific data structure.
case class Person(id: String, name: String)
val ds2: Dataset[Person] = rdd.map(t => Person(t._1, t._2)).toDS()
ds2.show()
//+---+----+
//| id|name|
//+---+----+
//|  1| abc|
//|  2| def|
//+---+----+
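Coming back to the question, here is a minimal sketch of how the lists of "fire"/"water" paths could be turned into DataFrames; the column name "path" and the use of plain Seq[String] (instead of java.util.ArrayList) are assumptions made for illustration:
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("SparkAndHive").getOrCreate()
import spark.implicits._

// hypothetical path lists, matching the example in the question
val firePaths: Seq[String] = Seq("C//1_fire", "C//2_fire", "C//3_fire")
val waterPaths: Seq[String] = Seq("C//1_water", "C//2_water", "C//3_water")

// parallelize the elements themselves (not the whole list as a single element),
// then convert to a DataFrame with a named column - show() is available on DataFrames
val fireDf: DataFrame = spark.sparkContext.parallelize(firePaths).toDF("path")
val waterDf: DataFrame = spark.sparkContext.parallelize(waterPaths).toDF("path")

fireDf.show()
waterDf.show()

// DataFrames (unlike RDDs) also support union followed by show()
val allDf = fireDf.union(waterDf)
allDf.show()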