Add row to Spark Dataframe with timestamps and id


I have a dataframe named timeDF which has the schema below:

root
 |-- Id: long (nullable = true)
 |-- Model: timestamp (nullable = true)
 |-- Prevision: timestamp (nullable = true)

I want to add a new row at the end of timeDF built from two Calendar objects, c1 and c2. I know I can start by converting them to Timestamp like so:

import java.sql.Timestamp

val t1 = new Timestamp(c1.getTimeInMillis)
val t2 = new Timestamp(c2.getTimeInMillis)

However, I can't figure out how to write those variables into timeDF as a new row, or how to let Spark increase the Id column value.

Should I create a List with t1 and t2, build a temporary dataframe from it, and then union the two dataframes? If so, how do I manage the Id column? Isn't that too messy for such a simple operation?

Can someone explain, please?

Thanks.

CodePudding user response:

If your first dataframe can be sorted by Id and you need to add rows one by one, you can find the maximum Id in your dataframe:

long max = timeDF.agg(functions.max("Id")).head().getLong(0);

and then increment it and add the new row to your dataframe with a union. The following example shows this, with age acting as the id; people.json is one of the sample files shipped with Spark.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructType;

Dataset<Row> df = spark.read().json("H:\\work\\HadoopWinUtils\\people.json");
df.show();

// Find the current maximum age, then build a new row with max + 1
long max = df.agg(functions.max("age")).head().getLong(0);
List<Row> rows = Arrays.asList(RowFactory.create(max + 1, "test"));

StructType schema = DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("age", DataTypes.LongType, false, Metadata.empty()),
                DataTypes.createStructField("name", DataTypes.StringType, false, Metadata.empty())));
Dataset<Row> df2 = spark.createDataFrame(rows, schema);
df2.show();

// Append the new row by unioning the two dataframes
Dataset<Row> df3 = df.union(df2);
df3.show();
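A minimal Scala sketch of the same idea, matching the asker's timeDF schema (assuming timeDF, t1 and t2 from the question are in scope; untested):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.max
import org.apache.spark.sql.types._

// Next Id = current maximum Id + 1, or 0 for an empty dataframe
val nextId: Long =
  if (timeDF.isEmpty) 0L
  else timeDF.agg(max("Id")).head().getLong(0) + 1

val schema = StructType(
  StructField("Id", LongType, true) ::
  StructField("Model", TimestampType, true) ::
  StructField("Prevision", TimestampType, true) :: Nil
)

// Build a one-row dataframe and append it with a union
val newRowDF = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(Row(nextId, t1, t2))),
  schema
)
val updatedDF = timeDF.union(newRowDF)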

CodePudding user response:

I tried this, but for some reason, when I print the saved table, it only keeps the last two rows; all the others get deleted.

This is how I initialize the Delta table:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val schema = StructType(
               StructField("Id", LongType, false) ::
               StructField("Model", TimestampType, false) ::
               StructField("Prevision", TimestampType, false) :: Nil
             )

var timestampDF = spark.createDataFrame(sc.emptyRDD[Row], schema)

val write_format = "delta"
val partition_by = "Model"
val save_path = "/mnt/path/to/folder"
val table_name = "myTable"

spark.sql("DROP TABLE IF EXISTS "   table_name)
dbutils.fs.rm(save_path, true)

timestampDF.write.partitionBy(partition_by)
                 .format(write_format)
                 .save(save_path)

spark.sql("CREATE TABLE "   table_name   " USING DELTA LOCATION '"   save_path   "'")

And this is how I add a new item to it:

import java.sql.Timestamp
import java.util.{Arrays, Calendar}

import org.apache.spark.sql.RowFactory
import org.apache.spark.sql.functions.max
import org.apache.spark.sql.types._

def addTimeToData(model: Calendar, target: Calendar): Unit = {
  var timeDF = spark.read
                    .format("delta")
                    .load("/mnt/path/to/folder")
  
  val modelTS = new Timestamp(model.getTimeInMillis)
  val targetTS = new Timestamp(target.getTimeInMillis)
  var id: Long = 0
  
  if (!timeDF.head(1).isEmpty) {
    id = timeDF.agg(max("Id")).head().getLong(0) + 1
  }
  
  val newTime = Arrays.asList(RowFactory.create(id, modelTS, targetTS))
  val schema = StructType(
                 StructField("Id", LongType, false) ::
                 StructField("Model", TimestampType, false) ::
                 StructField("Prevision", TimestampType, false) :: Nil
               )  
  var newTimeDF = spark.createDataFrame(newTime, schema)
  val unionTimeDF = timeDF.union(newTimeDF)
  timeDF = unionTimeDF
  unionTimeDF.show
  val save_path = "/mnt/datalake/Exploration/Provisionning/MeteoFrance/Timestamps/"
  val table_name = "myTable"

  spark.sql("DROP TABLE IF EXISTS "   table_name)
  dbutils.fs.rm(save_path, true)
  timeDF.write.partitionBy("Model")
              .format("delta")
              .save(save_path)

  spark.sql("CREATE TABLE "   table_name   " USING DELTA LOCATION '"   save_path   "'")
}
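In principle, dropping and recreating the table on every call shouldn't be necessary: Delta supports appending, so the function could write only the new row in append mode. A minimal sketch, reusing newTimeDF and save_path from the function above (untested on this table):

// Append just the new row; the existing rows and table definition stay untouched
newTimeDF.write.partitionBy("Model")
               .format("delta")
               .mode("append")
               .save(save_path)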

I'm not very familiar with Delta tables, so I don't know if I can just use SQL on them to add values like so:

spark.sql("INSERT INTO 'myTable' VALUES ("   id   ", "   modelTS   ", "   previsionTS   ")");

And I don't know if just putting the timestamp variables in like that will work.
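For reference, a sketch of what that INSERT might look like, assuming Delta accepts standard SQL inserts, the table name is left unquoted, and the timestamps are rendered via their string form and cast back to timestamps (all untested assumptions):

spark.sql(
  s"INSERT INTO myTable VALUES ($id, " +
  s"CAST('$modelTS' AS TIMESTAMP), CAST('$targetTS' AS TIMESTAMP))"
)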
