I have a case class like this:
case class Data(a: String, b: String, c: String)
and a dataset like this:
val dataset: Dataset[SomeDataset]
and a function inside a companion object (to prevent the task not serializable exception):
object MyObj {
  def doSomething(value: SomeDataset, data: Data) = {...} // data is an instance of the case class
}
I would like to do something like this:
val data = Data(...) // instance of the case class
dataset.map { doSomething(_, data) }
After this I get a Task not serializable exception from Spark. If I remove the second argument from doSomething, it works fine.
I even tried making the Data case class extend the Serializable interface, and it still does not work:
case class Data(a: String, b: String, c: String) extends Serializable
How do I make this work?
CodePudding user response:
One of the differences between case classes and regular classes in Scala is that case classes extend the Serializable interface out of the box:
scala> case class FirstClass()
// defined case class FirstClass
scala> val f = FirstClass()
val f: FirstClass = FirstClass()
scala> f.isInstanceOf[Serializable]
val res1: Boolean = true
scala> class SecondClass
// defined class SecondClass
scala> val s = new SecondClass()
val s: SecondClass = SecondClass@7d4991ad
scala> s.isInstanceOf[Serializable]
val res2: Boolean = false
So Spark can take care of serializing your objects (the instances of your case class) across nodes, but you are also applying an operation to them, and Spark needs to serialize that operation as well, since it runs on different nodes. This post might help you find the scenarios in which a Task not serializable exception can happen. My guess is that doSomething is a method, so Spark cannot serialize it. It might help to define it as a function value instead:
object MyObj {
  val doSomething: (SomeDataset, Data) => SomeOtherData = (value, data) => {...}
}
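For completeness, here is a minimal self-contained sketch of that approach (the SomeDataset definition, the body of doSomething, and the SparkSession setup are illustrative assumptions, not the asker's actual code):
import org.apache.spark.sql.{Dataset, SparkSession}

case class SomeDataset(x: String)
case class Data(a: String, b: String, c: String)

object MyObj {
  // A function value rather than a method; the lambda itself is serializable
  val doSomething: (SomeDataset, Data) => SomeDataset =
    (value, data) => SomeDataset(value.x + data.a) // placeholder body
}

object Example {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("example").getOrCreate()
    import spark.implicits._

    val dataset: Dataset[SomeDataset] = Seq(SomeDataset("v1"), SomeDataset("v2")).toDS()
    val data = Data("a", "b", "c")

    // data is captured by the lambda and serialized with it; since Data is a
    // case class (hence Serializable), this does not trigger the exception
    dataset.map(MyObj.doSomething(_, data)).show()
    spark.stop()
  }
}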