I have a problem similar to one in Spark java.lang.NullPointerException Error when filter spark data frame on inside foreach iterator
String_Lines.foreachRDD{line ->
line.foreach{x ->
// JSON to DF Example
val sparkConfig = SparkConf().setAppName("JavaKinesisWordCountASL").setMaster("local[*]").
set("spark.sql.warehouse.dir", "file:///C:/tmp")
val spark = SparkSession.builder().config(sparkConfig).orCreate
val outer_jsonData = Arrays.asList(x)
val outer_anotherPeopleDataset = spark.createDataset(outer_jsonData, Encoders.STRING())
spark.read().json(outer_anotherPeopleDataset).createOrReplaceTempView("jsonInnerView")
spark.sql("select name, address.city, address.state from jsonInnerView").show(false)
println("Current String #" x)
}
}
@thebluephantom did explain it to the point. I have my code in foreachRDD now, but still it doesn't work. This is Kotlin and I am running it in my local laptop with IntelliJ. Somehow it's not picking sparksession as I understand after reading all blogs. If I delete "spark.read and spark.sql", everything else works OK. What should I do to fix this?
CodePudding user response:
If I delete "spark.read and spark.sql", everything else works OK
If you delete those, you're not actually making Spark do anything, only defining what Spark actions should happen (Spark actions are lazy)
Somehow it's not picking sparksession as I understand
It's "picking it up" just fine. The error is happening because it's picking up a brand new SparkSession. You should already have defined one of these outside of the forEachRDD method, but if you try to reuse it, you might run into different issues
Assuming String_Lines
is already a Dataframe. There's no point in looping over all of its RDD data and trying to create brand new SparkSession. Or if it's a DStream, convert it to Streaming Dataframe instead...
That being said, you should be able to immediately select data from it
// unclear what the schema of this is
val selected = String_Lines.selectExpr("name", "address.city", "address.state")
selected.show(false)
You may need to add a get_json_object
function in there if you're trying to parse strings to JSON
CodePudding user response:
I am able to solve it finally. I modified code like this.... Its clean and working.
This is String_Lines data type
val String_Lines: JavaDStream<String>
String_Lines.foreachRDD { x ->
val df = spark.read().json(x)
df.printSchema()
df.show(2,false)
}
Thanks, Chandra