How to write a function that runs certain SQL on certain columns in a PySpark dataframe?

Time: 07-14

[screenshot: column names on the left, the SQL query to run on each column on the right]

I wrote some code and got the output above. The left side lists the columns of the dataframe I'm working with, and the right side shows the SQL query that needs to be run on that particular column.

Now I want to write a function that runs each query on the right against its column on the left and displays the output.

The first picture is basically the values of the 'Column' and 'Query' columns of another dataframe. I used the .collect() method to retrieve those values.


This seemed like a simple problem, but I'm still stuck on it. Any idea how to do it?

CodePudding user response:

You can put column names and queries into a dictionary:

dct = {'column_name': 'SELECT * FROM table WHERE {col} IS NULL'}

for k, v in dct.items():
    q = v.format(col = k)
    # spark.sql(q)
    print(q)

Output:

SELECT * FROM table WHERE column_name IS NULL
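Building on this, the dictionary can be assembled from the metadata dataframe's collected rows and each formatted query passed to spark.sql(). A minimal sketch, assuming a metadata dataframe named meta_df with 'Column' and 'Query' columns and a table named workouts (all three names are assumptions, not from the question):

```python
# Assumed: meta_df holds the 'Column' and 'Query' values from the question.
# dct = {row['Column']: row['Query'] for row in meta_df.collect()}

# Stand-in for the collected values, for illustration:
dct = {'maxpulse': 'SELECT COUNT(*) FROM workouts WHERE {col} IS NULL'}

queries = []
for col, template in dct.items():
    q = template.format(col=col)
    queries.append(q)
    # spark.sql(q).show()  # execute against a live SparkSession

print(queries[0])
# SELECT COUNT(*) FROM workouts WHERE maxpulse IS NULL
```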

CodePudding user response:

Using a subset of your data:

data_ls = [
    ('maxpulse', 'select count(*) from {table} where {col} is null'),
    ('duration', 'select round((count(distinct {col}) / count({col})) * 100) from {table}')
]

data_sdf = spark.sparkContext.parallelize(data_ls).toDF(['column', 'query'])

# +--------+-----------------------------------------------------------------------+
# |column  |query                                                                  |
# +--------+-----------------------------------------------------------------------+
# |maxpulse|select count(*) from {table} where {col} is null                       |
# |duration|select round((count(distinct {col}) / count({col})) * 100) from {table}|
# +--------+-----------------------------------------------------------------------+

Approach 1: Using UDF

from pyspark.sql import functions as func
from pyspark.sql.types import StringType

def createQuery(query_string, column_name=None, table_name=None):
    # Substitute only the placeholders that were supplied, preserving the
    # others so they can be filled in a later pass.
    if column_name is not None and table_name is None:
        fnlquery = query_string.format(col=column_name, table='{table}')
    elif column_name is None and table_name is not None:
        fnlquery = query_string.format(col='{col}', table=table_name)
    elif column_name is not None and table_name is not None:
        fnlquery = query_string.format(col=column_name, table=table_name)
    else:
        fnlquery = query_string

    return fnlquery

createQueryUDF = func.udf(createQuery, StringType())

data_sdf. \
    withColumn('final_query', createQueryUDF('query', 'column')). \
    select('final_query'). \
    show(truncate=False)

# +-----------------------------------------------------------------------------+
# |final_query                                                                  |
# +-----------------------------------------------------------------------------+
# |select count(*) from {table} where maxpulse is null                          |
# |select round((count(distinct duration) / count(duration)) * 100) from {table}|
# +-----------------------------------------------------------------------------+
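Because createQuery leaves unset placeholders intact, the '{table}' slot can be filled in a second pass with the same helper. A plain-Python sketch (the helper is repeated so the snippet is self-contained; the table name workouts is an assumption):

```python
def createQuery(query_string, column_name=None, table_name=None):
    # Same helper as above: substitute only the placeholders that were
    # supplied, preserving the others for a later pass.
    if column_name is not None and table_name is None:
        return query_string.format(col=column_name, table='{table}')
    elif column_name is None and table_name is not None:
        return query_string.format(col='{col}', table=table_name)
    elif column_name is not None and table_name is not None:
        return query_string.format(col=column_name, table=table_name)
    return query_string

# First pass fills {col}, second pass fills {table}:
partial = createQuery('select count(*) from {table} where {col} is null',
                      column_name='maxpulse')
full = createQuery(partial, table_name='workouts')
print(full)
# select count(*) from workouts where maxpulse is null
```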

Approach 2: Using regexp_replace() sql function

data_sdf. \
    withColumn('final_query', func.expr('regexp_replace(query, "[{]col[}]", column)')). \
    select('final_query'). \
    show(truncate=False)

# +-----------------------------------------------------------------------------+
# |final_query                                                                  |
# +-----------------------------------------------------------------------------+
# |select count(*) from {table} where maxpulse is null                          |
# |select round((count(distinct duration) / count(duration)) * 100) from {table}|
# +-----------------------------------------------------------------------------+

The final queries in the final_query column can then be collected (using .collect()) and executed with spark.sql(). A similar approach can be used to replace '{table}' with the actual table name.
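That last step can be sketched as follows. A literal list stands in for the collected values here; a live SparkSession named spark and the table name workouts are assumptions:

```python
# final_queries = [r['final_query'] for r in
#                  data_sdf.select('final_query').collect()]
final_queries = [  # stand-in for the collected values
    'select count(*) from {table} where maxpulse is null',
]

# Replace the remaining '{table}' placeholder, then run each query:
runnable = [q.replace('{table}', 'workouts') for q in final_queries]
for q in runnable:
    print(q)
    # spark.sql(q).show()  # execute against Spark
```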
