I would like to update the schema of a Spark DataFrame by first converting it to a Dataset that contains fewer columns. Background: I would like to remove some deeply nested fields from a schema.
I tried the following, but the schema does not change:
import org.apache.spark.sql.functions._
val initial_df = spark.range(10).withColumn("foo", lit("foo!")).withColumn("bar", lit("bar!"))
case class myCaseClass(bar: String)
val reduced_ds = initial_df.as[myCaseClass]
The schema still includes the other fields:
reduced_ds.schema // StructType(StructField(id,LongType,false),StructField(foo,StringType,false),StructField(bar,StringType,false))
Is there a way to update the schema that way?
It also confuses me that when I collect the dataset, it only returns the fields defined in the case class:
reduced_ds.limit(1).collect() // Array(myCaseClass(bar!))
CodePudding user response:
In the doc (https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html#as[U](implicitevidence$2:org.apache.spark.sql.Encoder[U]):org.apache.spark.sql.Dataset[U]) it says:
Note that as[] only changes the view of the data that is passed into typed operations, such as map(), and does not eagerly project away any columns that are not present in the specified class.
To achieve what you want, you need to select only the columns that appear in myCaseClass before applying the encoder:
initial_df.select("bar").as[myCaseClass]
It is normal: when you collect reduced_ds it returns records of type myCaseClass, and myCaseClass has only one attribute named bar. That does not conflict with the fact that the dataset schema is something else.
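For completeness, a minimal sketch of that projection, reusing initial_df and myCaseClass from the question (the projected_ds name is just for illustration):
import spark.implicits._
// keep only the columns that exist in the case class, then apply the encoder
val projected_ds = initial_df.select("bar").as[myCaseClass]
projected_ds.schema // StructType(StructField(bar,StringType,false))
projected_ds.limit(1).collect() // Array(myCaseClass(bar!))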
CodePudding user response:
Add a fake map operation to force the projection, using the predefined identity function:
import org.apache.spark.sql.functions._
val initial_df = spark.range(10).withColumn("foo", lit("foo!")).withColumn("bar", lit("bar!"))
case class myCaseClass(bar: String)
val reduced_ds = initial_df.as[myCaseClass].map(identity)
This yields:
reduced_ds.schema // StructType(StructField(bar,StringType,true))
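Since the original goal was to remove deeply nested fields, the same map(identity) trick should also prune nested columns, as long as the nested structure is mirrored with case classes. A small sketch under that assumption (nested_df, Inner and Outer are made-up names for illustration):
import org.apache.spark.sql.functions._
import spark.implicits._
// hypothetical input: a struct column with an extra nested field we want to drop
val nested_df = spark.range(3).withColumn("inner", struct(lit("keep me").as("keep"), lit("drop me").as("drop")))
case class Inner(keep: String)
case class Outer(id: Long, inner: Inner)
// the encoder only knows the Outer/Inner fields, and map(identity) forces re-serialization through it
val pruned_ds = nested_df.as[Outer].map(identity)
pruned_ds.schema // inner now contains only keep; the extra nested field drop is gone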