Home > Mobile >  How should I pass a Spark SQL DataFrame as an argument in Python function?
How should I pass a Spark SQL DataFrame as an argument in Python function?

Time:12-09

I have a Spark SQL DataFrame within Jupyter notebook as output_df1. I want to define a function as follows:

def output_agg(output_table_1):
    output_agg_1 = spark.sql(f"""
    select * from {output_table_1}
    """)
    return output_agg_1

When I call output_agg(output_df1), I get the following error:

Py4JJavaError                             Traceback (most recent call last)
/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:

Py4JJavaError: An error occurred while calling o110.sql.
: org.apache.spark.sql.catalyst.parser.ParseException: 
mismatched input '['  expecting <EOF>

Can you please help with the correct syntax?

CodePudding user response:

spark sql select needs to give table/temporary table. Register the dataframe as a temporary table first, and then execute the SQL statement.

output_df1.createOrReplaceTempView('output_table')
def output_agg(output_table_1):
    output_agg_1 = spark.sql(f"""
    select * from {output_table_1}
    """)
    return output_agg_1
output_agg('output_table')

T cannot add comment to completed questions. The information for learning Spark SQL is the Spark SQL Reference. https://spark.apache.org/docs/latest/sql-ref.html

CodePudding user response:

Print the SQL query before passing to spark.sql and check if SQL query looks good. Also, share the SQL query in question.

def output_agg(output_table_1):
    query = f"""select * from {output_table_1}"""
    print(query)
    output_agg_1 = spark.sql(query)
    return output_agg_1

If the SQL query looks good, then possible issues can be table is not registered with spark.

As discussed in the comments, since you want to union multiple dfs, you can do something like this

from functools import reduce
from pyspark.sql import DataFrame

dfs_list = [output_df1, output_df2, output_df3, output_df4]
df_combined = reduce(DataFrame.unionAll, dfs_list)

NOTE: Make sure the columns in all the df are in same order

  • Related