Date validation function in Scala

Time:08-12

I have an RDD[(String, String)]. Each String contains a timestamp in the format "yyyy-MM-dd HH:mm:ss". I am converting it to epoch time using the function below, where dateFormats is new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"):

    def epochTime(stringOfTime: String): Long = dateFormats.parse(stringOfTime).getTime
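Two details of this conversion matter for reproducing the expected output. First, Date.getTime returns milliseconds since the epoch, while the output values in the question are in seconds. Second, SimpleDateFormat interprets the text in the JVM's default time zone; the question's output values correspond to reading the timestamps at UTC+1 (for example, 2020-10-10 05:17:12 at UTC+1 is 1602303432). The sketch below passes the zone explicitly and divides by 1000. The helper name epochSeconds and its zoneId parameter are hypothetical, not part of the original code. Note that TimeZone.getTimeZone silently falls back to GMT for an unrecognized ID, so a typo in the zone name will not raise an error.

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone

object EpochSeconds {
  // Hypothetical helper: parses "yyyy-MM-dd HH:mm:ss" in an explicitly named zone
  // instead of the JVM default, then converts Date.getTime (milliseconds) to seconds.
  def epochSeconds(stringOfTime: String, zoneId: String): Long = {
    val dateFormats = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    dateFormats.setTimeZone(TimeZone.getTimeZone(zoneId))
    dateFormats.parse(stringOfTime).getTime / 1000
  }

  def main(args: Array[String]): Unit = {
    println(epochSeconds("2020-10-10 05:17:12", "UTC"))       // 1602307032
    println(epochSeconds("2020-10-10 05:17:12", "GMT+01:00")) // 1602303432, as in the question
  }
}
```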

I want to modify the function so that a row is removed if it contains a null, empty, or incorrectly formatted date. I also want to know how to apply it to the RDD[(String, String)] so that the string values are converted to epoch time, as below:

Input

(2020-10-10 05:17:12,2015-04-10 09:18:20)

(2020-10-12 06:15:58,2015-04-10 09:17:42)

(2020-10-11 07:16:40,2015-04-10 09:17:49)

Output

(1602303432,1428653900)

(1602479758,1428653862)

(1602397000,1428653869)

Answer

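You can use filter to keep only the rows whose values are not None. To do this, change the epochTime method so that it returns Option[Long]: def epochTime(stringOfTime: String): Option[Long]. Inside the method, first check that the string is neither null nor empty (.nonEmpty alone does not handle null, because calling it on a null String throws a NullPointerException), and then use Try to see whether the string can be parsed with dateFormats. Note that SimpleDateFormat is lenient by default, so call setLenient(false) if out-of-range values such as month 13 should be rejected rather than rolled over.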
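For stricter validation, the same Option-returning idea can be sketched with java.time's DateTimeFormatter instead of SimpleDateFormat (a swapped-in API, not the one this answer uses). DateFormat.parse(String) may stop before the end of the text, so a timestamp followed by extra characters can still be accepted; a DateTimeFormatter with ResolverStyle.STRICT rejects trailing text as well as out-of-range fields such as month 13 or February 30. It is also immutable and thread-safe, so one instance can be shared. The object name StrictEpoch and the zone parameter are illustrative assumptions; ZoneOffset.ofHours(1) is used in the demo only because it reproduces the question's expected numbers.

```scala
import java.time.{LocalDateTime, ZoneId, ZoneOffset}
import java.time.format.{DateTimeFormatter, ResolverStyle}
import scala.util.Try

object StrictEpoch {
  // "uuuu" (proleptic year) instead of "yyyy" (year-of-era): under ResolverStyle.STRICT
  // a "yyyy" pattern cannot resolve a date without an era field, so every parse would fail.
  private val formatter: DateTimeFormatter =
    DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss").withResolverStyle(ResolverStyle.STRICT)

  // None for null, empty, malformed or out-of-range input (month 13, February 30,
  // trailing characters); otherwise the epoch seconds of the timestamp in `zone`.
  def epochTime(stringOfTime: String, zone: ZoneId): Option[Long] =
    Option(stringOfTime)   // null becomes None
      .filter(_.nonEmpty)  // "" becomes None
      .flatMap(s => Try(LocalDateTime.parse(s, formatter)).toOption)
      .map(_.atZone(zone).toEpochSecond)

  def main(args: Array[String]): Unit = {
    // Assumption: a fixed UTC+1 offset, chosen only to match the question's output.
    val zone = ZoneOffset.ofHours(1)
    Seq("2020-10-10 05:17:12", "2020-13-10 05:17:12", "t", "", null)
      .foreach(s => println(s"$s -> ${epochTime(s, zone)}"))
  }
}
```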

After these changes, filter the RDD to remove rows in which either value is None, and then unwrap each remaining value from Option[Long] to Long.

The code itself:

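    import java.text.SimpleDateFormat

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession

    import scala.util.{Success, Try}

    object DateValidation {
      def main(args: Array[String]): Unit = {
        val sparkSession = SparkSession.builder()
          .appName("Data Validation")
          .master("local[*]")
          .getOrCreate()

        val data = Seq(("2020-10-10 05:17:12", "2015-04-10 09:18:20"), ("2020-10-12 06:15:58", "2015-04-10 09:17:42"),
          ("2020-10-11 07:16:40", "2015-04-10 09:17:49"), ("t", "t"))

        val rdd: RDD[(String, String)] = sparkSession.sparkContext.parallelize(data)

        // Returns Some(epoch seconds) for a well-formed timestamp, None otherwise.
        def epochTime(stringOfTime: String): Option[Long] = {
          // SimpleDateFormat is not thread-safe, so a new instance is created per call.
          val dateFormats = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          // Reject out-of-range fields such as month 13 instead of rolling them over.
          dateFormats.setLenient(false)
          // Check for null first: calling .nonEmpty on a null String throws a NullPointerException.
          if (stringOfTime != null && stringOfTime.nonEmpty) {
            // getTime is in milliseconds; divide by 1000 to get epoch seconds as in the question.
            val parseDate = Try(dateFormats.parse(stringOfTime).getTime / 1000)
            parseDate match {
              case Success(value) => Some(value)
              case _ => None
            }
          } else {
            None
          }
        }

        rdd.map(pair => (epochTime(pair._1), epochTime(pair._2)))
          .filter(pair => pair._1.isDefined && pair._2.isDefined) // drop rows with any invalid date
          .map(pair => (pair._1.get, pair._2.get))                // unwrap Option[Long] to Long
          .foreach(pair => println(s"Results: (${pair._1}, ${pair._2})"))

        sparkSession.stop()
      }
    }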
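The filter and get steps in the code above can also be collapsed into a single flatMap: a for-comprehension over the two Option values yields None as soon as either side fails to parse, and flatMap drops the None results, so .get is never called. The sketch below shows this on a plain Scala Seq so it runs without Spark. The names convertPairs and toLong are hypothetical, and the toy parser (which treats any decimal number as already being epoch seconds) stands in for epochTime. RDD.flatMap should accept the same pair-to-Option function, since it takes a function returning a collection and an Option converts to one.

```scala
import scala.util.Try

object PairPipeline {
  // Keeps a row only when both sides parse; the for-comprehension over two Options
  // is None if either side is None, and flatMap drops those rows.
  def convertPairs(rows: Seq[(String, String)], parse: String => Option[Long]): Seq[(Long, Long)] =
    rows.flatMap { case (left, right) =>
      for {
        l <- parse(left)
        r <- parse(right)
      } yield (l, r)
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical toy parser for the demo only.
    val toLong: String => Option[Long] = s => Try(s.toLong).toOption
    val rows = Seq(("1602303432", "1428653900"), ("t", "t"), ("1602479758", ""))
    convertPairs(rows, toLong).foreach(println)
  }
}
```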