Home > database >  PySpark write to Trino with JDBC connector
PySpark write to Trino with JDBC connector

Time:05-10

I am trying to read data from a table in Trino using a JDBC connector with PySpark, however, I keep getting the error "Unknown type 'TEXT' for column X" whenever I try to write a string / VARCHAR column. It works fine with double, for example.

The code I'm using is the following:

from pyspark.sql import SparkSession
from pyspark import SparkContext

sc = SparkContext("local", "Test Write")

spark = SparkSession\
    .builder\
    .config("spark.jars", "trino-jdbc-379.jar")\
    .master("local")\
    .appName("Test Write")\
    .getOrCreate()


query_test = """( SELECT CAST(name AS VARCHAR(200)) AS brandname
                    FROM dbname.pdo.brand) brand """
test_df = spark \
    .read\
    .format("jdbc")\
    .option("url", "jdbc:trino://host:443")\
    .option("driver", "io.trino.jdbc.TrinoDriver")\
    .option("dbtable", query_test)\
    .option("user", "user")\
    .option("password", "pass")\
    .load()

test_df.printSchema()

test_df\
    .write\
    .format("jdbc")\
    .option("url", "jdbc:trino://host:443")\
    .option("dbtable", "dbname.sandbox.test")\
    .option("isolationLevel","NONE")\
    .option("user", "user")\
    .option("password", "pass")\
    .mode("overwrite")\
    .save()

And the full error I get is this:

Traceback (most recent call last):
  File "/test_write.py", line 28, in <module>
    test_df\
  File "/home/user/.local/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 828, in save
    self._jwrite.save()
  File "/home/user/.local/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/home/user/.local/lib/python3.8/site-packages/pyspark/sql/utils.py", line 128, in deco
    return f(*a, **kw)
  File "/home/user/.local/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o51.save.
: java.sql.SQLException: Query failed (#20220509_144401_00574_zadvu): line 1:36: Unknown type 'TEXT' for column '"brandname"'
        at io.trino.jdbc.AbstractTrinoResultSet.resultsException(AbstractTrinoResultSet.java:1908)
        at io.trino.jdbc.TrinoResultSet.getColumns(TrinoResultSet.java:285)
        at io.trino.jdbc.TrinoResultSet.create(TrinoResultSet.java:61)
        at io.trino.jdbc.TrinoStatement.internalExecute(TrinoStatement.java:262)
        at io.trino.jdbc.TrinoStatement.execute(TrinoStatement.java:240)
        at io.trino.jdbc.TrinoStatement.executeLargeUpdate(TrinoStatement.java:485)
        at io.trino.jdbc.TrinoStatement.executeUpdate(TrinoStatement.java:457)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createTable(JdbcUtils.scala:894)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:81)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:127)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:126)
        at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:962)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:962)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:414)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:398)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: io.trino.spi.TrinoException: line 1:36: Unknown type 'TEXT' for column '"brandname"'
        at io.trino.sql.analyzer.SemanticExceptions.semanticException(SemanticExceptions.java:48)
        at io.trino.sql.analyzer.SemanticExceptions.semanticException(SemanticExceptions.java:43)
        at io.trino.execution.CreateTableTask.internalExecute(CreateTableTask.java:160)
        at io.trino.execution.CreateTableTask.execute(CreateTableTask.java:119)
        at io.trino.execution.CreateTableTask.execute(CreateTableTask.java:85)
        at io.trino.execution.DataDefinitionExecution.start(DataDefinitionExecution.java:145)
        at io.trino.execution.SqlQueryManager.createQuery(SqlQueryManager.java:243)
        at io.trino.dispatcher.LocalDispatchQuery.lambda$startExecution$7(LocalDispatchQuery.java:143)
        at io.trino.$gen.Trino_375____20220508_190617_2.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

I know the field is properly formated because the result from the test_df.printSchema() is this:

root
 |-- brandname: string (nullable = true)

And I can also print it with test_df.show() without issues.

Any ideas on how to get around this issue? I am using PySpark==3.0.3 as there is a known issue with versions past 3.1.* with the JDBC connector and it was only fixed on 3.3.0 but it's not yet published in a stable version.

CodePudding user response:

Add this option to your write:

.option("createTableColumnTypes", "brandname VARCHAR(200)")

The start of the Rabbit hole that leads to this being the likely answer: here and here

  • Related