I create my SparkSession and register Kryo classes this way:
val sparkConf = new SparkConf()
.setAppName("bd-dq-spark")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrationRequired", "true")
.set("spark.driver.host", "127.0.0.1")
.registerKryoClasses(Array(classOf[HeatSensorEvent], Class.forName("scala.Enumeration$Val"), Class.forName("cs.spark_implicits.Model$EventType$")))
val spark: SparkSession =
SparkSession.builder()
.master("local[*]")
.config(sparkConf)
.getOrCreate()
I define my case class this way:
object Model {
type Timestamp = Long
case class HeatSensorEvent(
eventId: String,
sensorId: String,
deviceId: String,
eventType: EventType.EventType,
timestamp: Timestamp,
temperature: Double
)
object EventType extends Enumeration {
final type EventType = Value
val TEMPERATURE_CHANGE: EventType.Value = Value
}
}
I prepare my fake data this way:
val heatSensorEventData = Seq(
HeatSensorEvent("123", "s1", "d1", TEMPERATURE_CHANGE, 1619555389, Double.box(85.41)),
HeatSensorEvent("234", "s1", "d1", TEMPERATURE_CHANGE, 1619555419, Double.box(60.41)),
HeatSensorEvent("345", "s1", "d1", TEMPERATURE_CHANGE, 1619556389, Double.box(60.41)),
HeatSensorEvent("567", "s1", "d1", TEMPERATURE_CHANGE, 1619557389, Double.box(50.41))
)
and my main is this:
def main(args: Array[String]): Unit = {
implicit val heatSensorEventEncoder: Encoder[HeatSensorEvent] = org.apache.spark.sql.Encoders.kryo[HeatSensorEvent]
implicit val eventTypeEncoder: Encoder[EventType.EventType] = org.apache.spark.sql.Encoders.kryo[EventType.EventType]
val heatSensorEventDs: Dataset[HeatSensorEvent] = spark
.createDataset(heatSensorEventData).as[HeatSensorEvent]
heatSensorEventDs.show
heatSensorEventDs.printSchema()
}
But all I got is this:
+--------------------+
|               value|
+--------------------+
|[27 01 01 64 B1 0...|
|[27 01 01 64 B1 0...|
|[27 01 01 64 B1 0...|
|[27 01 01 64 B1 0...|
+--------------------+
root
|-- value: binary (nullable = true)
My question is: why do I lose the whole schema, and why can't I show the actual data? How can I fix this?
CodePudding user response:
When you use a Kryo encoder, Spark serializes the whole object into a single binary column, which makes it impossible to inspect the values with dataset.show().
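Note that even with the Kryo encoder the Dataset is still typed, so the objects can be recovered; only show() is limited to the serialized bytes. A minimal sketch against the heatSensorEventDs Dataset from the question:
// The rows still decode back into case class instances via the Kryo encoder;
// only the DataFrame-level view is a single binary `value` column.
heatSensorEventDs.collect().foreach(println)
heatSensorEventDs.map(_.temperature)(org.apache.spark.sql.Encoders.scalaDouble).show()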
The approach below, which originated from another post (unfortunately only reachable over plain http), avoids the binary column entirely.
Define your classes:
type Timestamp = Long
object Events {
sealed case class EventType(value: String)
object TEMPERATURE_CHANGE extends EventType("TEMPERATURE_CHANGE")
val values: Array[EventType] = Array(TEMPERATURE_CHANGE)
}
case class HeatSensorEvent(
eventId: String,
sensorId: String,
deviceId: String,
eventType: Events.EventType,
timestamp: Timestamp,
temperature: Double
)
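With this model, Spark's built-in product encoder can handle the nested case class, so no Kryo registration is needed. A quick, hypothetical check (assuming the definitions above are in scope):
import org.apache.spark.sql.{Encoder, Encoders}
// Encoders.product derives a struct-based encoder for the case class, including the nested EventType.
val heatSensorEventEncoder: Encoder[HeatSensorEvent] = Encoders.product[HeatSensorEvent]
println(heatSensorEventEncoder.schema.treeString) // named columns instead of a single binary field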
Create your data:
val heatSensorEventData = Seq(
HeatSensorEvent("123", "s1", "d1", Events.TEMPERATURE_CHANGE, 1619555389, Double.box(85.41)),
HeatSensorEvent("234", "s1", "d1", Events.TEMPERATURE_CHANGE, 1619555419, Double.box(60.41)),
HeatSensorEvent("345", "s1", "d1", Events.TEMPERATURE_CHANGE, 1619556389, Double.box(60.41)),
HeatSensorEvent("567", "s1", "d1", Events.TEMPERATURE_CHANGE, 1619557389, Double.box(50.41))
)
Now you can see your dataset:
import spark.implicits._ // needed for toDS(); it is pre-imported in the spark-shell
val ds = heatSensorEventData.toDS()
ds.show()
Output:
+-------+--------+--------+--------------------+----------+-----------+
|eventId|sensorId|deviceId|           eventType| timestamp|temperature|
+-------+--------+--------+--------------------+----------+-----------+
|    123|      s1|      d1|[TEMPERATURE_CHANGE]|1619555389|      85.41|
|    234|      s1|      d1|[TEMPERATURE_CHANGE]|1619555419|      60.41|
|    345|      s1|      d1|[TEMPERATURE_CHANGE]|1619556389|      60.41|
|    567|      s1|      d1|[TEMPERATURE_CHANGE]|1619557389|      50.41|
+-------+--------+--------+--------------------+----------+-----------+
ds: org.apache.spark.sql.Dataset[HeatSensorEvent] = [eventId: string, sensorId: string ... 4 more fields]
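If you also call printSchema(), you should now see the full structure instead of a single binary column, roughly:
ds.printSchema()
root
 |-- eventId: string (nullable = true)
 |-- sensorId: string (nullable = true)
 |-- deviceId: string (nullable = true)
 |-- eventType: struct (nullable = true)
 |    |-- value: string (nullable = true)
 |-- timestamp: long (nullable = false)
 |-- temperature: double (nullable = false)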
Native support for enums in Spark has been requested, and the issue was closed without a fix. The advantage of this approach is that you don't need to use custom encoders.
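Because eventType is now an ordinary struct column, you can also query it directly; a small sketch, assuming the Dataset ds from above and import spark.implicits._ for the $ syntax:
// Filter on the nested struct field and project a couple of columns.
import spark.implicits._
ds.filter($"eventType.value" === "TEMPERATURE_CHANGE")
  .select($"sensorId", $"temperature")
  .show()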