Using SQLAlchemy async ORM 1.4, Postgres backend, Python 3.7
I am using an augmented Declarative Base with the SA ORM. The tables are not held in models.py but are committed directly to the database by parsing a JSON script that contains all the table schemas. Because of this, I can't import the models at the top of the script, as in from models import ThisTable.
So to work with CRUD operations on the tables, I first retrieve them by reflecting the metadata.
In the 'usual' way, when importing all the tables at the top of the script, a query like this works:
result = await s.execute(select(func.sum(TableName.column)))
curr = result.all()
When I try to reflect the table and column objects from the MetaData in order to query them, this doesn't work. I get lots of errors such as AttributeError: 'Table' object has no attribute 'func' or TypeError: 'Table' object is not callable.
def retrieve_table_obj(table):
    meta = MetaData()
    meta.reflect(bind=sync_engine)
    return meta.tables[table]

def retrieve_table_cols(self, table):
    table = retrieve_table_obj('users')
    return table.columns.keys()

async def reading(collection, modifications):
    table = db.retrieve_table_obj(collection)
    columns = db.retrieve_table_cols(collection)
    for c in columns:
        for f in modifications['fields']:
            if c in f:
                q = select(func.sum(table.c))
                result = await s.execute(q)
                curr = result.all()

asyncio.run(reading("users", {'fields': ["usage", "allowance"]}))
How can I query tables and columns in the database when they first have to be explicitly retrieved?
CodePudding user response:
The automap extension can be used to automatically reflect database tables to SQLAlchemy models. However, automap uses inspect on the engine, and this isn't supported on async engines; we can work around this by doing the automapping with a synchronous engine. Once the models have been mapped, they can be used with the async engine.
For example:
import asyncio

import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.automap import automap_base

# Reflect and map the tables once, using a synchronous engine.
sync_engine = sa.create_engine('postgresql:///test', echo=True, future=True)

Base = automap_base()
Base.prepare(sync_engine, reflect=True)


async def async_main(collection, modifications):
    engine = create_async_engine(
        "postgresql+asyncpg:///test",
        echo=True,
        future=True,
        connect_args={'ssl': False},
    )
    async_session = orm.sessionmaker(
        engine, class_=AsyncSession, future=True
    )

    async with async_session() as session:
        model = Base.classes[collection]
        # Only aggregate the requested fields that exist as mapped columns.
        matches = set(model.__mapper__.columns.keys()) & set(modifications['fields'])
        for m in matches:
            q = sa.select(sa.func.sum(getattr(model, m)))
            result = await session.execute(q)
            curr = result.all()
            for row in curr:
                print(row)
            print()

    # for AsyncEngine created in function scope, close and
    # clean-up pooled connections
    await engine.dispose()


asyncio.run(async_main("users", {'fields': ["usage", "allowance"]}))
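If you don't need models, two changes to the existing code are enough. Caching the MetaData object rather than recreating it on every call to retrieve_table_obj would make the code more efficient, and replacing select(func.sum(table.c)) with select(sa.func.sum(getattr(table.c, c))) would fix the query: table.c is the table's whole column collection, so a single column has to be looked up by name before it is passed to func.sum.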
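As a rough illustration of the model-free approach, here is a minimal sketch. It makes some assumptions that are not in the question: an in-memory SQLite database stands in for the Postgres backend so the snippet runs on its own, and get_metadata and sum_query are hypothetical helper names. The query runs synchronously here only to keep the example short; the same select object should also be usable with await session.execute(...) on an AsyncSession.

```python
from functools import lru_cache

import sqlalchemy as sa

# Assumption: in-memory SQLite stands in for the real Postgres database.
sync_engine = sa.create_engine("sqlite://")

# Create and fill a small "users" table so there is something to reflect.
with sync_engine.begin() as conn:
    conn.execute(sa.text(
        "CREATE TABLE users ("
        "id INTEGER PRIMARY KEY, usage INTEGER, allowance INTEGER)"
    ))
    conn.execute(sa.text(
        "INSERT INTO users (usage, allowance) VALUES (1, 10), (2, 20)"
    ))


@lru_cache(maxsize=None)
def get_metadata():
    """Hypothetical helper: reflect the schema once and reuse it afterwards."""
    meta = sa.MetaData()
    meta.reflect(bind=sync_engine)
    return meta


def retrieve_table_obj(name):
    # No new reflection here; the cached MetaData is reused on every call.
    return get_metadata().tables[name]


def sum_query(table_name, column_name):
    """Hypothetical helper: build SELECT sum(<column>) for a reflected table."""
    table = retrieve_table_obj(table_name)
    # table.c is the whole column collection; index it to get one Column.
    return sa.select(sa.func.sum(table.c[column_name]))


with sync_engine.connect() as conn:
    totals = {
        name: conn.execute(sum_query("users", name)).scalar()
        for name in ("usage", "allowance")
    }

print(totals)
```

Indexing with table.c[column_name] is equivalent to getattr(table.c, column_name) and tends to read more naturally when the column name comes from user input.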