Update spark schema by converting to dataset

Time:10-21

I would like to update the schema of a Spark DataFrame by first converting it to a Dataset that contains fewer columns. Background: I would like to remove some deeply nested fields from a schema.

I tried the following but the schema does not change:

import org.apache.spark.sql.functions._

val initial_df = spark.range(10).withColumn("foo", lit("foo!")).withColumn("bar", lit("bar!"))

case class myCaseClass(bar: String)

val reduced_ds = initial_df.as[myCaseClass]

The schema still includes the other fields:

reduced_ds.schema // StructType(StructField(id,LongType,false),StructField(foo,StringType,false),StructField(bar,StringType,false))

Is there a way to update the schema that way?

It also confuses me that when I collect the Dataset, it only returns the fields defined in the case class:

reduced_ds.limit(1).collect() // Array(myCaseClass(bar!))

CodePudding user response:

The documentation for as[U] (https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html#as[U](implicitevidence$2:org.apache.spark.sql.Encoder[U]):org.apache.spark.sql.Dataset[U]) says:

Note that as[] only changes the view of the data that is passed into typed operations, such as map(), and does not eagerly project away any columns that are not present in the specified class.

To get the schema you want, select the columns of the case class before calling as[]:

initial_df.select("bar").as[myCaseClass] // select only the columns defined in myCaseClass

The collect() result is expected: collecting reduced_ds returns records of type myCaseClass, and myCaseClass has only one attribute, bar. That does not conflict with the Dataset's schema still containing the other columns.
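The select-then-as approach can also take its column list from the case class itself, so the names do not have to be typed twice. The following is a minimal sketch, assuming it is run in spark-shell or as a Scala script; the local SparkSession setup and the name `wanted` are illustrative assumptions, not part of the original answer.

```scala
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.functions.{col, lit}

// Assumption: a local session for illustration; getOrCreate() reuses an existing one.
val spark = SparkSession.builder().master("local[*]").appName("select-then-as").getOrCreate()
import spark.implicits._

case class myCaseClass(bar: String)

val initial_df = spark.range(10)
  .withColumn("foo", lit("foo!"))
  .withColumn("bar", lit("bar!"))

// Top-level field names of the case class, read from its encoder's schema.
val wanted = Encoders.product[myCaseClass].schema.fieldNames.map(col)

// Project first, then switch to the typed view.
val reduced_ds = initial_df.select(wanted: _*).as[myCaseClass]

println(reduced_ds.schema.fieldNames.mkString(","))  // only "bar" remains
```

Note that this only prunes top-level columns: selecting a struct column by name keeps all of its nested fields.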

CodePudding user response:

Add a no-op map operation, using the predefined identity function, to force the projection:

import org.apache.spark.sql.functions._

val initial_df = spark.range(10).withColumn("foo", lit("foo!")).withColumn("bar", lit("bar!"))

case class myCaseClass(bar: String)

val reduced_ds = initial_df.as[myCaseClass].map(identity)

This yields

reduced_ds.schema // StructType(StructField(bar,StringType,true))