from attrs import define
from sqlalchemy.orm import (
registry,
)
from sqlalchemy.sql import (
schema,
sqltypes,
)
@define(slots=False)
class Cat():
id: int
name: str
mapper_registry = registry()
cat_table = schema.Table(
"cat",
mapper_registry.metadata,
schema.Column("id", sqltypes.Integer, primary_key=True),
schema.Column("name", sqltypes.String, nullable=False),
)
mapper_registry.map_imperatively(models.Cat, cat_table)
async def main() -> None:
...
model = Cat(1, "Meow")
async with session_maker() as session:
await session.add(model) # Ok
await session.commit()
result = await session.scalar(cat_table.select())
print(result) # 1
print(type(result)) # int
session.scalar
returns int
, not Cat
It looks like it worked with query: https://github.com/cosmicpython/code/blob/master/src/allocation/adapters/repository.py#L48
I tried to get the object through scalars
, but I need to construct it myself
Am I doing something wrong or is it supposed to be like this?
CodePudding user response:
I think you want:
result = (await session.scalar(select(Cat)))
# OR
result = (await session.scalars(select(Cat))).first()
Some other examples:
async def main() -> None:
async_engine = create_async_engine(f"postgresql asyncpg://{username}:{password}@/{db}", echo=False)
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async_session = sessionmaker(async_engine, expire_on_commit=False, class_=AsyncSession)
async with async_session() as session:
cat_id = 1
cat_name = "Meow"
model = Cat(cat_id, cat_name)
session.add(model) # Ok
await session.commit()
def check(res, exp):
assert res == exp
print (res)
result = await session.scalar(cat_table.select())
check(result, 1)
result = (await session.execute(cat_table.select())).first()
check(result, (cat_id, cat_name))
result = (await session.execute(select(cat_table))).first()
check(result, (cat_id, cat_name))
result = (await session.scalars(select(Cat))).first()
check(result, model)
result = (await session.scalar(select(Cat)))
check(result, model)
result = (await session.execute(select(Cat))).scalars().first()
check(result, model)
result = (await session.execute(select(Cat))).first()
check(result, (model,))
result = (await session.execute(select(Cat))).all()
check(result, [(model,)])
result = (await session.scalars(select(Cat))).all()
check(result, [model])