I have a DataFrame in which one column, "QUERY", holds a select statement. I want to execute each query and create a new column containing the actual result from TempView.
+--------------+-----------+-----+----------------------------------------+
|DIFFCOLUMNNAME|DATATYPE   |ISSUE|QUERY                                   |
+--------------+-----------+-----+----------------------------------------+
|Firstname     |StringType |YES  |Select Firstname from TempView limit 1  |
|LastName      |StringType |NO   |Select LastName from TempView limit 1   |
|Designation   |StringType |YES  |Select Designation from TempView limit 1|
|Salary        |IntegerType|YES  |Select Salary from TempView limit 1     |
+--------------+-----------+-----+----------------------------------------+
I get a type mismatch error (required: String, found: Column) from the following line. Do I need to use a UDF here? I am not sure how to write and use one. Please suggest.
DF.withColumn("QueryResult", spark.sql(col("QUERY")))
TempView is a temporary view that I created and that contains all the required columns. The expected final DataFrame would look something like this, with the new QUERY RESULT column added:
+--------------+-----------+-----+----------------------------------------+------------+
|DIFFCOLUMNNAME|DATATYPE   |ISSUE|QUERY                                   |QUERY RESULT|
+--------------+-----------+-----+----------------------------------------+------------+
|Firstname     |StringType |YES  |Select Firstname from TempView limit 1  |Bunny       |
|LastName      |StringType |NO   |Select LastName from TempView limit 1   |Gummy       |
|Designation   |StringType |YES  |Select Designation from TempView limit 1|Developer   |
|Salary        |IntegerType|YES  |Select Salary from TempView limit 1     |100         |
+--------------+-----------+-----+----------------------------------------+------------+
CodePudding user response:
If the number of queries is limited, you can collect them, execute each one, and join the results with the original queries DataFrame (Kieran was faster with his answer, but mine includes an example):
import org.apache.spark.sql.Encoders
// Needed for toDF; assumes a SparkSession named spark is in scope (as in spark-shell)
import spark.implicits._
val queriesDF = Seq(
  ("Firstname", "StringType", "YES", "Select Firstname from TempView limit 1 "),
  ("LastName", "StringType", "NO", "Select LastName from TempView limit 1 "),
  ("Designation", "StringType", "YES", "Select Designation from TempView limit 1"),
  ("Salary", "IntegerType", "YES", "Select Salary from TempView limit 1 ")
).toDF("DIFFCOLUMNNAME", "DATATYPE", "ISSUE", "QUERY")
// Sample data standing in for the real temporary view
val data = Seq(
  ("Bunny", "Gummy", "Developer", 100)
).toDF("Firstname", "LastName", "Designation", "Salary")
data.createOrReplaceTempView("TempView")
// Collect the distinct query strings to the driver and run each one there
val queries = queriesDF.select("QUERY").distinct().as(Encoders.STRING).collect().toSeq
// Read the first value of each result as a String
val queryResults = queries.map(q => (q, spark.sql(q).as(Encoders.STRING).first()))
val queryResultsDF = queryResults.toDF("QUERY", "QUERY RESULT")
// Join the original queries with their results
queriesDF.alias("queriesDF")
  .join(queryResultsDF, Seq("QUERY"))
  .select("queriesDF.*", "QUERY RESULT")
  .show(false)
Output:
+----------------------------------------+--------------+-----------+-----+------------+
|QUERY                                   |DIFFCOLUMNNAME|DATATYPE   |ISSUE|QUERY RESULT|
+----------------------------------------+--------------+-----------+-----+------------+
|Select Firstname from TempView limit 1  |Firstname     |StringType |YES  |Bunny       |
|Select LastName from TempView limit 1   |LastName      |StringType |NO   |Gummy       |
|Select Designation from TempView limit 1|Designation   |StringType |YES  |Developer   |
|Select Salary from TempView limit 1     |Salary        |IntegerType|YES  |100         |
+----------------------------------------+--------------+-----------+-----+------------+
CodePudding user response:
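Assuming you don't have that many 'query rows', just collect the rows to the driver using df.collect() and then map over the queries in plain Scala, calling spark.sql on each one. A UDF won't help here, because spark.sql takes a String and runs on the driver, not a Column evaluated on the executors.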
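A rough sketch of what that could look like, in case it helps. The object name QueryResultSketch, the run method and the local session in main are my own placeholders, not anything from the question, and the sample rows are copied from the post. It assumes each query returns a single column and that converting the first value to a string is good enough.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical wrapper object, only here to make the sketch self-contained.
object QueryResultSketch {

  def run(spark: SparkSession): DataFrame = {
    import spark.implicits._

    // Sample data from the question, registered as the temporary view.
    Seq(("Bunny", "Gummy", "Developer", 100))
      .toDF("Firstname", "LastName", "Designation", "Salary")
      .createOrReplaceTempView("TempView")

    val df = Seq(
      ("Firstname", "StringType", "YES", "Select Firstname from TempView limit 1"),
      ("LastName", "StringType", "NO", "Select LastName from TempView limit 1"),
      ("Designation", "StringType", "YES", "Select Designation from TempView limit 1"),
      ("Salary", "IntegerType", "YES", "Select Salary from TempView limit 1")
    ).toDF("DIFFCOLUMNNAME", "DATATYPE", "ISSUE", "QUERY")

    // Bring the (small) set of rows to the driver, then run each query there.
    val withResults = df.collect().toSeq.map { row =>
      val query = row.getAs[String]("QUERY")
      // head(1) returns an empty array instead of throwing when a query has no rows;
      // a missing or null value ends up as null in the result column.
      val result = spark.sql(query).head(1).headOption
        .flatMap(r => Option(r.get(0)))
        .map(_.toString)
        .orNull
      (row.getAs[String]("DIFFCOLUMNNAME"), row.getAs[String]("DATATYPE"),
        row.getAs[String]("ISSUE"), query, result)
    }

    // Turn the driver-side results back into a DataFrame.
    withResults.toDF("DIFFCOLUMNNAME", "DATATYPE", "ISSUE", "QUERY", "QUERY RESULT")
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session for trying this out; reuse your existing one instead.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("query-result-sketch")
      .getOrCreate()
    run(spark).show(false)
    spark.stop()
  }
}
```

Each spark.sql call is a separate job launched from the driver, so this only makes sense for a handful of rows. Compared with the join approach above, it keeps the original column order and skips the join, but it runs duplicate queries more than once.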