I want to process various generic data sources read from Kafka, so I wrote the following code:
The commented-out code fails with the error "class type required but T found", so I modified the code, but that caused a new problem: "type mismatch; found: _$1 where type _$1; required: T". How can I achieve what I need?
CodePudding user response:
That's because runtimeClass returns a Class[_], not a Class[T]. This kind of approach would make perfect sense in Java, for example:
JSONUtils.toBean(whateverString, MyClass.class); // and so on
In Scala there are, of course, unsafe approaches that make this work, but if you're using a JSON library (Play JSON, circe, etc.), you can do this instead:
// method signature would look something like this
def accessKafkaSource[T : Reads](sEnv: StreamExecutionEnvironment): DataStream[T] = {
  // just going to write the deserialization part:
  override def deserialize(msg: Array[Byte]): T = {
    Json.parse(msg).as[T] // asOpt is safer; not going into exception handling and other stuff here
  }
}
The same approach works with other JSON libraries in Scala. If you have other kinds of documents, such as XML, require an implicit function Array[Byte] => DocumentType (JsValue, String, Xml, anything) and another one DocumentType => T, because accessKafkaSource should not be responsible for figuring out how the data is deserialized or serialized.
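To make the two-step idea concrete, here is a minimal sketch that uses no Flink or JSON library at all. The names DecodeSketch, FromBytes, FromDocument and decodeMessage are hypothetical and chosen only for illustration. Small type classes stand in for the two implicit functions, which avoids having bare functions in implicit scope.

```scala
import java.nio.charset.StandardCharsets

object DecodeSketch {
  // Hypothetical type classes (not part of Flink or any JSON library).
  // FromBytes plays the role of Array[Byte] => DocumentType.
  trait FromBytes[D] { def parse(bytes: Array[Byte]): D }
  // FromDocument plays the role of DocumentType => T.
  trait FromDocument[D, T] { def decode(doc: D): T }

  // The generic code only composes the two steps; it knows nothing about the format.
  def decodeMessage[D, T](msg: Array[Byte])(implicit p: FromBytes[D], d: FromDocument[D, T]): T =
    d.decode(p.parse(msg))

  // Example instances: raw bytes -> UTF-8 String -> Int.
  implicit val utf8String: FromBytes[String] = new FromBytes[String] {
    def parse(bytes: Array[Byte]): String = new String(bytes, StandardCharsets.UTF_8)
  }
  implicit val stringToInt: FromDocument[String, Int] = new FromDocument[String, Int] {
    def decode(doc: String): Int = doc.trim.toInt
  }
}
```

With import DecodeSketch._ in scope, a call such as decodeMessage[String, Int](bytes) picks up both instances. A real accessKafkaSource could take the same two implicit parameters and call them inside its deserializer.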
CodePudding user response:
As AminMal notes, runtimeClass is not guaranteed to return the class object of T, just what T erases to at runtime. AnyVals in particular will break this.
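A small sketch of what "erases to" means in practice, using only the standard library. RuntimeClassSketch and erased are hypothetical names used for illustration.

```scala
import scala.reflect.{ClassTag, classTag}

object RuntimeClassSketch {
  // Returns the erased runtime class carried by the ClassTag, typed only as Class[_].
  def erased[T: ClassTag]: Class[_] = classTag[T].runtimeClass

  // For Int the tag carries the primitive class, the same object as classOf[Int].
  val intClass: Class[_] = erased[Int]

  // The argument is boxed to java.lang.Integer, which is never an instance of a primitive class.
  val boxedIsInstance: Boolean = intClass.isInstance(1)

  // Type arguments are erased: only the List class itself survives.
  val listClass: Class[_] = erased[List[String]]
}
```

Here boxedIsInstance is false, and listClass is the same class object you would get for List[Int], which is why the element type cannot be checked at runtime.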
If everything you wish to deserialize is an AnyRef (this is likely the case), you can often safely cast the result of runtimeClass:
def kindaSafeClass[T <: AnyRef : ClassTag]: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]
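For a plain, non-generic class the cast behaves as expected. The following is a minimal usage sketch; KindaSafeSketch and Event are hypothetical names introduced only for this example.

```scala
import scala.reflect.{ClassTag, classTag}

object KindaSafeSketch {
  // Same helper as above, repeated so the sketch is self-contained.
  def kindaSafeClass[T <: AnyRef : ClassTag]: Class[T] =
    classTag[T].runtimeClass.asInstanceOf[Class[T]]

  // Hypothetical non-generic payload type.
  final case class Event(id: String)

  val clazz: Class[Event] = kindaSafeClass[Event]

  // A value whose static type has been lost, as it would be after deserialization.
  val untyped: Any = Event("a")

  // Class.cast checks the runtime class and returns the value typed as Event.
  val typed: Event = clazz.cast(untyped)
}
```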
The case where this is unsafe is when generics are involved (erasure...), as can be seen with:
val clazz = kindaSafeClass[List[String]]
val lst = List(1)
val cast =
  if (clazz.isInstance(lst)) {
    println(s"$lst is an instance of $clazz")
    clazz.cast(lst)
  } else ???
println(cast)
println(cast.head.isEmpty)
which will print "List(1) is an instance of class scala.collection.immutable.List", then "List(1)", and then blow up with a ClassCastException when we try to cast 1 to a String.
But if your T will always be an AnyRef and you can be sure that it's not generic, you can write:
// Note: T must not be generic (e.g. List[String])
def accessKafkaSource[T <: AnyRef : ClassTag](sEnv: StreamExecutionEnvironment): DataStream[T] =
  // as before until...
  JSONUtils.toBean(StrUtil.str(msg, StandardCharsets.UTF_8), kindaSafeClass[T])
  // as before...