How do I correctly perform an UPSERT on a SQLite3 DB using SQLAlchemy Core?

Time:04-22

I have a list of records, many of which are duplicates, that I'm trying to "UPSERT" into a SQLite3 database using SQLAlchemy Core 1.4.35 and Python 3.9.2.

If the module tries to INSERT a record that doesn't already exist in the table, I want the INSERT to succeed, and I want to return the primary key that was created for that record.

But if the module tries to INSERT a record that already exists, then I want the INSERT to fail. And I want to SELECT and return the ID of the existing record.

I run into two problems when I run the code below and I can't figure out what's causing either problem.

  1. If I remove all the code indented under the except IntegrityError as err line, then the module finishes successfully. But the database table has a bunch of duplicate records -- despite the UNIQUE constraint on the table.

  2. If I leave that code in place, and try to SELECT the ID of the existing record when handling the exception -- I get the errors below.

What is causing these two problems?

What can I do to prevent duplicate records from being INSERTed, and also SELECT the ID of the existing record?

I'm still learning SQL so I may be missing something obvious here.


Possibly relevant update

All the duplicate records I checked have one thing in common, which is that one of the fields contains a NULL. Does SQLite not recognize NULLs when it checks for UNIQUE table constraints?

Input data and table contents

The spreadsheet linked below contains the sample data I used in this test, and the contents of the DB table after running the code below. The records highlighted in yellow are duplicated records in the table.

https://docs.google.com/spreadsheets/d/1dS75vmzzNAqGShqakRwN8FUkL7RugiTo/edit?usp=sharing&ouid=102857691407472073826&rtpof=true&sd=true


The test database

BEGIN TRANSACTION;
CREATE TABLE IF NOT EXISTS "Containers" (
    "ContainerID"   INTEGER NOT NULL UNIQUE,
    "ContainerName" TEXT,
    "Qty"   INTEGER,
    "Size"  INTEGER,
    "Unit"  TEXT,
    "ClientOwned"   TEXT,
    "VendorOwned"   TEXT,
    PRIMARY KEY("ContainerID"),
    UNIQUE("ContainerName","Qty","Size","Unit","ClientOwned","VendorOwned")
);
COMMIT;

The test Python module

# Built-In
import datetime
import sys
from timeit import default_timer as timer

# Third-Party
import pandas as pd
import pprint
import sqlalchemy
from loguru import logger
from sqlalchemy import create_engine, MetaData, Table, select
from sqlalchemy.exc import IntegrityError

db_file = 'upsert_bug_test.db'
data_file = 'container_sample_data.xlsx'

pp = pprint.PrettyPrinter(indent=3)

class TestDatabase:
    def __init__(self):
        self.engine = create_engine(f'sqlite:///{db_file}')
        self.conn = self.engine.connect()
        self.metadata_obj = MetaData()

        self._reflect_db()

    def _reflect_db(self):
        """Get the schema for the tables and views that already exist in the database."""

        self.db_containers = Table('Containers', self.metadata_obj, autoload_with=self.engine)


# Dictionary to map the fields in the data file to fields in the database
# Mapping key = database table
# Mapping value = list of field dictionaries
#       Field key = name of field in report
#       Field value = name of field in database table
mapping = {
    'Containers': [
        {'Container': 'ContainerName'},
        {'Qty': 'Qty'}, 
        {'Size': 'Size'},
        {'Unit': 'Unit'}, 
        {'Owned by Client': 'ClientOwned'}, 
        {'Owned by Vendor': 'VendorOwned'},
    ],
}


def build_insert_dict(record: dict, section_mapping: list):
    '''Using the mapping dictionary, build and return a dictionary of parameters that will be
    passed to insert() and select().'''

    return_data = {}

    for pair in section_mapping:
        for report_field in pair:
            report_value = record.get(report_field)
            db_field = pair.get(report_field)
            return_data.update({db_field: report_value})

    return return_data


if __name__ == '__main__':
    # Start the timer
    start = timer()

    # Create an instance of the database object
    db = TestDatabase()

    # Load the data from the sample file
    logger.info('Loading records from sample file...')

    data_df = pd.read_excel(pd.ExcelFile(data_file), 'Sheet1')
    new_data_df = data_df.astype(object).where(data_df.notna(), None)
    data = new_data_df.to_dict('records')

    # Insert each record from the data file into the database. If a record already exists,
    # get the ID for that record and return it.
    for record in data:
        container_data = build_insert_dict(record, mapping.get('Containers'))
        try:
            ins = db.db_containers.insert().values(**container_data)
            res = db.conn.execute(ins)
            container_id = res.inserted_primary_key[0]
        except IntegrityError as err:
            logger.info(f'Record already exists in the database. Attempting to find the ID for that record...')
            sel = select(db.db_containers.c.ContainerID).filter_by(**container_data)
            res = db.conn.execute(sel)
            old_container_id = res.fetchone()
            if not old_container_id:
                logger.error('Failed to get previously inserted container id! Investigate')
                exit(-1)
            else:
                container_id = old_container_id[0]
                logger.info(f'Found ID {container_id}')

    stop = timer()
    done = stop - start
    migration_time = datetime.timedelta(seconds=done)

    logger.success(f'Inserted all unique records into the database in {migration_time}')

The SELECT error

(oaktier-env) PS C:\Oaktier\oaktier\tasks> python .\upsert_test.py
2022-04-21 11:27:26.670 | INFO     | __main__:<module>:76 - Loading records from sample file...
2022-04-21 11:27:27.205 | INFO     | __main__:<module>:91 - Record already exists in the database. Attempting to find the ID for that record...
2022-04-21 11:27:27.210 | INFO     | __main__:<module>:100 - Found ID 1
2022-04-21 11:27:27.211 | INFO     | __main__:<module>:91 - Record already exists in the database. Attempting to find the ID for that record...
2022-04-21 11:27:27.215 | INFO     | __main__:<module>:100 - Found ID 2
2022-04-21 11:27:27.216 | INFO     | __main__:<module>:91 - Record already exists in the database. Attempting to find the ID for that record...
Traceback (most recent call last):
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1819, in _execute_context
    self.dialect.do_execute(
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\default.py", line 732, in do_execute
    cursor.execute(statement, parameters)
sqlite3.IntegrityError: UNIQUE constraint failed: Containers.ContainerName, Containers.Qty, Containers.Size, Containers.Unit, Containers.ClientOwned, Containers.VendorOwned

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Oaktier\oaktier\tasks\upsert_test.py", line 88, in <module>
    res = db.conn.execute(ins)
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1306, in execute
    return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\sql\elements.py", line 325, in _execute_on_connection
    return connection._execute_clauseelement(
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1498, in _execute_clauseelement
    ret = self._execute_context(
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1862, in _execute_context
    self._handle_dbapi_exception(
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 2043, in _handle_dbapi_exception
    util.raise_(
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\util\compat.py", line 207, in raise_
    raise exception
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1819, in _execute_context
    self.dialect.do_execute(
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\default.py", line 732, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.IntegrityError: (sqlite3.IntegrityError) UNIQUE constraint failed: Containers.ContainerName, Containers.Qty, Containers.Size, Containers.Unit, Containers.ClientOwned, Containers.VendorOwned
[SQL: INSERT INTO "Containers" ("ContainerName", "Qty", "Size", "Unit", "ClientOwned", "VendorOwned") VALUES (?, ?, ?, ?, ?, ?)]
[parameters: ('Dumpster (FEL/REL)', 1, 2.0, 'cu. Yd.', 'None', 'Dumpster (FEL/REL)')]
(Background on this error at: https://sqlalche.me/e/14/gkpj)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1819, in _execute_context
    self.dialect.do_execute(
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\default.py", line 732, in do_execute
    cursor.execute(statement, parameters)
sqlite3.InterfaceError: Error binding parameter 0 - probably unsupported type.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Oaktier\oaktier\tasks\upsert_test.py", line 93, in <module>
    res = db.conn.execute(sel)
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1306, in execute
    return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\sql\elements.py", line 325, in _execute_on_connection
    return connection._execute_clauseelement(
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1498, in _execute_clauseelement
    ret = self._execute_context(
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1862, in _execute_context
    self._handle_dbapi_exception(
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 2043, in _handle_dbapi_exception
    util.raise_(
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\util\compat.py", line 207, in raise_
    raise exception
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\base.py", line 1819, in _execute_context
    self.dialect.do_execute(
  File "C:\oaktier-env\lib\site-packages\sqlalchemy\engine\default.py", line 732, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.InterfaceError: (sqlite3.InterfaceError) Error binding parameter 0 - probably unsupported type.
[SQL: SELECT "Containers"."ContainerID"
FROM "Containers"
WHERE "Containers"."ContainerName" = ? AND "Containers"."Qty" = ? AND "Containers"."Size" = ? AND "Containers"."Unit" = ? AND "Containers"."ClientOwned" = ? AND "Containers"."VendorOwned" = ?]
[parameters: ('Dumpster (FEL/REL)', 1, 2.0, 'cu. Yd.', 'None', 'Dumpster (FEL/REL)')]
(Background on this error at: https://sqlalche.me/e/14/rvf5)

CodePudding user response:

According to SQLite docs on the UNIQUE constraint:

For the purposes of UNIQUE constraints, NULL values are considered distinct from all other values, including other NULLs.

Therefore, NULLs are not ignored; each one is treated as a distinct value. This is similar to how NaN values in pandas/NumPy do not compare equal to each other (Python's None, by contrast, does equal itself, but it is stored as NULL once it reaches SQLite). So, to SQLite, your highlighted rows with NULLs are not duplicates but unique rows. For the empty-string values, there may also be whitespace issues.
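The behavior is easy to reproduce with the standard-library sqlite3 module. This is a minimal sketch on a throwaway in-memory table; the table `t` and its columns `a` and `b` are made up for the demonstration and are not part of the question's schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE t (id INTEGER PRIMARY KEY, a TEXT, b TEXT, UNIQUE (a, b))"
)

# Two rows that are identical except that b is NULL in both:
# SQLite treats the NULLs as distinct, so both inserts are accepted.
conn.execute("INSERT INTO t (a, b) VALUES ('x', NULL)")
conn.execute("INSERT INTO t (a, b) VALUES ('x', NULL)")
null_rows = conn.execute("SELECT COUNT(*) FROM t WHERE b IS NULL").fetchone()[0]

# With an empty string instead of NULL, the second insert violates UNIQUE.
conn.execute("INSERT INTO t (a, b) VALUES ('x', '')")
try:
    conn.execute("INSERT INTO t (a, b) VALUES ('x', '')")
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True

print(null_rows, duplicate_rejected)
```

The two NULL rows correspond to the "duplicates" highlighted in the spreadsheet, while the empty-string rows behave the way the UNIQUE constraint was expected to.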

As a workaround, fill missing values with a non-NULL placeholder such as an empty string before inserting, then replace the placeholder with NULL after the inserts are done. Rather than casting the entire data frame to one type, handle each column explicitly as needed.

import pandas as pd

with pd.ExcelFile(data_file) as wb:
    raw_df = pd.read_excel(wb, 'Sheet1')

# CLEAN EACH STRING COLUMN
# (spreadsheet column names, i.e. the keys in the question's mapping dictionary)
string_cols = ["Container", "Unit", "Owned by Client", "Owned by Vendor"]
clean_df = raw_df.copy()
for col in string_cols:
    clean_df[col] = clean_df[col].astype("string").str.strip()

# REPLACE MISSING VALUES WITH AN EMPTY-STRING PLACEHOLDER
clean_df = clean_df.astype(object).where(clean_df.notna(), "")

db_data = clean_df.to_dict('records')
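With placeholders in place of NULL, the insert-or-look-up loop from the question can be sketched as below. This is a minimal sketch under stated assumptions, not the asker's module: the in-memory engine, the cut-down two-column table, the sample values, and the helper name `get_or_create_id` are all hypothetical and only illustrate the pattern.

```python
from sqlalchemy import (
    Column, Integer, MetaData, Table, Text, UniqueConstraint,
    create_engine, select,
)
from sqlalchemy.exc import IntegrityError

# Assumption: a throwaway in-memory database with a reduced "Containers" table.
engine = create_engine("sqlite://")
metadata = MetaData()
containers = Table(
    "Containers", metadata,
    Column("ContainerID", Integer, primary_key=True),
    Column("ContainerName", Text),
    Column("Unit", Text),
    UniqueConstraint("ContainerName", "Unit"),
)
metadata.create_all(engine)


def get_or_create_id(conn, values):
    """Insert the row and return its new ID, or return the existing row's ID."""
    try:
        res = conn.execute(containers.insert().values(**values))
        return res.inserted_primary_key[0]
    except IntegrityError:
        # The UNIQUE constraint fired, so an identical row already exists.
        conditions = [containers.c[name] == value for name, value in values.items()]
        return conn.execute(
            select(containers.c.ContainerID).where(*conditions)
        ).scalar_one()


with engine.begin() as conn:
    # "" is the placeholder for a missing Unit, so the duplicate is detected.
    first_id = get_or_create_id(conn, {"ContainerName": "Dumpster", "Unit": ""})
    repeat_id = get_or_create_id(conn, {"ContainerName": "Dumpster", "Unit": ""})
    other_id = get_or_create_id(conn, {"ContainerName": "Cart", "Unit": "gal"})
```

Because the placeholder is an ordinary value, the second call hits the UNIQUE constraint and the lookup finds the row inserted by the first call.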

Then translate the statements below to SQLAlchemy syntax and run them after the inserts to turn the placeholders back into NULL:

UPDATE Containers SET ContainerName = NULL WHERE ContainerName = '';
UPDATE Containers SET Unit = NULL WHERE Unit = '';
UPDATE Containers SET ClientOwned = NULL WHERE ClientOwned = '';
UPDATE Containers SET VendorOwned = NULL WHERE VendorOwned = '';
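One possible SQLAlchemy Core translation of these UPDATE statements is sketched below. It assumes an in-memory database with the table defined inline and one made-up sample row; in the question's module the reflected db.db_containers table and db.conn connection would take their place.

```python
from sqlalchemy import (
    Column, Integer, MetaData, Table, Text, create_engine, select, update,
)

# Assumption: a throwaway in-memory database holding only the text columns.
engine = create_engine("sqlite://")
metadata = MetaData()
containers = Table(
    "Containers", metadata,
    Column("ContainerID", Integer, primary_key=True),
    Column("ContainerName", Text),
    Column("Unit", Text),
    Column("ClientOwned", Text),
    Column("VendorOwned", Text),
)
metadata.create_all(engine)

text_columns = ["ContainerName", "Unit", "ClientOwned", "VendorOwned"]

with engine.begin() as conn:
    # Made-up sample row that uses "" as the placeholder for missing values.
    conn.execute(
        containers.insert().values(
            ContainerName="Dumpster", Unit="", ClientOwned="", VendorOwned="Dumpster"
        )
    )

    # One UPDATE per column: SET <column> = NULL WHERE <column> = ''
    for name in text_columns:
        column = containers.c[name]
        conn.execute(update(containers).where(column == "").values({name: None}))

    row = tuple(
        conn.execute(
            select(
                containers.c.Unit,
                containers.c.ClientOwned,
                containers.c.VendorOwned,
            )
        ).one()
    )
```

Passing None in values() binds NULL, so each loop iteration corresponds to one of the four UPDATE statements above.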