I am trying to bulk-insert rows into a Postgres database with Python SQLAlchemy using an insert statement. I need to use the insert statement instead of bulk_insert_mappings, because I want to silently ignore failed insertions of duplicate entries.
The table is created as it should. However, even a very simple insert operation via statement API throws this error:
AttributeError: '_NoResultMetaData' object has no attribute '_indexes_for_keys'
Minimal reproducible example:
import os

import sqlalchemy
from sqlalchemy import (
    Column,
    INTEGER,
    TEXT
)
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()


class Test(Base):
    __tablename__ = 'test'
    id = Column(INTEGER, primary_key=True)
    data = Column(TEXT)


engine = sqlalchemy.create_engine(os.environ['DATABASE_CONNECTION'])
Session = sessionmaker(engine)
Base.metadata.create_all(engine, Base.metadata.tables.values(), checkfirst=True)

connection = engine.connect()

buffer = [
    {
        'data': "First test"
    },
    {
        'data': "Second test"
    }
]

insert_statement = insert(Test).values(buffer)
# Using an insert statement instead of bulk_insert_mappings so I can do nothing when adding duplicate entries
insert_or_do_nothing = insert_statement.on_conflict_do_nothing(index_elements=[Test.id])
orm_statement = sqlalchemy.select(Test).from_statement(insert_or_do_nothing)

with Session() as session:
    session.execute(orm_statement).scalars()

connection.close()
Full stacktrace:
Traceback (most recent call last):
File "/project/path/test.py", line 41, in <module>
session.execute(orm_statement).scalars()
File "/venv/path/sqlalchemy/orm/session.py", line 1715, in execute
result = compile_state_cls.orm_setup_cursor_result(
File "/venv/path/sqlalchemy/orm/context.py", line 354, in orm_setup_cursor_result
return loading.instances(result, querycontext)
File "/venv/path/sqlalchemy/orm/loading.py", line 89, in instances
cursor.close()
File "/venv/path/sqlalchemy/util/langhelpers.py", line 70, in __exit__
compat.raise_(
File "/venv/path/sqlalchemy/util/compat.py", line 208, in raise_
raise exception
File "/venv/path/sqlalchemy/orm/loading.py", line 69, in instances
*[
File "/venv/path/sqlalchemy/orm/loading.py", line 70, in <listcomp>
query_entity.row_processor(context, cursor)
File "/venv/path/sqlalchemy/orm/context.py", line 2627, in row_processor
_instance = loading._instance_processor(
File "/venv/path/sqlalchemy/orm/loading.py", line 715, in _instance_processor
primary_key_getter = result._tuple_getter(pk_cols)
File "/venv/path/sqlalchemy/engine/result.py", line 934, in _tuple_getter
return self._metadata._row_as_tuple_getter(keys)
File "/venv/path/sqlalchemy/engine/result.py", line 106, in _row_as_tuple_getter
indexes = self._indexes_for_keys(keys)
AttributeError: '_NoResultMetaData' object has no attribute '_indexes_for_keys'
Am I misusing the statement interface? The statement compiles to the SQL I expect:
INSERT INTO test (data) VALUES (:data_m0), (:data_m1)
I am using:
- PostgreSQL 14.4
- psycopg2-binary 2.9.3
- SQLAlchemy 1.4.39
CodePudding user response:
Looking at the docs, you could try to use session.bulk_insert_mappings():
buffer = [
    {
        'data': "First test"
    },
    {
        'data': "Second test"
    }
]

with Session() as session:
    session.bulk_insert_mappings(Test, buffer)
    session.commit()
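Note that bulk_insert_mappings() has no ON CONFLICT handling of its own, so if duplicates can occur you would have to filter them out before calling it. A rough sketch of that idea (assuming the rows carry the conflicting key, here id; it is not safe against concurrent writers, which is why the ON CONFLICT approach from the question is preferable):
buffer = [
    {'id': 1, 'data': "First test"},
    {'id': 2, 'data': "Second test"},
]

with Session() as session:
    # Look up which ids already exist and drop those rows from the payload.
    existing_ids = {
        row.id
        for row in session.query(Test.id).filter(
            Test.id.in_([item['id'] for item in buffer])
        )
    }
    new_rows = [item for item in buffer if item['id'] not in existing_ids]
    session.bulk_insert_mappings(Test, new_rows)
    session.commit()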
CodePudding user response:
I found a solution that uses the insert statement: avoid the ORM-enabled statement. For some reason, executing the plain statement does the job, whilst the ORM version throws the AttributeError.
This is confusing, as the official documentation calls for ORM statements:
# THIS APPROACH DID NOT WORK FOR ME
# (in the docs, stmt is an insert() construct built just above this snippet)
stmt = stmt.on_conflict_do_update(
    index_elements=[User.name], set_=dict(fullname=stmt.excluded.fullname)
).returning(User)

orm_stmt = (
    select(User)
    .from_statement(stmt)
    .execution_options(populate_existing=True)
)

for user in session.execute(
    orm_stmt,
).scalars():
    print("inserted or updated: %s" % user)
But if you omit the ORM statement part, everything works:
# THIS WORKS
insert_statement = insert(Test).values(buffer)
insert_or_do_nothing = insert_statement.on_conflict_do_nothing(index_elements=[Test.id])

with Session() as session:
    session.execute(insert_or_do_nothing)
    session.commit()
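If you also need the inserted rows back (which is what the from_statement() recipe in the docs is meant to provide), a workaround is to add returning() to the plain statement and read ordinary result rows instead of ORM objects. A minimal sketch, assuming the same Test model and buffer as above; rows skipped by ON CONFLICT DO NOTHING simply do not show up in the result:
insert_statement = insert(Test).values(buffer)
insert_or_do_nothing = insert_statement.on_conflict_do_nothing(
    index_elements=[Test.id]
).returning(Test.id, Test.data)

with Session() as session:
    result = session.execute(insert_or_do_nothing)
    for row in result:
        # Only rows that were actually inserted are returned.
        print(f"inserted: id={row.id}, data={row.data}")
    session.commit()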