I have a dataframe named timeDF
which has the schema below:
root
|-- Id: long (nullable = true)
|-- Model: timestamp (nullable = true)
|-- Prevision: timestamp (nullable = true)
I want to add a new row at the end of timeDF
by transforming two Calendar
objects c1
& c2
to Timestamp
. I know I can do it by first converting them to Timestamp
like so :
val t1 = new Timestamp(c1.getTimeInMillis)
val t2 = new Timestamp(c2.getTimeInMillis)
However, I can't figure out how I then write those variables to timeDF
as a new row, and how to let spark increase the Id
column value ?
Should I create a List
object with t1
and t2
and make a temporary dataframe from this list to then union the two dataframes ? If so how do I manage the Id
column ? Isn't it too much a mess for such a simple operation ?
Can someone explain me please ?
Thanks.
CodePudding user response:
If your first dataframe can be sorted by ID and you need to add rows one by one you can find maximum ID in your list:
long max = timeDF.agg(functions.max("Id")).head().getLong(0);
and then increment and add it to your dataframe by Union. To do this, follow the following example which age can act like id. people.json
is a file in spark examples.
Dataset<Row> df = spark.read().json("H:\\work\\HadoopWinUtils\\people.json");
df.show();
long max = df.agg(functions.max("age")).head().getLong(0);
List<Row> rows = Arrays.asList(RowFactory.create(max 1, "test"));
StructType schema = DataTypes.createStructType(Arrays.asList(
DataTypes.createStructField("age", DataTypes.LongType, false, Metadata.empty()),
DataTypes.createStructField("name", DataTypes.StringType, false, Metadata.empty())));
Dataset<Row> df2 = spark.createDataFrame(rows, schema);
df2.show();
Dataset<Row> df3 = df.union(df2);
df3.show();
CodePudding user response:
I tried this but I don't know why, when printing the table saved, it only keep the last 2 rows, all others being deleted.
This is how I init the delta table :
val schema = StructType(
StructField("Id", LongType, false) ::
StructField("Model", TimestampType, false) ::
StructField("Prevision", TimestampType, false) :: Nil
)
var timestampDF = spark.createDataFrame(sc.emptyRDD[Row], schema)
val write_format = "delta"
val partition_by = "Model"
val save_path = "/mnt/path/to/folder"
val table_name = "myTable"
spark.sql("DROP TABLE IF EXISTS " table_name)
dbutils.fs.rm(save_path, true)
timestampDF.write.partitionBy(partition_by)
.format(write_format)
.save(save_path)
spark.sql("CREATE TABLE " table_name " USING DELTA LOCATION '" save_path "'")
And this how I add a new item to it
def addTimeToData(model: Calendar, target: Calendar): Unit = {
var timeDF = spark.read
.format("delta")
.load("/mnt/path/to/folder")
val modelTS = new Timestamp(model.getTimeInMillis)
val targetTS = new Timestamp(target.getTimeInMillis)
var id: Long = 0
if (!timeDF.head(1).isEmpty) {
id = timeDF.agg(max("Id")).head().getLong(0) 1
}
val newTime = Arrays.asList(RowFactory.create(id, modelTS, targetTS))
val schema = StructType(
StructField("Id", LongType, false) ::
StructField("Model", TimestampType, false) ::
StructField("Prevision", TimestampType, false) :: Nil
)
var newTimeDF = spark.createDataFrame(newTime, schema)
val unionTimeDF = timeDF.union(newTimeDF)
timeDF = unionTimeDF
unionTimeDF.show
val save_path = "/mnt/datalake/Exploration/Provisionning/MeteoFrance/Timestamps/"
val table_name = "myTable"
spark.sql("DROP TABLE IF EXISTS " table_name)
dbutils.fs.rm(save_path, true)
timeDF.write.partitionBy("Model")
.format("delta")
.save(save_path)
spark.sql("CREATE TABLE " table_name " USING DELTA LOCATION '" save_path "'")
}
I'm not very familiar with delta tables so I don't know if I can just use SQL on it to add values like so :
spark.sql("INSERT INTO 'myTable' VALUES (" id ", " modelTS ", " previsionTS ")");
And I don't if just putting the timestamps variable like so will work.