I have a dataframe with the following schema:
StructType currentSchema = new StructType(new StructField[]{
        new StructField("age", DataTypes.StringType, false, Metadata.empty()),
        new StructField("grade", DataTypes.StringType, false, Metadata.empty()),
        new StructField("dateOfBirth", DataTypes.StringType, false, Metadata.empty())
});
And I want to convert it at once (without specifying each column) to the following schema:
StructType newSchema = new StructType(new StructField[]{
        new StructField("age", DataTypes.IntegerType, false, Metadata.empty()),
        new StructField("grade", DataTypes.IntegerType, false, Metadata.empty()),
        new StructField("dateOfBirth", DataTypes.DateType, false, Metadata.empty())
});
Is there any way to do something like a df.convert(newSchema) operation?
CodePudding user response:
Since DataFrames are immutable, you have to create a new DataFrame to change the schema. To do so, use one of the following methods:
I:
// requires: import static org.apache.spark.sql.functions.col;
Dataset<Row> ndf = df.select(col("age").cast(DataTypes.IntegerType),
        col("grade").cast(DataTypes.IntegerType),
        col("dateOfBirth").cast(DataTypes.DateType));
ndf.printSchema();
II:
or use withColumn (shown here only for the age column):
Dataset<Row> ndf = df.withColumn("new_age", df.col("age").cast(DataTypes.IntegerType)).drop("age");
ndf.printSchema();
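If you want to keep the original column name and position instead of introducing a new_age column, withColumn with an existing column name replaces that column in place; a minimal sketch:
Dataset<Row> ndf = df.withColumn("age", df.col("age").cast(DataTypes.IntegerType));
// the "age" column is replaced in place, so no drop/rename is needed
ndf.printSchema();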
III:
Last but not least, use a map function to do your conversion and change the types simultaneously:
// requires: org.apache.spark.api.java.function.MapFunction, org.apache.spark.sql.RowFactory,
// org.apache.spark.sql.catalyst.encoders.RowEncoder
Dataset<Row> df2 = df.map(new MapFunction<Row, Row>() {
    @Override
    public Row call(Row row) throws Exception {
        return RowFactory.create(Integer.parseInt(row.getString(0)),
                Integer.parseInt(row.getString(1)),
                java.sql.Date.valueOf(row.getString(2))); // assumes yyyy-MM-dd date strings
    }
}, RowEncoder.apply(newSchema));
df2.printSchema();
Note that in Java you cannot cast a String to int directly with (int), which is why Integer.parseInt (and java.sql.Date.valueOf for the date column) is used above.
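The same conversion can also be written more compactly with a Java 8 lambda; a rough sketch, where the explicit cast to MapFunction<Row, Row> is only there to disambiguate Spark's overloaded map methods:
Dataset<Row> df3 = df.map((MapFunction<Row, Row>) row ->
        RowFactory.create(Integer.parseInt(row.getString(0)),
                Integer.parseInt(row.getString(1)),
                java.sql.Date.valueOf(row.getString(2))), // assumes yyyy-MM-dd date strings
        RowEncoder.apply(newSchema));
df3.printSchema();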
CodePudding user response:
One way to go about it would be to ask Spark to cast all your columns to the new types you expect. I am not sure it would work with all types of conversions, but it works for many cases:
// requires: java.util.Arrays, java.util.List, java.util.stream.Collectors,
// org.apache.spark.sql.Column and import static org.apache.spark.sql.functions.col;
List<Column> columns = Arrays
        .stream(newSchema.fields())
        .map(field -> col(field.name()).cast(field.dataType()))
        .collect(Collectors.toList());
Dataset<Row> newResult = result.select(columns.toArray(new Column[0]));
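Since you asked for something like df.convert(newSchema), this cast-by-schema logic could be wrapped in a small helper method; castToSchema is just a hypothetical name for illustration and assumes both schemas use the same column names:
// Hypothetical helper: casts every column of df to the type declared in targetSchema.
public static Dataset<Row> castToSchema(Dataset<Row> df, StructType targetSchema) {
    Column[] casted = Arrays.stream(targetSchema.fields())
            .map(field -> df.col(field.name()).cast(field.dataType()))
            .toArray(Column[]::new);
    return df.select(casted);
}
// usage: Dataset<Row> newResult = castToSchema(result, newSchema);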
Another way to go about it would be to rely on the way Spark applies schemas to CSV files, but that would require writing your data to disk, so I don't recommend that option:
result.write().csv("somewhere");
Dataset<Row> newResult = spark.read().schema(newSchema).csv("somewhere");