I've created a DataFrame by reading a CSV file. Now I want to rename the columns and change their data types to match the case class MyData:
case class MyData(
  id: String,
  name: String,
  pcode: Integer,
  loc: String)
val inputDF = spark.read.csv("/examples/example.csv")
scala> inputDF.printSchema
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
|-- _c3: string (nullable = true)
Is there any way to change the schema of inputDF to the one given by the case class? I want to rename the columns (_c0 to id, _c1 to name, _c2 to pcode, _c3 to loc) and also convert the data types to match the case class. Can anyone help me with this?
Answer:
As described in this answer, you can first obtain a Spark schema from the case class:
import org.apache.spark.sql.Encoders
val mySchema = Encoders.product[MyData].schema
// scala> mySchema.printTreeString
// root
// |-- id: string (nullable = true)
// |-- name: string (nullable = true)
// |-- pcode: integer (nullable = true)
// |-- loc: string (nullable = true)
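For reference, the derived schema can also be written out by hand as a StructType, which is an alternative when no case class exists. A minimal sketch, assuming the field types shown in the tree above (the object name SchemaCheck is illustrative only):

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types._

case class MyData(id: String, name: String, pcode: Integer, loc: String)

object SchemaCheck {
  // Hand-written StructType mirroring the printed tree; for CSV the field
  // order matters because columns are assigned by position
  val manualSchema: StructType = StructType(Seq(
    StructField("id", StringType, nullable = true),
    StructField("name", StringType, nullable = true),
    StructField("pcode", IntegerType, nullable = true),
    StructField("loc", StringType, nullable = true)
  ))

  def main(args: Array[String]): Unit = {
    // Schema derived by reflection from the case class
    val derived = Encoders.product[MyData].schema
    derived.fields.zip(manualSchema.fields).foreach { case (d, m) =>
      println(s"${d.name}: ${d.dataType} vs ${m.name}: ${m.dataType}")
    }
  }
}
```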
Then supply it while reading the CSV file, and use as[MyData] to obtain a typed Dataset:
import spark.implicits._ // Encoder for .as[MyData]; already imported in spark-shell

val inputDF = spark.read
  .schema(mySchema)
  .csv("/examples/example.csv")
  .as[MyData]
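Outside spark-shell the same read needs an explicit SparkSession and the spark.implicits._ import. A minimal standalone sketch, assuming a header-less CSV; the object and method names (ReadTyped, readTyped) and the sample rows are hypothetical. If the real file starts with a header line, add .option("header", "true") so that line is skipped rather than parsed as data.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}

case class MyData(id: String, name: String, pcode: Integer, loc: String)

object ReadTyped {
  // Reads a header-less CSV, applying the schema derived from MyData by column position
  def readTyped(spark: SparkSession, path: String): Dataset[MyData] = {
    // Brings the implicit Encoder[MyData] needed by .as[MyData] into scope
    import spark.implicits._
    val mySchema = Encoders.product[MyData].schema
    spark.read
      .schema(mySchema)
      .csv(path)
      .as[MyData]
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("read-typed-csv")
      .getOrCreate()
    // Hypothetical sample data standing in for /examples/example.csv
    val tmp = Files.createTempFile("example", ".csv")
    Files.write(tmp, "1,alice,560001,blr\n2,bob,110001,del\n".getBytes(StandardCharsets.UTF_8))
    readTyped(spark, tmp.toString).collect().foreach(println)
    spark.stop()
  }
}
```

Declaring pcode as Integer rather than Int is useful here: a boxed Integer field can hold null, so a row with an empty pcode still deserializes, whereas with a primitive Int deserializing such a row would fail at runtime.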
Even without as[MyData], the untyped DataFrame still has correctly named columns with their respective types:
scala> val inputDF = spark.read.schema(mySchema).csv("/tmp/example.csv")
inputDF: org.apache.spark.sql.DataFrame = [id: string, name: string ... 2 more fields]
scala> inputDF.printSchema
root
|-- id: string (nullable = true)
|-- name: string (nullable = true)
|-- pcode: integer (nullable = true)
|-- loc: string (nullable = true)
scala> val typedInputDF = inputDF.as[MyData]
typedInputDF: org.apache.spark.sql.Dataset[MyData] = [id: string, name: string ... 2 more fields]
scala> typedInputDF.printSchema
root
|-- id: string (nullable = true)
|-- name: string (nullable = true)
|-- pcode: integer (nullable = true)
|-- loc: string (nullable = true)
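If the DataFrame has already been read without a schema (all columns named _c0.._c3 and typed as string, as in the question), the same schema can be applied afterwards by renaming positionally and casting each column. A minimal sketch under that assumption; RetypeExisting and toMyData are hypothetical names, and the one-row inputDF is a stand-in for the real data:

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}
import org.apache.spark.sql.functions.col

case class MyData(id: String, name: String, pcode: Integer, loc: String)

object RetypeExisting {
  // Renames _c0.._c3 positionally to the case-class field names and casts each
  // column to the type Spark derives for that field
  def toMyData(spark: SparkSession, df: DataFrame): Dataset[MyData] = {
    import spark.implicits._
    val target = Encoders.product[MyData].schema
    require(df.columns.length == target.length, "column count must match MyData")
    val projected = df.columns.toSeq.zip(target.fields).map { case (src, field) =>
      col(src).cast(field.dataType).as(field.name)
    }
    df.select(projected: _*).as[MyData]
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("retype").getOrCreate()
    import spark.implicits._
    // Hypothetical stand-in for the all-string inputDF read without a schema
    val inputDF = Seq(("1", "alice", "560001", "blr")).toDF("_c0", "_c1", "_c2", "_c3")
    val typed = toMyData(spark, inputDF)
    typed.printSchema()
    typed.show()
    spark.stop()
  }
}
```

Supplying the schema at read time, as in the answer, is usually simpler; this variant only helps when the untyped DataFrame already exists.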