I have a SQLite database that I want to import into Spark on Databricks. When I run the command below, I get the error shown after it.
df = spark.read.format('jdbc') \
    .options(driver='org.sqlite.JDBC', dbtable='issn',
             url='jdbc:sqlite:/dbfs/mnt/the_path/test.sqlite').load()
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<command-4113757> in <module>
6 df = spark.read.format('jdbc') \
7 .options(driver='org.sqlite.JDBC', dbtable='issn',
----> 8 url='jdbc:sqlite:/dbfs/mnt/the_path/test.sqlite').load()
/databricks/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
182 return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
183 else:
--> 184 return self._df(self._jreader.load())
185
186 @since(1.4)
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
125 def deco(*a, **kw):
126 try:
--> 127 return f(*a, **kw)
128 except py4j.protocol.Py4JJavaError as e:
129 converted = convert_exception(e.java_exception)
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o4773.load.
: java.sql.SQLException: Unsupported type NULL
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.getCatalystType(JdbcUtils.scala:256)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$getSchema$1(JdbcUtils.scala:321)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.getSchema(JdbcUtils.scala:321)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:63)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:226)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:370)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:384)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:373)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:373)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:258)
at sun.reflect.GeneratedMethodAccessor1612.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
Is there a way to either get the correct schema inferred, or to specify it explicitly, e.g.
schema = StructType([StructField('modified_issn', IntegerType()),
                     StructField('codes', StringType())])
? I tried the obvious hypothetical options, like inferSchema='true', schema=schema, sqliteSchema=schema and jdbcSchema=schema, and none of them worked. Note that I am not an administrator and can't reconfigure the system in any way.
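For reference, a sketch of one such attempt, using the reader's .schema() method (as far as I know, Spark rejects user-specified schemas for JDBC sources, so this fails too, just with a different error):

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([StructField('modified_issn', IntegerType()),
                     StructField('codes', StringType())])

# Raises AnalysisException ("... does not allow user-specified schemas")
# instead of fixing the NULL-type error.
df = (spark.read.format('jdbc')
      .schema(schema)
      .options(driver='org.sqlite.JDBC', dbtable='issn',
               url='jdbc:sqlite:/dbfs/mnt/the_path/test.sqlite')
      .load())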
MWE, steps to reproduce:
Import the following code and run
spawn_and_test_cache('test.sqlite')
to create a database. Then run the command I presented earlier, substituting the appropriate path.
import sqlite3
import time


def convert_issn(issn):
    # Map an ISSN string to an integer key; an 'X' check digit is encoded
    # by replacing it with '0' and offsetting into a disjoint range.
    if issn[-1] != 'X':
        return int(issn)
    else:
        return 1000000000 + int(issn.replace('X', '0'))


def read_cache(isxn, filename='.sdxml.sqlite', create=False, mode='issn'):
    c = sqlite3.connect(filename)
    if mode == 'issn':
        modified_isxn = convert_issn(isxn)
    else:
        modified_isxn = int(isxn)
    c.execute('PRAGMA journal_mode=WAL;')
    if create:
        sql = ('CREATE TABLE IF NOT EXISTS'
               ' %s(modified_%s INTEGER PRIMARY KEY, codes STRING)'
               % (mode, mode))
        c.execute(sql)
    # Retry the commit while another writer holds the lock.
    while True:
        try:
            c.commit()
            break
        except sqlite3.OperationalError:
            time.sleep(0.001)
        except Exception:
            break
    curs = c.cursor()
    curs.execute('SELECT codes FROM %s WHERE modified_%s = %d'
                 % (mode, mode, modified_isxn))
    row = curs.fetchone()
    if row is None:
        c.close()
        return None
    c.close()
    return row[0]


def write_cache(isxn, codes, filename='.sdxml.sqlite', create=False,
                mode='issn'):
    c = sqlite3.connect(filename)
    if mode == 'issn':
        modified_isxn = convert_issn(isxn)
    else:
        modified_isxn = int(isxn)
    c.execute('PRAGMA journal_mode=WAL;')
    if create:
        sql = ('CREATE TABLE IF NOT EXISTS'
               ' %s(modified_%s INTEGER PRIMARY KEY, codes STRING);'
               % (mode, mode))
        c.execute(sql)
    while True:
        try:
            c.commit()
            break
        except sqlite3.OperationalError:
            time.sleep(0.0001)
        except Exception:
            break
    sql = ('INSERT OR REPLACE INTO %s values (%d, "%s")'
           % (mode, modified_isxn, codes))
    c.execute(sql)
    while True:
        try:
            c.commit()
            break
        except sqlite3.OperationalError:
            time.sleep(0.0001)
        except Exception:
            c.close()
            return False
    c.close()
    return True


def spawn_and_test_cache(filename='.sdxml.sqlite'):
    write_cache('10000000000000', 'abc', filename=filename, create=True)
    write_cache('10000000000000', 'def', filename=filename, create=True,
                mode='isbn')
    return (read_cache('10000000000000', filename=filename),
            read_cache('10000000000000', filename=filename, mode='isbn'))
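Running spawn_and_test_cache('test.sqlite') should return ('abc', 'def'). As a sanity check of what the database itself declares (a sketch, assuming the same test.sqlite file):

import sqlite3

con = sqlite3.connect('test.sqlite')
# Columns of table_info: (cid, name, declared_type, notnull, default, pk).
print(con.execute('PRAGMA table_info(issn)').fetchall())
# Expected output, give or take:
# [(0, 'modified_issn', 'INTEGER', 0, None, 1), (1, 'codes', 'STRING', 0, None, 0)]
con.close()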
Answer:
In the SQLite table definitions, use TEXT instead of the unsupported STRING, and you're good :-) SQLite accepts STRING but doesn't treat it as a real type name, and the JDBC driver apparently reports such columns with type NULL, which is exactly what Spark's JdbcUtils.getCatalystType chokes on.
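If the cache can't simply be regenerated, a minimal migration sketch for an existing file (table and column names taken from the MWE; the _fixed suffix is a hypothetical temporary name):

import sqlite3

con = sqlite3.connect('test.sqlite')
for mode in ('issn', 'isbn'):
    # Rebuild each table with a proper TEXT declaration, then swap it in.
    con.execute('CREATE TABLE %s_fixed(modified_%s INTEGER PRIMARY KEY,'
                ' codes TEXT)' % (mode, mode))
    con.execute('INSERT INTO %s_fixed SELECT * FROM %s' % (mode, mode))
    con.execute('DROP TABLE %s' % mode)
    con.execute('ALTER TABLE %s_fixed RENAME TO %s' % (mode, mode))
con.commit()
con.close()

With TEXT in place, the schema resolves cleanly: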
(spark.read.format('jdbc')
 .options(driver='org.sqlite.JDBC', dbtable='sqlite_master',
          url='jdbc:sqlite:sdxml.sqlite').load()
 .show(truncate=False))

+-----+----+--------+--------+----------------------------------------------------------------+
|type |name|tbl_name|rootpage|sql                                                             |
+-----+----+--------+--------+----------------------------------------------------------------+
|table|issn|issn    |2       |CREATE TABLE issn(modified_issn INTEGER PRIMARY KEY, codes TEXT)|
|table|isbn|isbn    |3       |CREATE TABLE isbn(modified_isbn INTEGER PRIMARY KEY, codes TEXT)|
+-----+----+--------+--------+----------------------------------------------------------------+
(spark.read.format('jdbc')
 .options(driver='org.sqlite.JDBC', dbtable='issn',
          url='jdbc:sqlite:sdxml.sqlite').load()
 .show())

+--------------+-----+
| modified_issn|codes|
+--------------+-----+
|10000000000000|  abc|
+--------------+-----+
(spark.read.format('jdbc')
 .options(driver='org.sqlite.JDBC', dbtable='isbn',
          url='jdbc:sqlite:sdxml.sqlite').load()
 .show())

+--------------+-----+
| modified_isbn|codes|
+--------------+-----+
|10000000000000|  def|
+--------------+-----+
P.S.
customSchema does control the schema; however, it does not prevent the error for an unknown type at the source. I'm guessing that the JDBC driver has a mapping table for the data types, and once it fails to find the data type of a source column, it throws an exception.
driver = 'org.sqlite.JDBC'
query = 'select modified_issn from issn'
url = 'jdbc:sqlite:sdxml.sqlite'

spark.read.format('jdbc').options(customSchema='modified_issn tinyint', driver=driver, query=query, url=url).load().printSchema()
spark.read.format('jdbc').options(customSchema='modified_issn smallint', driver=driver, query=query, url=url).load().printSchema()
spark.read.format('jdbc').options(customSchema='modified_issn int', driver=driver, query=query, url=url).load().printSchema()
spark.read.format('jdbc').options(customSchema='modified_issn bigint', driver=driver, query=query, url=url).load().printSchema()
spark.read.format('jdbc').options(customSchema='modified_issn string', driver=driver, query=query, url=url).load().printSchema()
root
|-- modified_issn: byte (nullable = true)
root
|-- modified_issn: short (nullable = true)
root
|-- modified_issn: integer (nullable = true)
root
|-- modified_issn: long (nullable = true)
root
|-- modified_issn: string (nullable = true)