I am trying to read a Synapse table that has spaces in its column names.
Reading the table works as long as I select only columns without spaces or special characters:
%%spark
val df = spark.read.synapsesql("<Pool>.<schema>.<table>").select("TYPE", "Year").limit(100)
df.show()
OUTPUT:
+------+----+
|  TYPE|Year|
+------+----+
|BOUGHT|LAST|
|BOUGHT|LAST|
|BOUGHT|LAST|
|BOUGHT|LAST|
As soon as I select a column with a space in its name, I get errors. I have tried many variants:
.select(col("""`Country Code`"""))
.select(col("`Country Code`"))
.select(col("""[Country Code]"""))
.select(col("Country Code"))
.select($"`Country Code`")
.select("`Country Code`")
.select("""`Country Code`""")
All of these return the same error: com.microsoft.sqlserver.jdbc.SQLServerException: Invalid column name 'Country'.
If I omit the backticks in the select, for example:
.select("[Country Code]")
ERROR: com.microsoft.sqlserver.jdbc.SQLServerException: Invalid column name '[Country Code]'.
With backticks, Spark in Synapse takes only the first word as the column name.
Any experience?
CodePudding user response:
The select on its own will work; adding show (or any other action like count) will not, because Spark evaluates lazily and only sends the query to the SQL engine when an action runs. There does seem to be an issue with the Synapse synapsesql API. The Invalid column name 'Country' error is coming from the SQL engine, because there seems to be no way to pass square brackets back to it. Also, parquet files do not support spaces in column names, so it's probably connected.
The workaround is simply not to use spaces in column names. Fix up the tables in a previous Synapse pipeline step if required. I'll have a look into it, but there may be no other answer.
If you want to rename existing columns in the database you can use sp_rename, e.g.:
EXEC sp_rename 'dbo.countries.country Type', 'countryType', 'COLUMN';
This code has been tested on a Synapse dedicated SQL pool.
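If several columns need renaming, the statements can be generated rather than typed by hand. The following is a minimal sketch in Python and, unlike the statement above, has not been run against a pool: the helper names safe_name and rename_statements are hypothetical, and replacing each run of non-word characters with an underscore is only one possible naming convention. It only builds the T-SQL text; executing it is left to whatever client you already use.

```python
import re

def safe_name(column):
    """Replace each run of characters other than letters, digits and underscores with '_'."""
    return re.sub(r"\W+", "_", column.strip()).strip("_")

def rename_statements(table, columns):
    """Build one sp_rename call per column whose name would change."""
    statements = []
    for column in columns:
        new_name = safe_name(column)
        if new_name == column:
            continue  # already free of spaces and special characters
        # Single quotes inside a T-SQL string literal are escaped by doubling them.
        old_ref = f"{table}.{column}".replace("'", "''")
        statements.append(f"EXEC sp_rename '{old_ref}', '{new_name}', 'COLUMN';")
    return statements

for statement in rename_statements("dbo.countries", ["TYPE", "Year", "Country Code"]):
    print(statement)
```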
That particular API (spark.read.synapsesql) cannot handle views, unfortunately. You would have to materialise the view, e.g. using a CTAS in a prior Synapse pipeline step. The API is useful for simple patterns (get table -> process -> put back) but is pretty limited. You can't even manage table distribution (hash, round_robin, replicate), indexing (clustered columnstore, clustered index, heap) or partitioning, but you never know, they might add to it one day. I'll be keeping an eye out during the next MS conference anyway.
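The CTAS step mentioned above might look like the statement this sketch produces. It is written in Python only to build the SQL text; the function name ctas_statement, the target table dbo.my_view_materialised and the source view dbo.v_my_view are hypothetical, and ROUND_ROBIN with a clustered columnstore index is just an illustrative choice of table options.

```python
def ctas_statement(target, source, distribution="ROUND_ROBIN"):
    """Return a dedicated SQL pool CTAS statement that copies `source` into `target`."""
    return (
        f"CREATE TABLE {target}\n"
        f"WITH (DISTRIBUTION = {distribution}, CLUSTERED COLUMNSTORE INDEX)\n"
        f"AS SELECT * FROM {source};"
    )

# Hypothetical names, for illustration only.
print(ctas_statement("dbo.my_view_materialised", "dbo.v_my_view"))
```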
CodePudding user response:
I have created a function to run a query using JDBC. Thanks to this I was able to read from a view. I have added sample code showing how to get the password from Key Vault using TokenLibrary.
def spark_query(db, query):
    """Run a SQL query against the Synapse SQL pool over JDBC and return a DataFrame."""
    jdbc_hostname = "<synapse_db>.sql.azuresynapse.net"
    user = "<spark_db_client>"
    password = "<strong_password>"
    # Alternatively, read the password from Key Vault:
    # password_from_kv = TokenLibrary.getSecret("<Linked_Key_Vault_Service_Name>", "<Key_Vault_Key_Name>", "<Key_Vault_Name>")
    return spark.read.format("jdbc") \
        .option("url", f"jdbc:sqlserver://{jdbc_hostname}:1433;databaseName={db};user={user};password={password}") \
        .option("query", query) \
        .load()
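A variant worth considering, sketched here under assumptions rather than taken from the post: Spark's JDBC source accepts user and password as separate options, so the credentials need not be embedded in the URL, where a character such as ; in the password could be misread as a property separator. The helper name jdbc_options is hypothetical. It only builds a plain dictionary, so it can be checked without a cluster.

```python
def jdbc_options(hostname, db, user, password, query):
    """Collect the options for Spark's JDBC reader, keeping credentials out of the URL."""
    return {
        "url": f"jdbc:sqlserver://{hostname}:1433;databaseName={db}",
        "user": user,
        "password": password,
        "query": query,
    }

# Placeholder values, as in the function above.
options = jdbc_options(
    "<synapse_db>.sql.azuresynapse.net",
    "<my_database>",
    "<spark_db_client>",
    "<strong_password>",
    "SELECT country_code FROM dbo.v_my_table",
)
print(options["url"])
```

In a notebook the dictionary would then be passed on with spark.read.format("jdbc").options(**options).load().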
Then I created a VIEW whose column names contain no spaces:
CREATE VIEW v_my_table
AS
SELECT [Country code] as country_code from my_table
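For tables with many columns, the view definition can be generated. This is a rough sketch, not the code I actually ran: quote_ident, alias_for, view_ddl and the lower-case underscore aliasing rule are assumptions made for illustration. Square-bracket quoting doubles any ] inside the name, which is how T-SQL escapes delimited identifiers.

```python
import re

def quote_ident(name):
    """Wrap a T-SQL identifier in square brackets, doubling any closing bracket."""
    return "[" + name.replace("]", "]]") + "]"

def alias_for(name):
    """Lower-case the name and replace runs of non-word characters with '_'."""
    return re.sub(r"\W+", "_", name).strip("_").lower()

def view_ddl(view, table, columns):
    """Build a CREATE VIEW statement that aliases every column to a name without spaces."""
    select_list = ",\n".join(
        f"    {quote_ident(col)} AS {alias_for(col)}" for col in columns
    )
    return f"CREATE VIEW {view}\nAS\nSELECT\n{select_list}\nFROM {table}"

print(view_ddl("v_my_table", "my_table", ["Country code", "Year"]))
```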
I granted access to <spark_db_client>:
GRANT SELECT ON v_my_table to <spark_db_client>
After all this preparation I was able to read the table through the VIEW and save it to a Spark database:
query = """
SELECT country_code FROM dbo.v_my_table
"""
df = spark_query(db="<my_database>", query=query)
spark.sql("CREATE DATABASE IF NOT EXISTS spark_poc")
df.write.mode("overwrite").saveAsTable("spark_poc.my_table")
df.createOrReplaceTempView("my_table")
Values in angle brackets, such as <placeholder_variables>, are placeholders.