Why can't I raw insert a list of dicts with SQLalchemy ?
import os
import sqlalchemy
import pandas as pd
def connect_unix_socket() -> sqlalchemy.engine:
db_user = os.environ["DB_USER"]
db_pass = os.environ["DB_PASS"]
db_name = os.environ["DB_NAME"]
unix_socket_path = os.environ["INSTANCE_UNIX_SOCKET"]
return sqlalchemy.create_engine(
sqlalchemy.engine.url.URL.create(
drivername="postgresql pg8000",
username=db_user,
password=db_pass,
database=db_name,
query={"unix_sock": f"{unix_socket_path}/.s.PGSQL.5432"},
)
)
def _insert_ecoproduct(df: pd.DataFrame) -> None:
db = connect_unix_socket()
db_matching = {
'gtin': 'ecoproduct_id',
'ITEM_NAME_AS_IN_MARKETPLACE' : 'ecoproductname',
'ITEM_WEIGHT_WITH_PACKAGE_KG' : 'ecoproductweight',
'ITEM_HEIGHT_CM' : 'ecoproductlength',
'ITEM_WIDTH_CM' : 'ecoproductwidth',
'test_gtin' : 'gtin_test',
'batteryembedded' : 'batteryembedded'
}
df = df[db_matching.keys()]
df.rename(columns=db_matching, inplace=True)
data = df.to_dict(orient='records')
sql_query = """INSERT INTO ecoproducts(
ecoproduct_id,
ecoproductname,
ecoproductweight,
ecoproductlength,
ecoproductwidth,
gtin_test,
batteryembedded)
VALUES (%(ecoproduct_id)s, %(ecoproductname)s,%(ecoproductweight)s,%(ecoproductlength)s,
%(ecoproductwidth)s,%(gtin_test)s,%(batteryembedded)s)
ON CONFLICT(ecoproduct_id) DO NOTHING;"""
with db.connect() as conn:
result = conn.exec_driver_sql(sql_query, data)
print(f"{result.rowcount} new rows were inserted.")
Is it possible to map parameters with th dialect pg8000 ? Or maybe I should use psycopg2 ?
What is the problem here ?
EDIT 1: see variable data details :
print(data)
print(type(data))
[{'ecoproduct_id': '6941487202157', 'ecoproductname': 'HUAWEI FreeBuds Pro Bluetooth sans Fil ', 'ecoproductweight': '4', 'ecoproductlength': '0.220', 'ecoproductwidth': '0.99', 'gtin_test': False, 'batteryembedded': 0}]
<class 'list'>
CodePudding user response:
Is it possible to map [named] parameters with th (sic) dialect pg8000 ?
Yes. Using a SQLAlchemy text() object allows us to use named parameters even if the underlying DBAPI does not.
import sqlalchemy as sa
sql_query = """\
INSERT INTO ecoproducts(
ecoproduct_id,
ecoproductname,
ecoproductweight,
ecoproductlength,
ecoproductwidth,
gtin_test,
batteryembedded)
VALUES (
:ecoproduct_id,
:ecoproductname,
:ecoproductweight,
:ecoproductlength,
:ecoproductwidth,
:gtin_test,
:batteryembedded)
ON CONFLICT(ecoproduct_id) DO NOTHING;
"""
result = conn.execute(sa.text(sql_query), data)