SQLAlchemy bulk insert statement in Postgres database throws AttributeError


I am trying to bulk-insert rows into a Postgres database with Python SQLAlchemy using an insert statement. I need the insert statement rather than bulk_insert_mappings because I want to silently ignore failed insertions of duplicate entries. (This requirement was not apparent before; I have added it now.)

The table is created as it should be. However, even a very simple insert operation via the statement API throws this error:

AttributeError: '_NoResultMetaData' object has no attribute '_indexes_for_keys'

Minimal Reproducible Example:

import os

import sqlalchemy
from sqlalchemy import (
    Column,
    INTEGER,
    TEXT
)
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()


class Test(Base):
    __tablename__ = 'test'
    id = Column(INTEGER, primary_key=True)
    data = Column(TEXT)


engine = sqlalchemy.create_engine(os.environ['DATABASE_CONNECTION'])
Session = sessionmaker(engine)

Base.metadata.create_all(engine, Base.metadata.tables.values(), checkfirst=True)

connection = engine.connect()
buffer = [
    {
        'data': "First test"
    },
    {
        'data': "Second test"
    }
]

insert_statement = insert(Test).values(buffer)
# Using insert statement instead of bulk_insert_mappings so I can do nothing when adding duplicate entries
insert_or_do_nothing = insert_statement.on_conflict_do_nothing(index_elements=[Test.id])
orm_statement = sqlalchemy.select(Test).from_statement(insert_or_do_nothing)

with Session() as session:
    session.execute(orm_statement).scalars()

connection.close()

Full stacktrace:

Traceback (most recent call last):
  File "/project/path/test.py", line 41, in <module>
    session.execute(orm_statement).scalars()
  File "/venv/path/sqlalchemy/orm/session.py", line 1715, in execute
    result = compile_state_cls.orm_setup_cursor_result(
  File "/venv/path/sqlalchemy/orm/context.py", line 354, in orm_setup_cursor_result
    return loading.instances(result, querycontext)
  File "/venv/path/sqlalchemy/orm/loading.py", line 89, in instances
    cursor.close()
  File "/venv/path/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/venv/path/sqlalchemy/util/compat.py", line 208, in raise_
    raise exception
  File "/venv/path/sqlalchemy/orm/loading.py", line 69, in instances
    *[
  File "/venv/path/sqlalchemy/orm/loading.py", line 70, in <listcomp>
    query_entity.row_processor(context, cursor)
  File "/venv/path/sqlalchemy/orm/context.py", line 2627, in row_processor
    _instance = loading._instance_processor(
  File "/venv/path/sqlalchemy/orm/loading.py", line 715, in _instance_processor
    primary_key_getter = result._tuple_getter(pk_cols)
  File "/venv/path/sqlalchemy/engine/result.py", line 934, in _tuple_getter
    return self._metadata._row_as_tuple_getter(keys)
  File "/venv/path/sqlalchemy/engine/result.py", line 106, in _row_as_tuple_getter
    indexes = self._indexes_for_keys(keys)
AttributeError: '_NoResultMetaData' object has no attribute '_indexes_for_keys'

Am I misusing the statement interface? The ORM statement looks fine:

INSERT INTO test (data) VALUES (:data_m0), (:data_m1)

I am using

  • PostgreSQL 14.4
  • psycopg2-binary 2.9.3
  • SQLAlchemy 1.4.39

CodePudding user response:

Looking at the docs, you could try using session.bulk_insert_mappings().

buffer = [
    {
        'data': "First test"
    },
    {
        'data': "Second test"
    }
]


with Session() as session:
    session.bulk_insert_mappings(Test, buffer)
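For reference, here is that suggestion as a runnable sketch. It assumes an in-memory SQLite database in place of Postgres so it can run standalone; note also that bulk_insert_mappings() has no ON CONFLICT handling, which is why the asker ruled it out.

```python
import sqlalchemy
from sqlalchemy import Column, INTEGER, TEXT
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Test(Base):
    __tablename__ = 'test'
    id = Column(INTEGER, primary_key=True)
    data = Column(TEXT)


# In-memory SQLite stands in for Postgres so the sketch runs standalone.
engine = sqlalchemy.create_engine('sqlite://')
Base.metadata.create_all(engine)
Session = sessionmaker(engine)

buffer = [
    {'data': "First test"},
    {'data': "Second test"},
]

with Session() as session:
    session.bulk_insert_mappings(Test, buffer)
    session.commit()

with Session() as session:
    inserted = session.query(Test).count()
print(inserted)  # 2
```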

CodePudding user response:

I found a solution that still uses an insert statement: avoid the ORM statement wrapper. For some reason, plain Core statements do the job, whilst ORM ones throw the AttributeError.

This is confusing, as the official documentation recommends the ORM statement approach:

# THIS APPROACH DID NOT WORK FOR ME

stmt = stmt.on_conflict_do_update(
    index_elements=[User.name], set_=dict(fullname=stmt.excluded.fullname)
).returning(User)

orm_stmt = (
    select(User)
    .from_statement(stmt)
    .execution_options(populate_existing=True)
)
for user in session.execute(
    orm_stmt,
).scalars():
    print("inserted or updated: %s" % user)

But if you omit the ORM statement part, everything works:

# THIS WORKS

insert_statement = insert(Test).values(buffer)
insert_or_do_nothing = insert_statement.on_conflict_do_nothing(index_elements=[Test.id])

with Session() as session:
    session.execute(insert_or_do_nothing)
    session.commit()
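As a sanity check, this working pattern can be exercised end to end. The sketch below swaps in an in-memory SQLite database (whose dialect exposes the same on_conflict_do_nothing() API since SQLAlchemy 1.4) so it runs without a Postgres server, and deliberately includes a duplicate primary key to show the conflict being silently ignored:

```python
import sqlalchemy
from sqlalchemy import Column, INTEGER, TEXT
from sqlalchemy.dialects.sqlite import insert  # same on_conflict_* API as the postgresql dialect
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Test(Base):
    __tablename__ = 'test'
    id = Column(INTEGER, primary_key=True)
    data = Column(TEXT)


# In-memory SQLite stands in for Postgres so the sketch runs standalone.
engine = sqlalchemy.create_engine('sqlite://')
Base.metadata.create_all(engine)
Session = sessionmaker(engine)

# The second row deliberately reuses id=1 to trigger the conflict path.
buffer = [
    {'id': 1, 'data': "First test"},
    {'id': 1, 'data': "Duplicate, silently skipped"},
]

insert_or_do_nothing = insert(Test).values(buffer).on_conflict_do_nothing(
    index_elements=['id']
)

with Session() as session:
    # Plain Core statement, no from_statement() wrapper -- this avoids the AttributeError.
    session.execute(insert_or_do_nothing)
    session.commit()

with Session() as session:
    surviving = [row.data for row in session.query(Test).all()]
print(surviving)  # ['First test']
```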