I am trying to read data from a table in Trino using a JDBC connector with PySpark, however, I keep getting the error "Unknown type 'TEXT' for column X"
whenever I try to write a string / VARCHAR column. It works fine with double, for example.
The code I'm using is the following:
from pyspark.sql import SparkSession
from pyspark import SparkContext
sc = SparkContext("local", "Test Write")
spark = SparkSession\
.builder\
.config("spark.jars", "trino-jdbc-379.jar")\
.master("local")\
.appName("Test Write")\
.getOrCreate()
query_test = """( SELECT CAST(name AS VARCHAR(200)) AS brandname
FROM dbname.pdo.brand) brand """
test_df = spark \
.read\
.format("jdbc")\
.option("url", "jdbc:trino://host:443")\
.option("driver", "io.trino.jdbc.TrinoDriver")\
.option("dbtable", query_test)\
.option("user", "user")\
.option("password", "pass")\
.load()
test_df.printSchema()
test_df\
.write\
.format("jdbc")\
.option("url", "jdbc:trino://host:443")\
.option("dbtable", "dbname.sandbox.test")\
.option("isolationLevel","NONE")\
.option("user", "user")\
.option("password", "pass")\
.mode("overwrite")\
.save()
And the full error I get is this:
Traceback (most recent call last):
File "/test_write.py", line 28, in <module>
test_df\
File "/home/user/.local/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 828, in save
self._jwrite.save()
File "/home/user/.local/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
return_value = get_return_value(
File "/home/user/.local/lib/python3.8/site-packages/pyspark/sql/utils.py", line 128, in deco
return f(*a, **kw)
File "/home/user/.local/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o51.save.
: java.sql.SQLException: Query failed (#20220509_144401_00574_zadvu): line 1:36: Unknown type 'TEXT' for column '"brandname"'
at io.trino.jdbc.AbstractTrinoResultSet.resultsException(AbstractTrinoResultSet.java:1908)
at io.trino.jdbc.TrinoResultSet.getColumns(TrinoResultSet.java:285)
at io.trino.jdbc.TrinoResultSet.create(TrinoResultSet.java:61)
at io.trino.jdbc.TrinoStatement.internalExecute(TrinoStatement.java:262)
at io.trino.jdbc.TrinoStatement.execute(TrinoStatement.java:240)
at io.trino.jdbc.TrinoStatement.executeLargeUpdate(TrinoStatement.java:485)
at io.trino.jdbc.TrinoStatement.executeUpdate(TrinoStatement.java:457)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createTable(JdbcUtils.scala:894)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:81)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:126)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:962)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:962)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:414)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:398)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.trino.spi.TrinoException: line 1:36: Unknown type 'TEXT' for column '"brandname"'
at io.trino.sql.analyzer.SemanticExceptions.semanticException(SemanticExceptions.java:48)
at io.trino.sql.analyzer.SemanticExceptions.semanticException(SemanticExceptions.java:43)
at io.trino.execution.CreateTableTask.internalExecute(CreateTableTask.java:160)
at io.trino.execution.CreateTableTask.execute(CreateTableTask.java:119)
at io.trino.execution.CreateTableTask.execute(CreateTableTask.java:85)
at io.trino.execution.DataDefinitionExecution.start(DataDefinitionExecution.java:145)
at io.trino.execution.SqlQueryManager.createQuery(SqlQueryManager.java:243)
at io.trino.dispatcher.LocalDispatchQuery.lambda$startExecution$7(LocalDispatchQuery.java:143)
at io.trino.$gen.Trino_375____20220508_190617_2.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
I know the field is properly formated because the result from the test_df.printSchema()
is this:
root
|-- brandname: string (nullable = true)
And I can also print it with test_df.show()
without issues.
Any ideas on how to get around this issue? I am using PySpark==3.0.3 as there is a known issue with versions past 3.1.* with the JDBC connector and it was only fixed on 3.3.0 but it's not yet published in a stable version.
CodePudding user response:
Add this option to your write:
.option("createTableColumnTypes", "brandname VARCHAR(200)")
The start of the Rabbit hole that leads to this being the likely answer: here and here