Write a transformation in a type-safe way


How do I write the code below in a type-safe manner in Spark Scala with the Dataset API?

val schema: StructType = Encoders.product[CaseClass].schema
// read JSON from a file
val readAsDataSet: Dataset[CaseClass] = sparkSession.read.option("mode", mode).schema(schema).json(path).as[CaseClass]

// the code below needs to be written in a type-safe way:
val someDF = readAsDataSet.withColumn("col1", explode(col("col_to_be_exploded")))
  .select(from_unixtime(col("timestamp").divide(1000)).as("date"), col("col1"))

CodePudding user response:

As someone in the comments said, you can create a Dataset[CaseClass] and do your operations there. Let's set it up:

import spark.implicits._

case class MyTest(timestamp: Long, col_explode: Seq[String])

val df = Seq(
  MyTest(1673850366000L, Seq("some", "strings", "here")),
  MyTest(1271850365998L, Seq("pasta", "with", "cream")),
  MyTest(611850366000L, Seq("tasty", "food"))
).toDS()


df.show(false)
+-------------+---------------------+
|timestamp    |col_explode          |
+-------------+---------------------+
|1673850366000|[some, strings, here]|
|1271850365998|[pasta, with, cream] |
|611850366000 |[tasty, food]        |
+-------------+---------------------+

Typically, you can express many of these operations with plain Scala and the map function.

A map function returns the same number of elements as its input. The explode function you're using, however, does not return the same number of elements. You can implement that behaviour with the flatMap function, as the small sketch below illustrates.
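For intuition, the same contrast holds on plain Scala collections (a minimal sketch with made-up values):

val nested = Seq(Seq("some", "strings", "here"), Seq("tasty", "food"))

// map produces exactly one output element per input element: 2 in, 2 out
val sizes: Seq[Int] = nested.map(_.size)         // Seq(3, 2)

// flatMap lets each input element produce zero or more outputs: 2 in, 5 out
val flat: Seq[String] = nested.flatMap(identity) // Seq("some", "strings", "here", "tasty", "food")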

So, using the Scala language and the flatMap function together, you can do something like this:

import java.time.LocalDateTime
import java.time.ZoneOffset

case class Exploded(datetime: String, exploded: String)

val output = df.flatMap { case MyTest(timestamp, col_explode) =>
  // emit one Exploded row per element of col_explode
  col_explode.map { value =>
    // the timestamp is in milliseconds, so divide by 1000 to get epoch seconds
    val date = LocalDateTime.ofEpochSecond(timestamp / 1000, 0, ZoneOffset.UTC).toString
    Exploded(date, value)
  }
}

output.show(false)
+-------------------+--------+
|datetime           |exploded|
+-------------------+--------+
|2023-01-16T06:26:06|some    |
|2023-01-16T06:26:06|strings |
|2023-01-16T06:26:06|here    |
|2010-04-21T11:46:05|pasta   |
|2010-04-21T11:46:05|with    |
|2010-04-21T11:46:05|cream   |
|1989-05-22T14:26:06|tasty   |
|1989-05-22T14:26:06|food    |
+-------------------+--------+

As you can see, we've created a second case class called Exploded, which we use to type our output dataset. The output has type org.apache.spark.sql.Dataset[Exploded], so everything is completely type safe.
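If you'd rather keep the original column expressions, a middle ground (a sketch, assuming the schema above) is to alias the columns to the Exploded field names and finish with .as[Exploded]. The result is a typed Dataset[Exploded], though the expressions inside select are still only checked at runtime, and from_unixtime produces a "yyyy-MM-dd HH:mm:ss" string in the session time zone rather than the ISO/UTC string of the LocalDateTime version above:

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.{col, explode, from_unixtime}

// the aliases must match the Exploded field names for .as[Exploded] to resolve
val semiTyped: Dataset[Exploded] = df
  .select(
    from_unixtime(col("timestamp").divide(1000)).as("datetime"),
    explode(col("col_explode")).as("exploded")
  )
  .as[Exploded]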
