Execute Spark SQL query within withColumn clause in Spark Scala


I have a DataFrame with a column called "QUERY" that holds a SELECT statement in each row. I want to execute each query and create a new column containing the actual result from TempView.

+--------------+-----------+-----+----------------------------------------+
|DIFFCOLUMNNAME|DATATYPE   |ISSUE|QUERY                                   |
+--------------+-----------+-----+----------------------------------------+
|Firstname     |StringType |YES  |Select Firstname from TempView  limit 1 |
|LastName      |StringType |NO   |Select LastName from TempView  limit 1  |
|Designation   |StringType |YES  |Select Designation from TempView limit 1|
|Salary        |IntegerType|YES  |Select Salary from TempView    limit 1  |
+--------------+-----------+-----+----------------------------------------+

The code below fails with a type mismatch error (required: String, found: Column). Do I need to use a UDF here? I am not sure how to write and use one. Please suggest.

DF.withColumn("QueryResult", spark.sql(col("QUERY")))

TempView is a temporary view that I have created with all the required columns. The expected final DataFrame should look something like this, with the new query result column added:

+--------------+-----------+-----+----------------------------------------+------------+
|DIFFCOLUMNNAME|DATATYPE   |ISSUE|QUERY                                   |QUERY RESULT|
+--------------+-----------+-----+----------------------------------------+------------+
|Firstname     |StringType |YES  |Select Firstname from TempView  limit 1 |Bunny       |
|LastName      |StringType |NO   |Select LastName from TempView  limit 1  |Gummy       |
|Designation   |StringType |YES  |Select Designation from TempView limit 1|Developer   |
|Salary        |IntegerType|YES  |Select Salary from TempView    limit 1  |100         |
+--------------+-----------+-----+----------------------------------------+------------+

CodePudding user response:

If the number of queries is limited, you can collect them, execute each one, and join the results with the original queries DataFrame (Kieran was faster with his answer, but my answer has an example):

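import org.apache.spark.sql.Encoders
import spark.implicits._ // needed for toDF; already in scope in spark-shell

val queriesDF = Seq(
  ("Firstname", "StringType", "YES", "Select Firstname from TempView  limit 1 "),
  ("LastName", "StringType", "NO", "Select LastName from TempView  limit 1 "),
  ("Designation", "StringType", "YES", "Select Designation from TempView limit 1"),
  ("Salary", "IntegerType", "YES", "Select Salary from TempView limit 1 ")
).toDF("DIFFCOLUMNNAME", "DATATYPE", "ISSUE", "QUERY")

// sample data backing the temporary view that the queries read from
val data = Seq(
  ("Bunny", "Gummy", "Developer", 100)
).toDF("Firstname", "LastName", "Designation", "Salary")
data.createOrReplaceTempView("TempView")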


// get all queries and evaluate results
val queries = queriesDF.select("QUERY").distinct().as(Encoders.STRING).collect().toSeq
val queryResults = queries.map(q => (q, spark.sql(q).as(Encoders.STRING).first()))
val queryResultsDF = queryResults.toDF("QUERY", "QUERY RESULT")

// Join original queries and results
queriesDF.alias("queriesDF")
  .join(queryResultsDF, Seq("QUERY"))
  .select("queriesDF.*", "QUERY RESULT")
  .show(false)


+----------------------------------------+--------------+-----------+-----+------------+
|QUERY                                   |DIFFCOLUMNNAME|DATATYPE   |ISSUE|QUERY RESULT|
+----------------------------------------+--------------+-----------+-----+------------+
|Select Firstname from TempView  limit 1 |Firstname     |StringType |YES  |Bunny       |
|Select LastName from TempView  limit 1  |LastName      |StringType |NO   |Gummy       |
|Select Designation from TempView limit 1|Designation   |StringType |YES  |Developer   |
|Select Salary from TempView limit 1     |Salary        |IntegerType|YES  |100         |
+----------------------------------------+--------------+-----------+-----+------------+

CodePudding user response:

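Assuming you don't have that many "query rows", just collect them to the driver using df.collect() and then map over the queries using plain Scala. This is needed because spark.sql takes a plain String and runs on the driver, so it cannot be applied to a Column inside withColumn.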
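
A minimal sketch of that collect-and-map approach, written for spark-shell or a Scala script. The local SparkSession setup, the trimmed sample rows, and the names rowsWithResult, resultDF and QUERYRESULT are assumptions made for illustration; the sketch also assumes each query returns a single column and that the number of rows is small enough to hold on the driver.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField}

// Assumption: a local session for illustration; in spark-shell, getOrCreate() reuses the existing one.
val spark = SparkSession.builder().master("local[*]").appName("query-per-row").getOrCreate()
import spark.implicits._

// Sample data mirroring the question, registered as the view the queries read from.
Seq(("Bunny", "Gummy", "Developer", 100))
  .toDF("Firstname", "LastName", "Designation", "Salary")
  .createOrReplaceTempView("TempView")

val df = Seq(
  ("Firstname", "StringType", "YES", "Select Firstname from TempView limit 1"),
  ("Salary", "IntegerType", "YES", "Select Salary from TempView limit 1")
).toDF("DIFFCOLUMNNAME", "DATATYPE", "ISSUE", "QUERY")

// Bring the query rows to the driver and run each query there with plain Scala.
val rowsWithResult: Array[Row] = df.collect().map { row =>
  val query = row.getAs[String]("QUERY")
  // First column of the first result row as a string; null if the query returns nothing.
  val result = spark.sql(query).head(1).headOption
    .flatMap(r => Option(r.get(0)))
    .map(_.toString)
    .orNull
  Row.fromSeq(row.toSeq :+ result)
}

// Rebuild a DataFrame from the enriched rows: original schema plus one string column.
val schema = df.schema.add(StructField("QUERYRESULT", StringType, nullable = true))
val resultDF = spark.createDataFrame(spark.sparkContext.parallelize(rowsWithResult.toSeq), schema)
resultDF.show(false)
```

Every result is stored as a string here, because a single column cannot hold both the String and Integer values that the different queries return.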
