I'm new to Scala. My requirement is to delete the records matching a particular column value from almost 100 tables. I read the data from a CSV (which is my source), selected that particular column, and converted it into a List:
val csvDF = spark.read.format("csv")
  .option("header", "true")
  .option("delimiter", ",")
  .option("inferSchema", true)
  .option("escape", "\"")
  .option("multiLine", "true")
  .option("quote", "\"") // note: the Spark option is "quote", not "quotes"
  .load(inputPath)
val bad_records = csvDF.select("corrput_id").collect().map(_(0)).toList
Then I read the metadata from the Postgres schema to get the list of all the tables. Here I wrote the two for loops below, which work fine, but the performance is way too bad. How can I improve this?
val query = "(select table_name from information_schema.tables where table_schema = '" db "' and table_name not in " excludetables ") temp "
val tablesdf = spark.read.jdbc(jdbcUrl, table = query, connectionProperties)
val tablelist = tablesdf.select($"table_name").collect().map(_(0)).toList
println(tablelist)
for (i <- tablelist) {
  val s2 = dbconnection.createStatement()
  for (j <- bad_records) {
    s2.execute("delete from " + db + "." + i + " where corrput_id = '" + j + "' ")
  }
  s2.close()
}
Thanks in advance
CodePudding user response:
If you're looking to improve your performance, in my opinion you should focus on optimizing your queries instead! Executing one query per bad record per table WILL hurt your performance. Something like

" where corrput_id IN " + bad_records.map(str => s"'$str'").mkString("(", ",", ")")

would be better: a single DELETE per table instead of one per bad record.
The second point: why don't you just use the Spark APIs? Calling collect on a DF and then processing it in a single thread is kind of like awaiting a Future (you are not using the actual power available to you). Spark is made to do such things, and can do them efficiently, I believe.
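For illustration, here is a rough sketch of a Spark-side variant. This is an assumption on my part, not a drop-in replacement: Spark's JDBC writer cannot delete rows in place, so this rewrites each table without the bad ids, reusing jdbcUrl, connectionProperties, csvDF, db and tablelist from the question:

val badIds = csvDF.select("corrput_id").distinct()

for (table <- tablelist) {
  val tableDF = spark.read.jdbc(jdbcUrl, s"$db.$table", connectionProperties)

  // keep only rows whose corrput_id is NOT in the bad-id set
  val cleaned = tableDF.join(badIds, Seq("corrput_id"), "left_anti").persist()
  cleaned.count() // materialize before overwriting the table we just read from

  // "truncate" keeps the existing table definition instead of dropping and recreating it
  cleaned.write
    .mode("overwrite")
    .option("truncate", "true")
    .jdbc(jdbcUrl, s"$db.$table", connectionProperties)

  cleaned.unpersist()
}

Whether this beats the plain SQL deletes depends on your table sizes: rewriting a huge table to drop a handful of rows is usually slower than a single DELETE ... IN (...) per table.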