Cast Spark dataframe’s schema

I have a dataframe with the following schema:

StructType currentSchema = new StructType(new StructField[]{
    new StructField("age", DataTypes.StringType, false, Metadata.empty()),
    new StructField("grade", DataTypes.StringType, false, Metadata.empty()),
    new StructField("dateOfBirth", DataTypes.StringType, false, Metadata.empty())
});

And I want to convert it at once (without specifying each column) to the following schema:

StructType newSchema = new StructType(new StructField[]{
     new StructField("age", DataTypes.IntegerType, false, Metadata.empty()),
     new StructField("grade", DataTypes.IntegerType, false, Metadata.empty()),
     new StructField("dateOfBirth", DataTypes.DateType, false, Metadata.empty())
});

Is there any way to do such a df.convert(newSchema) operation?

CodePudding user response:

Since DataFrames are immutable, you have to create a new DataFrame to change the schema. To do so, use one of the following methods:

I:

 Dataset<Row> ndf = df.select(col("age").cast(DataTypes.IntegerType),
                              col("grade").cast(DataTypes.IntegerType),
                              col("dateOfBirth").cast(DataTypes.DateType));        
 ndf.printSchema();

II:

or (shown here only for the age column):

Dataset<Row> ndf = df.withColumn("new_age", df.col("age").cast(DataTypes.IntegerType)).drop("age");
ndf.printSchema();
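
If you prefer this style for all three columns, here is a minimal sketch (assuming the column names from the question) that reuses each original column name, so no drop or rename is needed, since withColumn replaces an existing column of the same name:

Dataset<Row> ndf = df
        .withColumn("age", col("age").cast(DataTypes.IntegerType))
        .withColumn("grade", col("grade").cast(DataTypes.IntegerType))
        .withColumn("dateOfBirth", col("dateOfBirth").cast(DataTypes.DateType));
ndf.printSchema();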

III:

Last but not least, use a map function to do your operation and change the types at the same time:

Dataset<Row> df2 = df.map(new MapFunction<Row, Row>() {
            @Override
            public Row call(Row row) throws Exception {
                // parse each String field into the type required by newSchema
                return RowFactory.create(Integer.parseInt(row.getString(0)),
                                         Integer.parseInt(row.getString(1)),
                                         java.sql.Date.valueOf(row.getString(2)));
            }
        }, RowEncoder.apply(newSchema));

df2.printSchema();

In this method a plain (int) cast will not convert a String in Java, so Integer.parseInt (and java.sql.Date.valueOf for the date column) is used instead.
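
With Java 8+ lambdas the same map can be written more compactly; a sketch, assuming the same df and newSchema as above:

Dataset<Row> df2 = df.map(
        (MapFunction<Row, Row>) row -> RowFactory.create(
                Integer.parseInt(row.getString(0)),
                Integer.parseInt(row.getString(1)),
                java.sql.Date.valueOf(row.getString(2))),
        RowEncoder.apply(newSchema));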

CodePudding user response:

One way to go about it would be to ask Spark to cast all your columns to the new types you expect. I am not sure it would work for every kind of conversion, but it works for many cases:

// build a cast expression for every field of the target schema
List<Column> columns = Arrays
    .stream(newSchema.fields())
    .map(field -> col(field.name()).cast(field.dataType()))
    .collect(Collectors.toList());

Dataset<Row> newResult = result.select(columns.toArray(new Column[0]));
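
For reuse, the same idea can be wrapped in a small helper method. This is only a sketch; the name conformTo is mine, and it assumes every field of the target schema exists in the input Dataset:

import static org.apache.spark.sql.functions.col;

import java.util.Arrays;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;

// Select every field named in the target schema, cast to its target type, in the target order
static Dataset<Row> conformTo(Dataset<Row> input, StructType schema) {
    Column[] casted = Arrays.stream(schema.fields())
            .map(f -> col(f.name()).cast(f.dataType()))
            .toArray(Column[]::new);
    return input.select(casted);
}

Dataset<Row> newResult = conformTo(result, newSchema);

Note that cast changes the data type but not necessarily the nullability: values that cannot be cast become null, so the resulting fields may still report nullable = true even where newSchema says false.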

Another way to go about it would be to rely on the way Spark applies schemas to CSV files, but that would require writing your data to disk, so I don't recommend that option:

result.write().csv("somewhere");
Dataset<Row> newResult = spark.read().schema(newSchema).csv("somewhere");
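
If you do go that route and the dateOfBirth strings are not in Spark's default yyyy-MM-dd layout, the CSV reader's dateFormat option can be set on the way back in; the pattern below is only an assumed example:

result.write().csv("somewhere");
Dataset<Row> newResult = spark.read()
        .schema(newSchema)
        .option("dateFormat", "yyyy-MM-dd")  // assumption: adjust to match your data
        .csv("somewhere");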