I have a list of records, many of which are duplicates, that I'm trying to "UPSERT" into a SQLite3 database using SQLAlchemy Core 1.4.35 and Python 3.9.2.
If the module tries to INSERT a record that doesn't already exist in the table, I want the INSERT to succeed, and I want to return the primary key that was created for that record.
But if the module tries to INSERT a record that already exists, then I want the INSERT to fail. And I want to SELECT and return the ID of the existing record.
I run into two problems when I run the code below and I can't figure out what's causing either problem.
If I remove all the code indented under the except IntegrityError as err line, the module finishes successfully, but the database table ends up with a bunch of duplicate records -- despite the UNIQUE constraint on the table. If I leave that code in place and try to SELECT the ID of the existing record while handling the exception, I get the errors below.
What is causing these two problems?
What can I do to prevent duplicate records from being INSERTed, and also SELECT the ID of the existing record?
I'm still learning SQL so I may be missing something obvious here.
Possibly relevant update
All the duplicate records I checked have one thing in common, which is that one of the fields contains a NULL. Does SQLite not recognize NULLs when it checks for UNIQUE table constraints?
Input data and table contents
The spreadsheet linked below contains the sample data I used in this test, and the contents of the DB table after running the code below. The records highlighted in yellow are duplicated records in the table.
The test database
BEGIN TRANSACTION;
CREATE TABLE IF NOT EXISTS "Containers" (
"ContainerID" INTEGER NOT NULL UNIQUE,
"ContainerName" TEXT,
"Qty" INTEGER,
"Size" INTEGER,
"Unit" TEXT,
"ClientOwned" TEXT,
"VendorOwned" TEXT,
PRIMARY KEY("ContainerID"),
UNIQUE("ContainerName","Qty","Size","Unit","ClientOwned","VendorOwned")
);
COMMIT;
The test Python module
# Built-In
import datetime
import sys
from timeit import default_timer as timer
# Third-Party
import pandas as pd
import pprint
import sqlalchemy
from loguru import logger
from sqlalchemy import create_engine, MetaData, Table, select
from sqlalchemy.exc import IntegrityError
db_file = 'upsert_bug_test.db'
data_file = 'container_sample_data.xlsx'
pp = pprint.PrettyPrinter(indent=3)
class TestDatabase:

    def __init__(self):
        self.engine = create_engine(f'sqlite:///{db_file}')
        self.conn = self.engine.connect()
        self.metadata_obj = MetaData()
        self._reflect_db()

    def _reflect_db(self):
        """Get the schema for the tables and views that already exist in the database."""
        self.db_containers = Table('Containers', self.metadata_obj, autoload_with=self.engine)
# Dictionary to map the fields in the data file to fields in the database
# Mapping key = database table
# Mapping value = list of field dictionaries
# Field key = name of field in report
# Field value = name of field in database table
mapping = {
    'Containers': [
        {'Container': 'ContainerName'},
        {'Qty': 'Qty'},
        {'Size': 'Size'},
        {'Unit': 'Unit'},
        {'Owned by Client': 'ClientOwned'},
        {'Owned by Vendor': 'VendorOwned'},
    ],
}
def build_insert_dict(record: dict, section_mapping: list):
    '''Using the mapping dictionary, build and return a dictionary of parameters that will be
    passed to insert() and select().'''
    return_data = {}
    for pair in section_mapping:
        for report_field in pair:
            report_value = record.get(report_field)
            db_field = pair.get(report_field)
            return_data.update({db_field: report_value})
    return return_data
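# For illustration (values borrowed from the traceback parameters further below),
# a spreadsheet row such as
#   {'Container': 'Dumpster (FEL/REL)', 'Qty': 1, 'Size': 2.0, 'Unit': 'cu. Yd.',
#    'Owned by Client': 'None', 'Owned by Vendor': 'Dumpster (FEL/REL)'}
# comes back from build_insert_dict() re-keyed with the database column names:
#   {'ContainerName': 'Dumpster (FEL/REL)', 'Qty': 1, 'Size': 2.0, 'Unit': 'cu. Yd.',
#    'ClientOwned': 'None', 'VendorOwned': 'Dumpster (FEL/REL)'}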
if __name__ == '__main__':
    # Start the timer
    start = timer()

    # Create an instance of the database object
    db = TestDatabase()

    # Load the data from the sample file
    logger.info('Loading records from sample file...')
    data_df = pd.read_excel(pd.ExcelFile(data_file), 'Sheet1')
    new_data_df = data_df.astype(object).where(data_df.notna(), None)
    data = new_data_df.to_dict('records')

    # Insert each record from the data file into the database. If a record already exists,
    # get the ID for that record and return it.
    for record in data:
        container_data = build_insert_dict(record, mapping.get('Containers'))
        try:
            ins = db.db_containers.insert().values(**container_data)
            res = db.conn.execute(ins)
            container_id = res.inserted_primary_key[0]
        except IntegrityError as err:
            logger.info(f'Record already exists in the database. Attempting to find the ID for that record...')
            sel = select(db.db_containers.c.ContainerID).filter_by(**container_data)
            res = db.conn.execute(sel)
            old_container_id = res.fetchone()
            if not old_container_id:
                logger.error('Failed to get previously inserted container id! Investigate')
                exit(-1)
            else:
                container_id = old_container_id[0]
                logger.info(f'Found ID {container_id}')

    stop = timer()
    done = stop - start
    migration_time = datetime.timedelta(seconds=done)
    logger.success(f'Inserted all unique records into the database in {migration_time}')
The SELECT error
(oaktier-env) PS C:\Oaktier\oaktier\tasks> python .\upsert_test.py
2022-04-21 11:27:26.670 | INFO | __main__:<module>:76 - Loading records from sample file...
2022-04-21 11:27:27.205 | INFO | __main__:<module>:91 - Record already exists in the database. Attempting to find the ID for that record...
2022-04-21 11:27:27.210 | INFO | __main__:<module>:100 - Found ID 1
2022-04-21 11:27:27.211 | INFO | __main__:<module>:91 - Record already exists in the database. Attempting to find the ID for that record...
2022-04-21 11:27:27.215 | INFO | __main__:<module>:100 - Found ID 2
2022-04-21 11:27:27.216 | INFO | __main__:<module>:91 - Record already exists in the database. Attempting to find the ID for that record...
Traceback (most recent call last):
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1819, in _execute_context
self.dialect.do_execute(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\default.py", line 732, in do_execute
cursor.execute(statement, parameters)
sqlite3.IntegrityError: UNIQUE constraint failed: Containers.ContainerName, Containers.Qty, Containers.Size, Containers.Unit, Containers.ClientOwned, Containers.VendorOwned
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\Oaktier\oaktier\tasks\upsert_test.py", line 88, in <module>
res = db.conn.execute(ins)
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1306, in execute
return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
File "C:\oaktier-env\lib\site-packages\sqlalchemy\sql\elements.py", line 325, in _execute_on_connection
return connection._execute_clauseelement(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1498, in _execute_clauseelement
ret = self._execute_context(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1862, in _execute_context
self._handle_dbapi_exception(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 2043, in _handle_dbapi_exception
util.raise_(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\util\compat.py", line 207, in raise_
raise exception
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1819, in _execute_context
self.dialect.do_execute(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\default.py", line 732, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.IntegrityError: (sqlite3.IntegrityError) UNIQUE constraint failed: Containers.ContainerName, Containers.Qty, Containers.Size, Containers.Unit, Containers.ClientOwned, Containers.VendorOwned
[SQL: INSERT INTO "Containers" ("ContainerName", "Qty", "Size", "Unit", "ClientOwned", "VendorOwned") VALUES (?, ?, ?, ?, ?, ?)]
[parameters: ('Dumpster (FEL/REL)', 1, 2.0, 'cu. Yd.', 'None', 'Dumpster (FEL/REL)')]
(Background on this error at: https://sqlalche.me/e/14/gkpj)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1819, in _execute_context
self.dialect.do_execute(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\default.py", line 732, in do_execute
cursor.execute(statement, parameters)
sqlite3.InterfaceError: Error binding parameter 0 - probably unsupported type.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\Oaktier\oaktier\tasks\upsert_test.py", line 93, in <module>
res = db.conn.execute(sel)
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1306, in execute
return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
File "C:\oaktier-env\lib\site-packages\sqlalchemy\sql\elements.py", line 325, in _execute_on_connection
return connection._execute_clauseelement(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1498, in _execute_clauseelement
ret = self._execute_context(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1862, in _execute_context
self._handle_dbapi_exception(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 2043, in _handle_dbapi_exception
util.raise_(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\util\compat.py", line 207, in raise_
raise exception
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1819, in _execute_context
self.dialect.do_execute(
File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\default.py", line 732, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.InterfaceError: (sqlite3.InterfaceError) Error binding parameter 0 - probably unsupported type.
[SQL: SELECT "Containers"."ContainerID"
FROM "Containers"
WHERE "Containers"."ContainerName" = ? AND "Containers"."Qty" = ? AND "Containers"."Size" = ? AND "Containers"."Unit" = ? AND "Containers"."ClientOwned" = ? AND "Containers"."VendorOwned" = ?]
[parameters: ('Dumpster (FEL/REL)', 1, 2.0, 'cu. Yd.', 'None', 'Dumpster (FEL/REL)')]
(Background on this error at: https://sqlalche.me/e/14/rvf5)
CodePudding user response:
According to the SQLite docs on the UNIQUE constraint:
For the purposes of UNIQUE constraints, NULL values are considered distinct from all other values, including other NULLs.
Therefore, it is not that NULLs are ignored; rather, each NULL is treated as a distinct value. This mirrors how NaN in pandas/NumPy does not compare equal to itself. So, to SQLite, your highlighted rows containing NULLs are not duplicate rows but distinct, unique rows. For empty-string values, there may also be whitespace issues.
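To see the behavior in isolation, here is a minimal sketch using only the standard-library sqlite3 module and a throwaway in-memory table (not your actual schema):

import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE t (a TEXT, b TEXT, UNIQUE (a, b))')

# Two rows that look identical, but b is NULL in both -- SQLite treats each
# NULL as distinct, so the UNIQUE constraint does not reject the second row.
conn.execute("INSERT INTO t VALUES ('x', NULL)")
conn.execute("INSERT INTO t VALUES ('x', NULL)")
print(conn.execute('SELECT COUNT(*) FROM t').fetchone())  # (2,)

# With non-NULL values the constraint fires as expected.
conn.execute("INSERT INTO t VALUES ('x', 'y')")
try:
    conn.execute("INSERT INTO t VALUES ('x', 'y')")
except sqlite3.IntegrityError as err:
    print(err)  # UNIQUE constraint failed: t.a, t.b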
As a workaround, consider filling in a non-None placeholder such as an empty string, and then replacing it with NULL after the inserts. Also, avoid converting the entire data frame to a single type; instead, handle each column explicitly as needed.
with pd.ExcelFile(data_file) as wb:
    raw_df = pd.read_excel(wb, 'Sheet1')

# CLEAN EACH STRING COLUMN
# (adjust these column names if your spreadsheet uses different headers,
#  e.g. 'Container' and 'Owned by Client' per the mapping in the question)
clean_df = (
    raw_df.assign(
        ContainerName = lambda x: x["ContainerName"].astype("string").str.strip(),
        Unit = lambda x: x["Unit"].astype("string").str.strip(),
        ClientOwned = lambda x: x["ClientOwned"].astype("string").str.strip(),
        VendorOwned = lambda x: x["VendorOwned"].astype("string").str.strip()
    ).where(raw_df.notna(), "")
)

db_data = clean_df.to_dict('records')
Then, after the inserts, translate the SQL below to SQLAlchemy syntax and run it to restore the NULLs:
UPDATE Containers SET ContainerName = NULL WHERE ContainerName = '';
UPDATE Containers SET Unit = NULL WHERE Unit = '';
UPDATE Containers SET ClientOwned = NULL WHERE ClientOwned = '';
UPDATE Containers SET VendorOwned = NULL WHERE VendorOwned = '';
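For example, a rough SQLAlchemy Core equivalent (a sketch only; it assumes the reflected db.db_containers table and db.conn connection from the question's module):

from sqlalchemy import update

containers = db.db_containers
for col_name in ('ContainerName', 'Unit', 'ClientOwned', 'VendorOwned'):
    column = containers.c[col_name]
    # Turn the '' placeholder back into a real NULL in each string column.
    db.conn.execute(
        update(containers).where(column == '').values({col_name: None})
    )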