How do I write the code below in a type-safe manner in Spark Scala with the Dataset API?
import org.apache.spark.sql.{Dataset, Encoders}
import org.apache.spark.sql.functions.{col, explode, from_unixtime}
import org.apache.spark.sql.types.StructType

val schema: StructType = Encoders.product[CaseClass].schema
// read json from a file
val readAsDataSet: Dataset[CaseClass] =
  sparkSession.read.option("mode", mode).schema(schema).json(path).as[CaseClass]
// the code below needs to be written in a type-safe way:
val someDF = readAsDataSet
  .withColumn("col1", explode(col("col_to_be_exploded")))
  .select(from_unixtime(col("timestamp").divide(1000)).as("date"), col("col1"))
CodePudding user response:
As someone in the comments said, you can create a Dataset[CaseClass] and do your operations on that. Let's set it up:
import spark.implicits._

case class MyTest(timestamp: Long, col_explode: Seq[String])

val df = Seq(
  MyTest(1673850366000L, Seq("some", "strings", "here")),
  MyTest(1271850365998L, Seq("pasta", "with", "cream")),
  MyTest(611850366000L, Seq("tasty", "food"))
).toDF("timestamp", "col_explode").as[MyTest]
df.show(false)
+-------------+---------------------+
|timestamp    |col_explode          |
+-------------+---------------------+
|1673850366000|[some, strings, here]|
|1271850365998|[pasta, with, cream] |
|611850366000 |[tasty, food]        |
+-------------+---------------------+
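Side note: since the Seq already holds MyTest instances, Seq(...).toDS() would produce a Dataset[MyTest] directly; the toDF(...).as[MyTest] route above just names the columns explicitly before converting back to the typed Dataset.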
Typically, you can do many operations with the map function and the Scala language. A map function returns the same number of elements as the input has.
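As a minimal sketch of what such a typed map could look like on this Dataset (the Dated case class here is hypothetical, purely for illustration):

import java.time.LocalDateTime
import java.time.ZoneOffset

case class Dated(datetime: String, col_explode: Seq[String])

// One output row per input row: map cannot change the row count.
val dated = df.map { case MyTest(timestamp, col_explode) =>
  Dated(LocalDateTime.ofEpochSecond(timestamp / 1000, 0, ZoneOffset.UTC).toString, col_explode)
}
// dated: org.apache.spark.sql.Dataset[Dated]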
The explode function that you're using, however, does not return the same number of elements per input row, so map alone won't do. You can implement this behaviour using the flatMap function. So, using the Scala language and the flatMap function together, you can do something like this:
import java.time.LocalDateTime
import java.time.ZoneOffset

case class Exploded(datetime: String, exploded: String)

val output = df.flatMap { case MyTest(timestamp, col_explode) =>
  col_explode.map { value =>
    // convert the epoch-millis timestamp to an ISO date-time string
    val date = LocalDateTime.ofEpochSecond(timestamp / 1000, 0, ZoneOffset.UTC).toString
    Exploded(date, value)
  }
}
output.show(false)
+-------------------+--------+
|datetime           |exploded|
+-------------------+--------+
|2023-01-16T06:26:06|some    |
|2023-01-16T06:26:06|strings |
|2023-01-16T06:26:06|here    |
|2010-04-21T11:46:05|pasta   |
|2010-04-21T11:46:05|with    |
|2010-04-21T11:46:05|cream   |
|1989-05-22T14:26:06|tasty   |
|1989-05-22T14:26:06|food    |
+-------------------+--------+
As you see, we've created a second case class called Exploded which we use to type our output dataset. The output dataset has the type org.apache.spark.sql.Dataset[Exploded], so everything is completely type safe.
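As a small illustration of that type safety (a sketch; field access on a typed Dataset is checked at compile time):

val dates = output.map(_.datetime) // Dataset[String], compiles fine
// output.map(_.datetme)           // typo in the field name: does not compile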