I have a PySpark DataFrame that looks like this:
sdf1 = sc.parallelize([["toto", "tata", ["table", "column"], "SELECT {1} FROM {0}"], ["titi", "tutu", ["table", "column"], "SELECT {1} FROM {0}"]]).toDF(["table", "column", "parameters", "statement"])
+-----+------+---------------+-------------------+
|table|column|     parameters|          statement|
+-----+------+---------------+-------------------+
| toto|  tata|[table, column]|SELECT {1} FROM {0}|
| titi|  tutu|[table, column]|SELECT {1} FROM {0}|
+-----+------+---------------+-------------------+
I am trying to map the elements of the "parameters" array to the columns they name, and then format "statement" with the values from those columns.
This is what I expect after the transformation:
sdf2 = sc.parallelize([["toto", "tata", ["table", "column"], "SELECT {1} FROM {0}", "SELECT tata FROM toto"],["titi", "tutu", ["table", "column"], "SELECT {1} FROM {0}", "SELECT tutu FROM titi"]]).toDF(["table", "column", "parameters", "statement", "result"])
+-----+------+---------------+-------------------+---------------------+
|table|column|     parameters|          statement|               result|
+-----+------+---------------+-------------------+---------------------+
| toto|  tata|[table, column]|SELECT {1} FROM {0}|SELECT tata FROM toto|
| titi|  tutu|[table, column]|SELECT {1} FROM {0}|SELECT tutu FROM titi|
+-----+------+---------------+-------------------+---------------------+
CodePudding user response:
Here is one approach using the RDD API.
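def addParamsToQuery(param_ls, query, r):
    # look up each parameter name as a column of the row r
    new_param_ls = [r[k] for k in param_ls]
    # fill the positional placeholders {0}, {1}, ... in the statement
    new_query = query.format(*new_param_ls)
    return new_query

# data_sdf is the input DataFrame (sdf1 in the question)
columns = data_sdf.columns
data_sdf. \
    rdd. \
    map(lambda r: [r[c] for c in columns] + [addParamsToQuery(r.parameters, r.statement, r)]). \
    toDF(columns + ['result']). \
    show(truncate=False)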
# +-----+------+---------------+-------------------+---------------------+
# |table|column|parameters     |statement          |result               |
# +-----+------+---------------+-------------------+---------------------+
# |toto |tata  |[table, column]|SELECT {1} FROM {0}|SELECT tata FROM toto|
# |titi |tutu  |[table, column]|SELECT {1} FROM {0}|SELECT tutu FROM titi|
# +-----+------+---------------+-------------------+---------------------+
The function addParamsToQuery builds the list of parameter values from the row's column values and inserts them into the statement using .format().
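To see the per-row logic in isolation, here is a minimal sketch in plain Python, without Spark. It assumes each row can be modeled as a dict keyed by column name (a stand-in for a Spark Row, which also supports lookup by name); the names add_params_to_query, rows and results are only illustrative.

```python
def add_params_to_query(param_ls, query, row):
    # Resolve each parameter name to the value stored under that name in the row.
    values = [row[name] for name in param_ls]
    # str.format fills {0}, {1}, ... with the values in positional order.
    return query.format(*values)


# Plain dicts standing in for the rows of the DataFrame from the question.
rows = [
    {"table": "toto", "column": "tata",
     "parameters": ["table", "column"], "statement": "SELECT {1} FROM {0}"},
    {"table": "titi", "column": "tutu",
     "parameters": ["table", "column"], "statement": "SELECT {1} FROM {0}"},
]

results = [add_params_to_query(r["parameters"], r["statement"], r) for r in rows]
for line in results:
    print(line)
```

Because "parameters" is ["table", "column"], {0} receives the value of "table" and {1} the value of "column", which is why the first row yields SELECT tata FROM toto.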