SQLAlchemy Asyncio ORM Unable to Query Database When Retrieving Tables and Columns from MetaData


Using SQLAlchemy async ORM 1.4, Postgres backend, Python 3.7

I am using an augmented Declarative Base with the SA ORM. The tables are not held in models.py but are committed directly to the database by parsing a JSON script that contains all the table schemas. Because of this, I can't import the models at the top of the script like from models import ThisTable.

So to work with CRUD operations on the tables, I first retrieve them by reflecting the metadata.

In the 'usual' way, when importing all the tables at the top of the script, a query like this works:

result = await s.execute(select(func.sum(TableName.column)))
curr = result.all()

When I try to reflect the table and column objects from the MetaData in order to query them, this doesn't work. There are lots of AttributeError: 'Table' object has no attribute 'func' or TypeError: 'Table' object is not callable errors.


def retrieve_table_obj(table):
    meta = MetaData()
    meta.reflect(bind=sync_engine)
    return meta.tables[table]

def retrieve_table_cols(table):
    table = retrieve_table_obj(table)
    return table.columns.keys()

async def reading(collection, modifications):

    table = db.retrieve_table_obj(collection)
    columns = db.retrieve_table_cols(collection)
    for c in columns:
        for f in modifications['fields']:
            if c in f:
                q = select(func.sum(table.c))

    result = await s.execute(q)
    curr = result.all()

asyncio.run(reading("users", {'fields': ["usage", "allowance"]}))

How can I query tables and columns in the database when they first have to be explicitly retrieved?

CodePudding user response:

The automap extension can be used to automatically reflect database tables to SQLAlchemy models. However automap uses inspect on the engine, and this isn't supported on async engines; we can work around this by doing the automapping using a synchronous engine. Once the models have been mapped they can be used by the async engine.

For example:

import asyncio

import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.automap import automap_base


sync_engine = sa.create_engine('postgresql:///test', echo=True, future=True)

Base = automap_base()
Base.prepare(sync_engine, reflect=True)


async def async_main(collection, modifications):
    engine = create_async_engine(
        "postgresql+asyncpg:///test",
        echo=True,
        future=True,
        connect_args={'ssl': False},
    )

    async_session = orm.sessionmaker(
        engine, class_=AsyncSession, future=True
    )

    async with async_session() as session:
        model = Base.classes[collection]
        matches = set(model.__mapper__.columns.keys()) & set(modifications['fields'])
        for m in matches:
            q = sa.select(sa.func.sum(getattr(model, m)))


            result = await session.execute(q)
            curr = result.all()
            for row in curr:
                print(row)
            print()

    # for AsyncEngine created in function scope, close and
    # clean-up pooled connections
    await engine.dispose()


asyncio.run(async_main("users", {'fields': ["usage", "allowance"]}))

If you don't need models, you can make the existing code more efficient by caching the MetaData object rather than recreating it on every call to retrieve_table_obj, and by replacing select(func.sum(table.c)) with select(sa.func.sum(getattr(table.c, c))) so that an actual Column object, not the whole column collection, is passed to sum().
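A minimal sketch of that model-free approach: reflect the MetaData once at module level and look tables up from the cached object, building the sum query with getattr. An in-memory SQLite engine and a made-up users table stand in here for the real Postgres database, purely for illustration:

```python
import sqlalchemy as sa

# Illustrative stand-in for the real database: an in-memory SQLite
# engine with a small "users" table.
engine = sa.create_engine("sqlite://", future=True)
with engine.begin() as conn:
    conn.execute(sa.text("CREATE TABLE users (usage INTEGER, allowance INTEGER)"))
    conn.execute(sa.text("INSERT INTO users VALUES (2, 5), (3, 7)"))

# Reflect once and cache, instead of re-reflecting on every call.
_metadata = sa.MetaData()
_metadata.reflect(bind=engine)

def retrieve_table_obj(name):
    return _metadata.tables[name]

table = retrieve_table_obj("users")
sums = {}
for c in table.columns.keys():
    # getattr(table.c, c) yields the Column object for name c,
    # which is what func.sum() expects.
    q = sa.select(sa.func.sum(getattr(table.c, c)))
    with engine.connect() as conn:
        sums[c] = conn.execute(q).scalar()

print(sums)  # {'usage': 5, 'allowance': 12}
```

The same select construction works unchanged with an AsyncSession, since only the reflection step needs the synchronous engine.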
