After some transformations, I want to save a Spark Dataset to a Parquet table using insertInto:
ds.write.mode(SaveMode.Overwrite).insertInto(tablename)
But the operation fails, giving me this error:
[TABLENAME] requires that the data to be inserted have the same number of columns as the target table: target table has 11 column(s) but the inserted data has 19 column(s)
The correct number of columns is 11, which is the number of attributes in the case class I used to build the Dataset. But when I convert it to a DataFrame and show its columns, a lot of temporary columns used for the transformations (additional columns from joins, etc.) are still present, and they cause the save to fail.
The workaround I found is to load the table's schema and select its columns:
val tableSchema = spark.table(tablename).schema
val dfWithCorrectColumns = ds.select(tableSchema.fieldNames.map(col) : _*)
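For reference, a minimal sketch of that workaround wrapped in a reusable helper. The object and method names (AlignToTable, alignToTable) are hypothetical, not part of Spark. Selecting the columns in the table's own order also matters because insertInto resolves columns by position rather than by name.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object AlignToTable {
  // Hypothetical helper: keep only the columns that exist in the target
  // table, in the table's own order, and drop everything else.
  def alignToTable(spark: SparkSession, df: DataFrame, tableName: String): DataFrame = {
    // Column names of the target table, in table order.
    val targetColumns = spark.table(tableName).schema.fieldNames
    // Fails at analysis time if df is missing one of the table's columns.
    df.select(targetColumns.map(col): _*)
  }
}
```

With it, the write from the question would become AlignToTable.alignToTable(spark, ds.toDF(), tablename).write.mode(SaveMode.Overwrite).insertInto(tablename).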
But this solution seems hacky. I understand that saveAsTable would be an option to get a schema-enforcing mechanism, but it is not recommended either because it lacks dynamic partition overwriting.
Is there an easy way to "truncate" the dataset to its definition, enforcing its schema?
CodePudding user response:
If you have used a case class A to create a value ds of type Dataset[A], then you can truncate it to the fields you need with the following:
val ds_clean: Dataset[A] = ds.map(identity)
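This works because as[A] only changes the static type seen by typed operations and does not project away extra columns, whereas map sends every row through the encoder for A, so only the case class fields survive. Here is a small sketch to illustrate, assuming a local SparkSession; the Person case class, the tmp column and the TruncateDemo object are made-up names for the example.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical case class standing in for the 11-attribute one in the question.
case class Person(id: Long, name: String)

object TruncateDemo {
  // Returns the column names before and after the map(identity) round trip.
  def columnsBeforeAndAfter(): (Seq[String], Seq[String]) = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("truncate-demo")
      .getOrCreate()
    import spark.implicits._

    // A DataFrame carrying one extra, temporary column.
    val wide = Seq((1L, "a", 10), (2L, "b", 20)).toDF("id", "name", "tmp")

    // as[Person] only changes the static type; "tmp" is still in the plan.
    val ds: Dataset[Person] = wide.as[Person]

    // map(identity) deserializes each row to Person and serializes it back,
    // so the result has exactly the fields of the case class.
    val clean: Dataset[Person] = ds.map(identity)

    val result = (ds.columns.toSeq, clean.columns.toSeq)
    spark.stop()
    result
  }
}
```

Note that map(identity) deserializes and re-serializes every row, so it costs more than a plain select on large datasets.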