Reading SQLite database in Apache Spark on Databricks: Unsupported Type NULL


I have a SQLite database that I want to import into Spark on Databricks.

When I run the command below, I get the error that follows it.

df = spark.read.format('jdbc') \
          .options(driver='org.sqlite.JDBC', dbtable='issn',
                   url='jdbc:sqlite:/dbfs/mnt/the_path/test.sqlite').load()
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command-4113757> in <module>
      6 df = spark.read.format('jdbc') \
      7         .options(driver='org.sqlite.JDBC', dbtable='issn',
----> 8                  url='jdbc:sqlite:/dbfs/mnt/if_i_told_you_the_path_i_would_have_to_kill_you/lanabug.sqlite').load()

/databricks/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
    182             return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    183         else:
--> 184             return self._df(self._jreader.load())
    185 
    186     @since(1.4)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    125     def deco(*a, **kw):
    126         try:
--> 127             return f(*a, **kw)
    128         except py4j.protocol.Py4JJavaError as e:
    129             converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o4773.load.
: java.sql.SQLException: Unsupported type NULL
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.getCatalystType(JdbcUtils.scala:256)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$getSchema$1(JdbcUtils.scala:321)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.getSchema(JdbcUtils.scala:321)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:63)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:226)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:370)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:384)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:373)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:373)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:258)
    at sun.reflect.GeneratedMethodAccessor1612.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)

Is there a way to either get the correct schema inferred, or to specify it explicitly, e.g. schema = StructType([StructField('modified_issn', IntegerType()), StructField('codes', StringType())])? I tried obvious hypothetical options like inferSchema='true', schema=schema, sqliteSchema=schema, jdbcSchema=schema, and none of them worked. Note that I am not an administrator and can't reconfigure the system in any way.
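
For reference, here is the schema I have in mind, written out with its imports:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# The schema I would like Spark to use for the 'issn' table:
schema = StructType([StructField('modified_issn', IntegerType()),
                     StructField('codes', StringType())])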

MWE, steps to reproduce:

  1. Import the following code and run spawn_and_test_cache('test.sqlite') to create a database.

  2. Run the command I presented earlier, substituting the appropriate path.

import sqlite3
import time


def convert_issn(issn):
    # Map an ISSN string to an integer key; an 'X' check digit is encoded
    # by replacing it with '0' and adding a large offset to avoid collisions.
    if issn[-1] != 'X':
        return int(issn)
    else:
        return 1000000000 + int(issn.replace('X', '0'))


def read_cache(isxn, filename='.sdxml.sqlite', create=False, mode='issn'):
    c = sqlite3.connect(filename)
    if mode == 'issn':
        modified_isxn = convert_issn(isxn)
    else:
        modified_isxn = int(isxn)
    sql = 'PRAGMA journal_mode=WAL;'
    c.execute(sql)
    if create:
        sql = ('CREATE TABLE IF NOT EXISTS'
               ' %s(modified_%s INTEGER PRIMARY KEY, codes STRING)'
               % (mode, mode))
        c.execute(sql)
        while True:
            # Retry the commit while another writer holds the database lock.
            try:
                c.commit()
                break
            except sqlite3.OperationalError:
                time.sleep(0.001)
            except Exception:
                break
    curs = c.cursor()
    curs.execute('SELECT codes FROM %s WHERE modified_%s = %d'
                 % (mode, mode, modified_isxn))
    row = curs.fetchone()
    if row is None:
        c.close()
        return None
    c.close()
    return row[0]


def write_cache(isxn, codes, filename='.sdxml.sqlite', create=False,
                mode='issn'):
    c = sqlite3.connect(filename)
    if mode == 'issn':
        modified_isxn = convert_issn(isxn)
    else:
        modified_isxn = int(isxn)
    sql = 'PRAGMA journal_mode=WAL;'
    c.execute(sql)
    if create:
        sql = ('CREATE TABLE IF NOT EXISTS'
               ' %s(modified_%s INTEGER PRIMARY KEY, codes STRING);'
               % (mode, mode))
        c.execute(sql)
        while True:
            try:
                c.commit()
                break
            except sqlite3.OperationalError:
                time.sleep(0.0001)
            except Exception:
                break
    sql = ('INSERT OR REPLACE INTO %s values (%d, "%s")'
           % (mode, modified_isxn, codes))
    c.execute(sql)
    while True:
        try:
            c.commit()
            break
        except sqlite3.OperationalError:
            time.sleep(0.0001)
        except Exception:
            c.close()
            return False
    c.close()
    return True


def spawn_and_test_cache(filename='.sdxml.sqlite'):
    write_cache('10000000000000', 'abc', filename=filename, create=True)
    write_cache('10000000000000', 'def', filename=filename, create=True,
                mode='isbn')
    return (read_cache('10000000000000', filename=filename),
            read_cache('10000000000000', filename=filename, mode='isbn'))
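
Running spawn_and_test_cache('test.sqlite') should return ('abc', 'def'): one row written to the issn table and one to the isbn table.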

CodePudding user response:

For the SQLite table definitions, use TEXT instead of STRING (which is not a recognized SQLite type name), and you're good :-)

See SQLite datatypes: https://www.sqlite.org/datatype3.html
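
If the database already exists, one way to rebuild the affected table (a sketch, using the table and file name from the MWE; SQLite has no ALTER COLUMN, so the table is recreated and the rows copied over):

import sqlite3

# Rebuild 'issn' with a TEXT column; repeat the same steps for 'isbn'.
con = sqlite3.connect('test.sqlite')
con.executescript('''
    ALTER TABLE issn RENAME TO issn_old;
    CREATE TABLE issn(modified_issn INTEGER PRIMARY KEY, codes TEXT);
    INSERT INTO issn SELECT * FROM issn_old;
    DROP TABLE issn_old;
''')
con.close()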

(spark.read.format('jdbc')
 .options(driver='org.sqlite.JDBC', dbtable='sqlite_master',
          url='jdbc:sqlite:sdxml.sqlite')
 .load()
 .show(truncate=False))

+-----+----+--------+--------+----------------------------------------------------------------+
|type |name|tbl_name|rootpage|sql                                                             |
+-----+----+--------+--------+----------------------------------------------------------------+
|table|issn|issn    |2       |CREATE TABLE issn(modified_issn INTEGER PRIMARY KEY, codes TEXT)|
|table|isbn|isbn    |3       |CREATE TABLE isbn(modified_isbn INTEGER PRIMARY KEY, codes TEXT)|
+-----+----+--------+--------+----------------------------------------------------------------+

(spark.read.format('jdbc')
 .options(driver='org.sqlite.JDBC', dbtable='issn',
          url='jdbc:sqlite:sdxml.sqlite')
 .load()
 .show())

+--------------+-----+
| modified_issn|codes|
+--------------+-----+
|10000000000000|  abc|
+--------------+-----+

(spark.read.format('jdbc')
 .options(driver='org.sqlite.JDBC', dbtable='isbn',
          url='jdbc:sqlite:sdxml.sqlite')
 .load()
 .show())

+--------------+-----+
| modified_isbn|codes|
+--------------+-----+
|10000000000000|  def|
+--------------+-----+

P.S.
customSchema does control the schema; however, it does not prevent the error for an unknown type at the source.
My guess is that the JDBC driver keeps a mapping table for the data types, and once it fails to find the data type of a source column, Spark's schema resolution (JdbcUtils.getCatalystType in the stack trace above) throws the exception.

driver = 'org.sqlite.JDBC'
query = 'select modified_issn from issn'
url = 'jdbc:sqlite:sdxml.sqlite'

spark.read.format('jdbc').options(customSchema='modified_issn tinyint', driver=driver, query=query, url=url).load().printSchema()
spark.read.format('jdbc').options(customSchema='modified_issn smallint', driver=driver, query=query, url=url).load().printSchema()
spark.read.format('jdbc').options(customSchema='modified_issn int', driver=driver, query=query, url=url).load().printSchema()
spark.read.format('jdbc').options(customSchema='modified_issn bigint', driver=driver, query=query, url=url).load().printSchema()
spark.read.format('jdbc').options(customSchema='modified_issn string', driver=driver, query=query, url=url).load().printSchema()

root
 |-- modified_issn: byte (nullable = true)

root
 |-- modified_issn: short (nullable = true)

root
 |-- modified_issn: integer (nullable = true)

root
 |-- modified_issn: long (nullable = true)

root
 |-- modified_issn: string (nullable = true)
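
That is, customSchema only overrides the Spark-side type of a column the driver could already resolve; it cannot stand in for a type the driver fails to report in the first place, which is why the query above selects only modified_issn.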