Why Spark DataSet loses all its schema and just returning byte[]?

Time:10-08

I create my SparkSession and register kryo classes this way:

val sparkConf = new SparkConf()
    .setAppName("bd-dq-spark")
    .set("spark.sql.adaptive.enabled", "true")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrationRequired", "true")
    .set("spark.driver.host", "127.0.0.1")
    .registerKryoClasses(Array(classOf[HeatSensorEvent], Class.forName("scala.Enumeration$Val"), Class.forName("cs.spark_implicits.Model$EventType$")))
val spark: SparkSession = 
   SparkSession.builder()
     .master("local[*]")
     .config(sparkConf)
     .getOrCreate()

I define my case class this way:

object Model {
  type Timestamp = Long
  case class HeatSensorEvent(
                              eventId: String,
                              sensorId: String,
                              deviceId: String,
                              eventType: EventType,
                              timestamp: Timestamp,
                              temperature: Double
                            )
  object EventType extends Enumeration {
    final type EventType = Value
    val TEMPERATURE_CHANGE: EventType.Value = Value
  }
}

I prepare my fake data this way:

  val heatSensorEventData = Seq(
    HeatSensorEvent("123", "s1", "d1", TEMPERATURE_CHANGE, 1619555389, Double.box(85.41)),
    HeatSensorEvent("234", "s1", "d1", TEMPERATURE_CHANGE, 1619555419, Double.box(60.41)),
    HeatSensorEvent("345", "s1", "d1", TEMPERATURE_CHANGE, 1619556389, Double.box(60.41)),
    HeatSensorEvent("567", "s1", "d1", TEMPERATURE_CHANGE, 1619557389, Double.box(50.41))
  )

and my main is this:

def main(args: Array[String]): Unit = {
    implicit val heatSensorEventEncoder: Encoder[HeatSensorEvent] = org.apache.spark.sql.Encoders.kryo[HeatSensorEvent]
    implicit val eventTypeEncoder: Encoder[EventType] = org.apache.spark.sql.Encoders.kryo[EventType.EventType]
    val heatSensorEventDs: Dataset[HeatSensorEvent] = spark
      .createDataset(heatSensorEventData).as[HeatSensorEvent]
    heatSensorEventDs.show
    heatSensorEventDs.printSchema()
}

But all I got is this:

+--------------------+
|               value|
+--------------------+
|[27 01 01 64 B1 0...|
|[27 01 01 64 B1 0...|
|[27 01 01 64 B1 0...|
|[27 01 01 64 B1 0...|
+--------------------+

root
 |-- value: binary (nullable = true)

My question is: why do I lose the whole schema, and why can't I see the actual data? How can I fix this?

Answer:

Encoders.kryo serializes each object as a whole into a single binary column named value. The Dataset therefore has no per-field schema, and dataset.show() can only print the raw bytes.
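The difference is visible on the encoders themselves, without creating a Dataset. The following is a minimal sketch, assuming spark-sql is on the classpath; Reading is a hypothetical stand-in for HeatSensorEvent and EncoderSchemas is a made-up object name.

```scala
import org.apache.spark.sql.{Encoder, Encoders}

// Hypothetical stand-in for HeatSensorEvent, kept small on purpose.
case class Reading(sensorId: String, temperature: Double)

object EncoderSchemas {
  // Kryo encoder: the whole object is stored as one opaque binary value.
  val kryoEncoder: Encoder[Reading] = Encoders.kryo[Reading]

  // Product encoder: one column per case-class field.
  val productEncoder: Encoder[Reading] = Encoders.product[Reading]

  def main(args: Array[String]): Unit = {
    // Print both schemas side by side to compare them.
    println(kryoEncoder.schema.treeString)
    println(productEncoder.schema.treeString)
  }
}
```

The kryo schema holds a single field called value, while the product schema lists sensorId and temperature as separate columns. Declaring a kryo encoder as an implicit, as in the question, makes createDataset pick the first form.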

The approach below solves this. It is adapted from another post (unfortunately, that one is an http link).

Define your classes:

type Timestamp = Long  
object Events {
  sealed case class EventType(value: String)
  object TEMPERATURE_CHANGE extends EventType("TEMPERATURE_CHANGE")
  val values: Array[EventType] = Array(TEMPERATURE_CHANGE)
}

case class HeatSensorEvent(
                            eventId: String,
                            sensorId: String,
                            deviceId: String,
                            eventType: Events.EventType,
                            timestamp: Timestamp,
                            temperature: Double
                          )

Create your data:

val heatSensorEventData = Seq(
  HeatSensorEvent("123", "s1", "d1", Events.TEMPERATURE_CHANGE, 1619555389L, 85.41),
  HeatSensorEvent("234", "s1", "d1", Events.TEMPERATURE_CHANGE, 1619555419L, 60.41),
  HeatSensorEvent("345", "s1", "d1", Events.TEMPERATURE_CHANGE, 1619556389L, 60.41),
  HeatSensorEvent("567", "s1", "d1", Events.TEMPERATURE_CHANGE, 1619557389L, 50.41)
)

Now you can see your dataset:

import spark.implicits._ // provides toDS(); already in scope in spark-shell
val ds = heatSensorEventData.toDS()
ds.show()

Output:

+-------+--------+--------+--------------------+----------+-----------+
|eventId|sensorId|deviceId|           eventType| timestamp|temperature|
+-------+--------+--------+--------------------+----------+-----------+
|    123|      s1|      d1|[TEMPERATURE_CHANGE]|1619555389|      85.41|
|    234|      s1|      d1|[TEMPERATURE_CHANGE]|1619555419|      60.41|
|    345|      s1|      d1|[TEMPERATURE_CHANGE]|1619556389|      60.41|
|    567|      s1|      d1|[TEMPERATURE_CHANGE]|1619557389|      50.41|
+-------+--------+--------+--------------------+----------+-----------+
ds: org.apache.spark.sql.Dataset[HeatSensorEvent] = [eventId: string, sensorId: string ... 4 more fields]
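Outside the shell, the same check can be written as a small program. This is a sketch only, assuming a local SparkSession; the object name InspectSchema, the helper sampleDs and the app name are made up for illustration. It prints the schema, flattens the nested eventType column and filters on the typed field.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

object Events {
  sealed case class EventType(value: String)
  object TEMPERATURE_CHANGE extends EventType("TEMPERATURE_CHANGE")
}

case class HeatSensorEvent(
  eventId: String,
  sensorId: String,
  deviceId: String,
  eventType: Events.EventType,
  timestamp: Long,
  temperature: Double
)

object InspectSchema {
  // Hypothetical helper: builds a two-row Dataset with the default product encoder.
  def sampleDs(spark: SparkSession): Dataset[HeatSensorEvent] = {
    import spark.implicits._
    Seq(
      HeatSensorEvent("123", "s1", "d1", Events.TEMPERATURE_CHANGE, 1619555389L, 85.41),
      HeatSensorEvent("234", "s1", "d1", Events.TEMPERATURE_CHANGE, 1619555419L, 60.41)
    ).toDS()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("inspect-schema") // assumed name, not from the question
      .getOrCreate()
    import spark.implicits._

    val ds = sampleDs(spark)

    // Each case-class field should now appear as its own column.
    ds.printSchema()

    // eventType is a nested struct; pull its single field out as a flat column.
    ds.select($"eventId", $"eventType.value".as("eventType")).show()

    // Typed filter: case-class equality compares the wrapped string.
    ds.filter(_.eventType == Events.TEMPERATURE_CHANGE).show()

    spark.stop()
  }
}
```

The typed filter relies on case-class equality, so a decoded EventType("TEMPERATURE_CHANGE") compares equal to the TEMPERATURE_CHANGE object.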

Support for Scala enumerations in Spark Datasets has been requested, but the request was closed without a fix. The advantage of the approach above is that you don't need custom encoders.
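If the rest of the code base still uses the original scala.Enumeration, values can be converted at the boundary. This is a rough sketch of one possible bridge, not something from the linked post; EventTypeBridge and its two methods are hypothetical names, and the conversion assumes the enumeration value names match the wrapped strings.

```scala
// The question's enumeration, reduced to the relevant part.
object Model {
  object EventType extends Enumeration {
    val TEMPERATURE_CHANGE: Value = Value
  }
}

// The Spark-friendly wrapper from the answer.
object Events {
  sealed case class EventType(value: String)
  object TEMPERATURE_CHANGE extends EventType("TEMPERATURE_CHANGE")
}

// Hypothetical helper converting between the two representations.
object EventTypeBridge {
  // Enumeration value -> wrapper, using the value's name.
  def toWrapper(e: Model.EventType.Value): Events.EventType =
    Events.EventType(e.toString)

  // Wrapper -> enumeration value; withName throws NoSuchElementException
  // when the wrapped string does not name a value of the enumeration.
  def toEnum(w: Events.EventType): Model.EventType.Value =
    Model.EventType.withName(w.value)
}
```

With this in place, only the case class stored in the Dataset needs the wrapper; other code can keep matching on the enumeration after calling toEnum.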
