How to run an RDD of SQL commands

I have an RDD[String] of a whole lot of strings that look like "INSERT INTO hive_metastore.default.redirects VALUES (123,56),(589,32),(267,11)". I'd like to be able to run all of these commands to get the data into my actual table, instead of just having a bunch of strings with instructions for getting it there. For context, I'm doing this on Databricks, and I don't know enough to have set up any odd settings there. (I hope.)

At first I tried just doing insertIntoLines.foreach{ x => spark.sql(x) }, but that does not seem to work. It does, however, work if I insert a collect, as in insertIntoLines.collect().foreach{ x => spark.sql(x) } - and that is fine for my toy data, but for the actual data I really don't want to have to fit it all into memory on the driver.
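
For concreteness, the two attempts look roughly like this (a minimal sketch; insertIntoLines is the RDD[String] described above):

// Attempt 1: does not seem to work
insertIntoLines.foreach { x => spark.sql(x) }

// Attempt 2: works, but collects every statement to the driver first
insertIntoLines.collect().foreach { x => spark.sql(x) }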

Surely there is a nice and principled way of doing this, that doesn't either bottleneck hard at the driver or involve digging into the SQL commands with bespoke regexes?

CodePudding user response:

Can you try using a Dataset or DataFrame instead of an RDD? That way you can avoid calling collect().

import org.apache.spark.sql._
import spark.implicits._ // needed for .toDS() outside the shell/notebook

case class SqlCommand(query: String)

// Wrap each SQL string in a case class and turn the collection into a Dataset.
val querySet = Seq(SqlCommand("query1"), SqlCommand("query2")).toDS()
// spark.sql expects the String, not the case class, so unwrap it.
querySet.foreach(query => spark.sql(query.query))

CodePudding user response:

Because the RDD is distributed, the closure may be executed on different machines, so you need to set up a Spark session with the database access on each executor. However, I would consider alternative approaches, such as some sort of bulk upload with pre-processing, because the distributed executors in your case would still be querying a single database, which may be suboptimal in terms of scalability and performance. Also, please consider security risks such as SQL injection.

The code example from a Spark shell below illustrates the issue with capturing the spark variable in a closure.

scala> val rdd = sc.parallelize(Seq("select 1", "select 2", "select 3"))
scala> println(spark)
org.apache.spark.sql.SparkSession@4d5500b
scala> rdd.foreach(_ => println(spark))
null
null
null
scala> rdd.foreach(_ => println(SparkSession.builder().getOrCreate()))
org.apache.spark.sql.SparkSession@4d5500b
org.apache.spark.sql.SparkSession@4d5500b
org.apache.spark.sql.SparkSession@4d5500b
scala> rdd.foreach(query => {
     |   // Setup the database access and other session settings.
     |   println(SparkSession.builder.getOrCreate().sql(query))
     | })
[3: int]
[2: int]
[1: int]
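
Applied to the RDD of INSERT statements from the question, the same getOrCreate pattern might look like the sketch below (an illustration under this answer's assumptions, not a guaranteed-scalable solution; insertIntoLines is the asker's RDD[String], and foreachPartition keeps the session lookup to once per partition):

import org.apache.spark.sql.SparkSession

insertIntoLines.foreachPartition { statements =>
  // Re-acquire the session inside the closure instead of capturing the driver's
  // spark variable, which is null on the executors (see the shell output above).
  val session = SparkSession.builder().getOrCreate()
  statements.foreach(stmt => session.sql(stmt))
}

As noted above, whether this scales well depends on the target; pre-processing the strings and doing a bulk load may still be the better option.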

Please check the related "Understanding closures and parallelism in Spark" discussion on StackOverflow.
