I am trying to add the configurations to an active Spark session. Below is my code:
val spark = SparkSession.getActiveSession.get
val sc = spark.sparkContext

spark.conf.set("spark.mongodb.input.uri",
  "mongodb://hello_admin:hello123@localhost:27017/testdb.products?authSource=admin")
spark.conf.set("spark.mongodb.input.partitioner", "MongoPaginateBySizePartitioner")

import com.mongodb.spark._

val customRdd = MongoSpark.load(sc)
println(customRdd.count())
println(customRdd.first.toJson)
customRdd.collect().foreach(println)
But I am getting an error:
java.lang.IllegalArgumentException: Missing database name. Set via the 'spark.mongodb.input.uri' or 'spark.mongodb.input.database' property
However, when I write the code as follows:
val spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://hello_admin:hello123@localhost:27017/testdb.products?authSource=admin")
// .config("spark.mongodb.output.uri", "mongodb://hello_admin:hello123@localhost:27017/testdb.products?authSource=admin")
.config("spark.mongodb.input.partitioner" ,"MongoPaginateBySizePartitioner")
.getOrCreate()
val sc = spark.sparkContext
val customRdd = MongoSpark.load(sc)
println(customRdd.count())
println(customRdd.first.toJson)
customRdd.collect().foreach(println)
my code executes fine.
Kindly let me know what changes I need to make in the first code.
CodePudding user response:
You can define the SparkSession like this with a SparkConf (I don't know if this helps you):
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Build (or reuse) a SparkSession from an explicit SparkConf
def sparkSession(conf: SparkConf): SparkSession = SparkSession
  .builder()
  .config(conf)
  .getOrCreate()

val sparkConf = new SparkConf()
sparkConf.set("prop", "value")
val ss = sparkSession(sparkConf)
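Applied to your MongoDB settings, a minimal sketch could look like the following (the URI and partitioner values are taken from your question; note that getOrCreate() only applies the conf if no session exists yet, so this has to run before anything else creates one):

import org.apache.spark.SparkConf
import com.mongodb.spark._

// Put the connector properties on the SparkConf before the session/context exists,
// so they end up in the SparkContext configuration that MongoSpark reads.
val mongoConf = new SparkConf()
  .set("spark.mongodb.input.uri",
    "mongodb://hello_admin:hello123@localhost:27017/testdb.products?authSource=admin")
  .set("spark.mongodb.input.partitioner", "MongoPaginateBySizePartitioner")

val ss = sparkSession(mongoConf)   // helper defined above
val customRdd = MongoSpark.load(ss.sparkContext)
println(customRdd.count())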
Or you can try to use SparkEnv (I'm using SparkEnv for a lot of things to change props):
SparkEnv.get.conf.set("prop", "value")
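With your properties, a hedged sketch of the SparkEnv route could be (I'm assuming the connector still reads the context configuration at the moment MongoSpark.load is called; I haven't verified this against your setup):

import org.apache.spark.SparkEnv
import org.apache.spark.sql.SparkSession
import com.mongodb.spark._

// Mutate the conf of the already-running Spark environment before loading.
SparkEnv.get.conf.set("spark.mongodb.input.uri",
  "mongodb://hello_admin:hello123@localhost:27017/testdb.products?authSource=admin")
SparkEnv.get.conf.set("spark.mongodb.input.partitioner", "MongoPaginateBySizePartitioner")

val sc = SparkSession.getActiveSession.get.sparkContext
val customRdd = MongoSpark.load(sc)
println(customRdd.count())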