Spark - SparkSession access issue


I have a problem similar to the one described in Spark java.lang.NullPointerException Error when filter spark data frame on inside foreach iterator:

    String_Lines.foreachRDD { line ->
        line.foreach { x ->
            // JSON to DF Example
            val sparkConfig = SparkConf().setAppName("JavaKinesisWordCountASL").setMaster("local[*]")
                .set("spark.sql.warehouse.dir", "file:///C:/tmp")
            val spark = SparkSession.builder().config(sparkConfig).orCreate

            val outer_jsonData = Arrays.asList(x)
            val outer_anotherPeopleDataset = spark.createDataset(outer_jsonData, Encoders.STRING())
            spark.read().json(outer_anotherPeopleDataset).createOrReplaceTempView("jsonInnerView")
            spark.sql("select name, address.city, address.state from jsonInnerView").show(false)
            println("Current String # $x")
        }
    }

@thebluephantom explained it well. My code is inside foreachRDD now, but it still doesn't work. This is Kotlin, and I am running it locally on my laptop with IntelliJ. From what I understand after reading all the blogs, it's somehow not picking up the SparkSession. If I delete the spark.read and spark.sql calls, everything else works OK. What should I do to fix this?

CodePudding user response:

If I delete "spark.read and spark.sql", everything else works OK

If you delete those, you're not actually making Spark do anything; the rest of the code only defines what Spark should do. Spark builds its plans lazily, and nothing runs until an action such as show() is called.
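To illustrate that laziness, here is a minimal sketch; it assumes an existing SparkSession named spark, and the file path is hypothetical:

```
// Nothing executes yet: read/selectExpr only build a logical plan
val plan = spark.read().json("people.json")
    .selectExpr("name", "address.city")

// Only an action such as show() or count() triggers actual work
plan.show(false)
```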

Somehow it's not picking sparksession as I understand

It's "picking it up" just fine. The error is happening because it picks up a brand-new SparkSession for every record. You should already have defined a single session outside of the foreachRDD call; note that if you try to reuse it inside the closure, you might run into different issues (such as serialization errors).


Assuming String_Lines is already a DataFrame, there's no point in looping over its RDD data and trying to create a brand-new SparkSession. Or, if it's a DStream, convert it to a streaming DataFrame instead...

That being said, you should be able to immediately select data from it

// unclear what the schema of this is 
val selected = String_Lines.selectExpr("name", "address.city", "address.state")
selected.show(false)

You may need to add a get_json_object call in there if you're trying to parse JSON strings.
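For example, if String_Lines were a DataFrame whose JSON documents sit in a single string column, something along these lines could pull the fields out (the column name "value" is an assumption, and the JSON paths mirror the question's query):

```
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.get_json_object

// Assumed: a string column named "value" containing JSON documents
val parsed = String_Lines.select(
    get_json_object(col("value"), "$.name").alias("name"),
    get_json_object(col("value"), "$.address.city").alias("city"),
    get_json_object(col("value"), "$.address.state").alias("state")
)
parsed.show(false)
```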

CodePudding user response:

I was finally able to solve it. I modified the code like this... it's clean and working.

This is the String_Lines data type:

    val String_Lines: JavaDStream<String>

    String_Lines.foreachRDD { x ->
        val df = spark.read().json(x)
        df.printSchema()
        df.show(2, false)
    }

Thanks, Chandra
