I'm having trouble calling a method of a custom class inside a "dataframe.foreach" call. The custom class persists the data into a DynamoDB table.
With the following code, a "Null Pointer Exception" is raised, pointing to the line where "writer.writeRow(r)" is executed:
object writeToDynamoDB extends App {
  val df: DataFrame = ...
  val writer: DynamoDBWriter = new DDBWriter(...)
  df
    .foreach(
      r => writer.writeRow(r)
    )
}
If I use the same code but put it inside a code block or an if clause, it works:
object writeToDynamoDB extends App {
  val df: DataFrame = ...
  if (true) {
    val writer: DynamoDBWriter = new DDBWriter(...)
    df
      .foreach(
        r => writer.writeRow(r)
      )
  }
}
I guess it has something to do with variable scope. Even in IntelliJ, the variable is shown in purple italic in the first case and in "regular" grey in the second. I read about it, and Scala has method, field and local scope, but I can't relate that to what I'm trying to do.
Some questions after this introduction:
Can anyone explain why Scala and/or Spark behave this way?
As far as I know, the solution here is to put the code inside a function, a code block or a "fake" if clause. Is there any possible issue regarding Spark properties (shuffles, etc.)?
Is there any other way to do this type of operation?
Hope I was clear.
Thanks in advance.
Regards
CodePudding user response:
As said above, your issue is caused by delayed initialization when using the App trait. Spark docs strongly discourage that:
Note that applications should define a main() method instead of extending scala.App. Subclasses of scala.App may not work correctly.
The reason can be found in the Scaladoc of the App trait itself:
It should be noted that this trait is implemented using the DelayedInit functionality, which means that fields of the object will not have been initialized before the main method has been executed.
This basically means that writer is a field of the object and may still be uninitialized (so null) when the closure passed to foreach uses it.
If you put the respective code into a block, writer becomes a local variable and is initialized when the block is evaluated. That way your closure will contain the correct value of writer. In this case it no longer matters when the code is evaluated, because everything gets evaluated together.
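The difference between the field and the local variable can be reproduced without Spark. The following is only a minimal sketch, assuming Scala 2.12 or 2.13 (in Scala 3 the App trait no longer relies on DelayedInit, so the first line of output would differ). The names FieldCase, LocalCase and Demo are made up for illustration, and a plain String stands in for the DynamoDB writer. As far as I understand, a JVM in which the object's main method was never invoked is roughly the situation on a Spark executor.

```scala
// Field case: with App, the object body is deferred until main() runs,
// so `writer` stays null until then (Scala 2 behaviour).
object FieldCase extends App {
  val writer: String = "ready"
  // The function reads the field every time it is called.
  def task: () => String = () => writer
}

// Local case: the value already exists when the closure is built,
// so the closure carries the value itself.
object LocalCase {
  def task: () => String = {
    val writer: String = "ready"
    () => writer
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    // FieldCase.main has not been called yet in this JVM.
    println(FieldCase.task()) // null on Scala 2
    println(LocalCase.task()) // ready
    // Running main executes the deferred body and sets the field.
    FieldCase.main(Array.empty[String])
    println(FieldCase.task()) // ready
  }
}
```

Running Demo shows that the closure over the local variable is unaffected by when main runs, while the closure over the field only sees a value after the deferred body has executed.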
The correct and recommended solution is to use a standard main method for your Spark applications:
object writeToDynamoDB {
  def main(args: Array[String]): Unit = {
    val df: DataFrame = ...
    val writer: DynamoDBWriter = new DDBWriter(...)
    df.foreach(r => writer.writeRow(r))
  }
}