I have a Spark SQL DataFrame in a Jupyter notebook named output_df1. I want to define a function as follows:
def output_agg(output_table_1):
    output_agg_1 = spark.sql(f"""
        select * from {output_table_1}
    """)
    return output_agg_1
When I call output_agg(output_df1), I get the following error:
Py4JJavaError Traceback (most recent call last)
/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py in deco(*a, **kw)
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
Py4JJavaError: An error occurred while calling o110.sql.
: org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input '[' expecting <EOF>
Can you please help with the correct syntax?
CodePudding user response:
spark.sql expects a table or temporary view name in the FROM clause, not a DataFrame object. Register the DataFrame as a temporary view first, and then execute the SQL statement.
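That is also where the parse error comes from: interpolating the DataFrame object into the f-string inserts its repr, which looks something like DataFrame[col1: int, col2: string], and the '[' is the "mismatched input '['" the parser rejects. A quick way to see this (a sketch that just prints the query your original code actually built):

# The f-string interpolates output_df1's repr, producing e.g.
# "select * from DataFrame[col1: int, col2: string]" -- not valid SQL.
print(f"select * from {output_df1}")

With a temporary view registered, the original function works unchanged: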
output_df1.createOrReplaceTempView('output_table')

def output_agg(output_table_1):
    output_agg_1 = spark.sql(f"""
        select * from {output_table_1}
    """)
    return output_agg_1

output_agg('output_table')
I cannot add a comment to completed questions. A good resource for learning Spark SQL is the Spark SQL Reference: https://spark.apache.org/docs/latest/sql-ref.html
CodePudding user response:
Print the SQL query before passing it to spark.sql and check whether it looks right. Also, share the generated SQL query in the question.
def output_agg(output_table_1):
    query = f"""select * from {output_table_1}"""
    print(query)
    output_agg_1 = spark.sql(query)
    return output_agg_1
If the SQL query looks good, the most likely remaining issue is that the table is not registered with Spark.
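To check what is registered, you can list the tables and temporary views the current session can see (a minimal sketch using the catalog API):

# The name you query in spark.sql must show up in the session catalog.
for t in spark.catalog.listTables():
    print(t.name, t.tableType, t.isTemporary)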
As discussed in the comments, since you want to union multiple DataFrames, you can do something like this:
from functools import reduce
from pyspark.sql import DataFrame

dfs_list = [output_df1, output_df2, output_df3, output_df4]

# Fold unionAll over the list pairwise: ((df1 + df2) + df3) + df4
df_combined = reduce(DataFrame.unionAll, dfs_list)
NOTE: unionAll matches columns by position, not by name, so make sure the columns in all the DataFrames are in the same order.
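If the column order might differ between the DataFrames, a safer variant (assuming Spark 2.3+, where unionByName was added) matches columns by name instead:

# unionByName pairs columns by name rather than position, so the
# DataFrames' column order no longer has to match.
df_combined = reduce(DataFrame.unionByName, dfs_list)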