I am a newbie in Scala. I want to read data from an Oracle database on each Spark node and convert it to a Spark DataFrame. The code is as follows:
def read_data(group_id: Int):String = {
val table_name = "table"
val col_name = "col"
val query =
""" select f1,f2,f3,f4,f5,f6,f7,f8
| from """.stripMargin table_name """ where MOD(TO_NUMBER(substr(""" col_name """, -LEAST(2, LENGTH(""" col_name """)))),""" num_node """)=""" group_id
val oracleUser = "ORCL"
val oraclePassword = "*******"
val oracleURL = "jdbc:oracle:thin:@//x.x.x.x:1521/ORCLDB"
val ods = new OracleDataSource()
ods.setUser(oracleUser)
ods.setURL(oracleURL)
ods.setPassword(oraclePassword)
val con = ods.getConnection()
val statement = con.createStatement()
statement.setFetchSize(1000) // important
val resultSet : java.sql.ResultSet = statement.executeQuery(query)
var ret = " "
while(resultSet.next()) {
for {i <- 1 until 8 by 1
ret = ret.concat(resultSet.getString(i))
ret = ret.concat(" ")
}yield(ret)
return ret
}
println("ret:",ret)
return ret
}
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("testScala")
.set("spark.executor.memory", "8g")
.set("spark.executor.cores", "2")
.set("spark.task.cpus","1")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(group_list,num_node)
.map(read_data).map(x => println(x)).count()
println("rdd:",rdd)
The part of the code that I have a problem with is the following:
var ret = " "
while(resultSet.next()) {
for (i <- 1 until 8 by 1) {
ret = ret.concat(resultSet.getString(i))
ret = ret.concat(" ")
return ret
}
return ret
}
println("ret:",ret)
println("ret:",ret) prints a null string. When I change the code like this:
var ret = " "
while(resultSet.next()) {
for {i <- 1 until 8 by 1
ret = ret.concat(resultSet.getString(i))
ret = ret.concat(" ")
}yield(ret)
return ret
}
I receive this error:
ret is already defined as value ret
ret = ret.concat(" ")
In fact, before running, I see that the code has a problem with concat:
Cannot resolve symbol concat
Could you please explain how I can access the result of the while/for loops outside of them?
Any help is really appreciated.
CodePudding user response:
You can replace your code
var ret = " "
while(resultSet.next()) {
for {i <- 1 until 8 by 1
ret = ret.concat(resultSet.getString(i))
ret = ret.concat(" ")
}yield(ret)
return ret
}
by
val ret = Iterator.continually(resultSet)
.takeWhile(_.next)
.flatMap(r => (1 until 8).map(i => r.getString(i)))
.mkString(" ")
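To see how this pattern behaves without a database, here is a minimal sketch. It assumes a hypothetical in-memory class FakeCursor that only mimics the next() and getString(i) calls of java.sql.ResultSet; IteratorDemo and readAll are also made-up names. Note that 1 until 8 covers columns 1 to 7, while the query selects eight columns, so the sketch takes the column count as a parameter and uses 1 to nCols.

```scala
// Hypothetical stand-in for java.sql.ResultSet, for illustration only.
final class FakeCursor(rows: Vector[Vector[String]]) {
  private var pos = -1
  // Advances to the next row; returns false once the rows are exhausted.
  def next(): Boolean = { pos += 1; pos < rows.length }
  // Column indices are 1-based, as in JDBC.
  def getString(i: Int): String = rows(pos)(i - 1)
}

object IteratorDemo {
  // Reads every column of every row and joins the values with spaces.
  def readAll(cursor: FakeCursor, nCols: Int): String =
    Iterator.continually(cursor)
      .takeWhile(_.next())                                 // stop when next() returns false
      .flatMap(r => (1 to nCols).map(i => r.getString(i))) // all columns of the current row
      .mkString(" ")

  def main(args: Array[String]): Unit = {
    val cursor = new FakeCursor(Vector(Vector("a", "b"), Vector("c", "d")))
    println(readAll(cursor, 2)) // prints "a b c d"
  }
}
```

Because the result is a plain val built by one expression, there is no var to mutate and nothing to return from inside a loop.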
CodePudding user response:
You're using a for-comprehension here. Inside its generator block, ret = ... does not assign to the outer variable; it defines a new val called ret. What you wrote is evaluated roughly as
for(i <- 1 until 8 by 1){
val ret = ret.concat(resultSet.getString(i))
val ret = ret.concat(" ")
} yield(ret)
What you can do instead is use
for {i <- 1 until 8 by 1
_ = ret = ret.concat(resultSet.getString(i))
_ = ret = ret.concat(" ")
} yield(ret)
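If you don't need the values produced by yield, a plain for loop without yield may be the simplest fix, because its body is an ordinary block in which ret = ... is a normal assignment to the outer var. A minimal sketch, assuming the column values are already available as a Seq[String] (ForLoopDemo and joinCols are hypothetical names, not from the question):

```scala
object ForLoopDemo {
  // Concatenates all values, each followed by a space.
  def joinCols(cols: Seq[String]): String = {
    var ret = ""
    // No yield: this desugars to foreach, so the body is a regular block
    // and the assignment below updates the outer var.
    for (c <- cols) {
      ret = ret.concat(c).concat(" ")
    }
    ret // the last expression is the result; no explicit return is needed
  }

  def main(args: Array[String]): Unit = {
    println(joinCols(Seq("a", "b", "c"))) // prints "a b c " (with a trailing space)
  }
}
```

The result is read after the loop has finished, so a return inside the loop body is not needed and would end the method after the first value.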
Or, to simplify the whole loop, replace it with the following. (I'm not sure what your intent is with that code. I assume you want the whole concatenated string ret; however, since you use yield, I assume you also want the intermediate steps of this process.)
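val ret = new StringBuilder(" ")
var steps: Seq[String] = Nil
while (resultSet.next()) {
  // StringBuilder is mutated in place, so ret itself can stay a val.
  steps = for (i <- 1 until 8) yield {
    ret.append(resultSet.getString(i)).append(" ")
    ret.toString // snapshot of the string after appending column i
  }
}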
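If the intermediate steps are what you are after, scanLeft is one way to get them without any mutable state. This is only a sketch under the assumption that the column values have already been collected into a Seq[String]; StepsDemo and steps are hypothetical names.

```scala
object StepsDemo {
  // Returns every intermediate concatenation; the last element is the full string.
  def steps(cols: Seq[String]): Seq[String] =
    cols
      .scanLeft(" ")((acc, c) => acc + c + " ") // running concatenation, starting from " "
      .tail                                     // drop the initial " " seed

  def main(args: Array[String]): Unit = {
    val result = steps(Seq("a", "b", "c"))
    result.foreach(s => println("[" + s + "]"))
    // prints:
    // [ a ]
    // [ a b ]
    // [ a b c ]
  }
}
```

The final string is then available as the last element of the returned sequence, when the input is non-empty.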