I am copying tables, together with their data, from one PostgreSQL database to another using SQLAlchemy and Python.
I looked into the answers to the question below and wrote a script based on them:
Copy one database to another using SQLAlchemy
from sqlalchemy import create_engine, MetaData, event
from sqlalchemy.sql import sqltypes
import traceback
# Connection URLs for the database being copied from and the one being copied to.
# The "options=-c search_path=public" query argument is passed through to the
# PostgreSQL driver as connection options.
SRC_DB_URL = "postgresql://user1:mypass@myip1:2025/mydb?options=-c search_path=public"
TGT_DB_URL = "postgresql://user2:mypass@myip2:2025/newdb?options=-c search_path=public"

src_engine = create_engine(SRC_DB_URL)
tgt_engine = create_engine(TGT_DB_URL)

# Each MetaData is bound to its engine, so reflect() and statement .execute()
# calls made through it need no explicit connection.
# NOTE(review): bound metadata is a SQLAlchemy 1.x feature -- confirm the
# installed version before reusing this script.
src_metadata = MetaData(bind=src_engine)
tgt_metadata = MetaData(bind=tgt_engine)
@event.listens_for(src_metadata, "column_reflect")
def genericize_datatypes(inspector, tablename, column_dict):
    """Replace a reflected column's type with its generic equivalent.

    Registered as a ``column_reflect`` listener on ``src_metadata``, so it is
    invoked for every column reflected from the source database. The entry
    ``column_dict["type"]`` is overwritten in place; nothing is returned.

    Args:
        inspector: Inspector performing the reflection (unused).
        tablename: Name of the table being reflected (unused).
        column_dict: Mutable dictionary describing the reflected column.
    """
    reflected_type = column_dict["type"]
    # allow_nulltype=True is forwarded to as_generic() unchanged.
    column_dict["type"] = reflected_type.as_generic(allow_nulltype=True)
# Number of rows sent to the target per INSERT round-trip.
BATCH_SIZE = 2500

# Reflect both databases. The target reflection is discarded again by
# clear() below, but it verifies up front that the target is reachable.
tgt_metadata.reflect()
src_metadata.reflect()

# Create the source tables on the target, in foreign-key dependency order.
for table in src_metadata.sorted_tables:
    table.create(bind=tgt_engine)

# refresh metadata before you can copy data
tgt_metadata.clear()
tgt_metadata.reflect()

# Copy all data from src to target.
# NOTE: rows are inserted with their original key values, so sequences on the
# target are NOT advanced by this loop.
for table in tgt_metadata.sorted_tables:
    src_table = src_metadata.tables.get(table.name)
    if src_table is None:
        # Table existed on the target beforehand and has no source counterpart.
        continue

    insert_stmt = table.insert()

    # Use a context manager so the counting connection goes back to the pool.
    with src_engine.connect() as src_conn:
        source_table_count = src_conn.execute(f"select count(*) from {table.name}").fetchall()
    total_rows = source_table_count[0][0]

    batch = []
    current_row_count = 0
    for row in src_table.select().execute():
        batch.append(row._asdict())
        if len(batch) == BATCH_SIZE:
            insert_stmt.execute(batch)
            # Accumulate so the progress line reports the running total.
            current_row_count += len(batch)
            print(f"table = {table.name}, inserted {current_row_count} out of {total_rows}")
            batch = []

    # Flush the final, partially filled batch.
    if batch:
        insert_stmt.execute(batch)
        current_row_count += len(batch)
        print(f"table = {table.name}, inserted {current_row_count} out of {total_rows}")

    print(f'source table "{table.name}": {source_table_count}')
    with tgt_engine.connect() as tgt_conn:
        target_table_count = tgt_conn.execute(f"select count(*) from {table.name}").fetchall()
    print(f'target table "{table.name}": {target_table_count}')
The sequences copied from the source database to the target database are defined by a script like the one below:
-- Definition of the sequence as scripted on the target database.
-- Presumably it backs the request_id column of my_table (inferred from the name).
CREATE SEQUENCE public.my_table_request_id_seq
INCREMENT 1
-- START 1: the sequence begins at 1, independent of any rows already in the table.
START 1
MINVALUE 1
MAXVALUE 2147483647
CACHE 1;
-- Ownership of the sequence is assigned to the role my_user.
ALTER SEQUENCE public.my_table_request_id_seq
OWNER TO my_user;
Everything is working fine: the table schema is copied and the data is copied. However, the auto-increment column of each table starts at 1 instead of continuing from the last incremented value, so I get the error "duplicate key value violates unique constraint"
when inserting new data.
How do I solve this issue?
Thanks in advance.
CodePudding user response:
This is probably because the copy inserts explicit values directly into the auto-increment column, so the sequence is never used.
For example, suppose the source database has a table like the following, where the id column
is the auto-incrementing one:
+------+------------+
| id   | name       |
+------+------------+
| 1    | test       |
| 2    | test2      |
+------+------------+
When this is copied to the target database, it's directly inserted as
INSERT INTO TBL_NAME VALUES (1, 'test');
INSERT INTO TBL_NAME VALUES (2, 'test2');
without using the sequence.
A possible fix would be, once the copy has finished, to alter the sequence and set it to the maximum id
found in the source database, so that the next generated value comes after it.
CodePudding user response:
I have made working changes for the sequence issue. Below is the full code:
from sqlalchemy import create_engine, MetaData, event
from sqlalchemy.sql import sqltypes
import traceback
# Connection URLs for the database being copied from and the one being copied to.
# The "options=-c search_path=public" query argument is passed through to the
# PostgreSQL driver as connection options.
SRC_DB_URL = "postgresql://user1:mypass@myip1:2025/mydb?options=-c search_path=public"
TGT_DB_URL = "postgresql://user2:mypass@myip2:2025/newdb?options=-c search_path=public"

src_engine = create_engine(SRC_DB_URL)
tgt_engine = create_engine(TGT_DB_URL)

# Each MetaData is bound to its engine, so reflect() and statement .execute()
# calls made through it need no explicit connection.
# NOTE(review): bound metadata is a SQLAlchemy 1.x feature -- confirm the
# installed version before reusing this script.
src_metadata = MetaData(bind=src_engine)
tgt_metadata = MetaData(bind=tgt_engine)
@event.listens_for(src_metadata, "column_reflect")
def genericize_datatypes(inspector, tablename, column_dict):
    """Replace a reflected column's type with its generic equivalent.

    Registered as a ``column_reflect`` listener on ``src_metadata``, so it is
    invoked for every column reflected from the source database. The entry
    ``column_dict["type"]`` is overwritten in place; nothing is returned.

    Args:
        inspector: Inspector performing the reflection (unused).
        tablename: Name of the table being reflected (unused).
        column_dict: Mutable dictionary describing the reflected column.
    """
    reflected_type = column_dict["type"]
    # allow_nulltype=True is forwarded to as_generic() unchanged.
    column_dict["type"] = reflected_type.as_generic(allow_nulltype=True)
# Number of rows sent to the target per INSERT round-trip.
BATCH_SIZE = 2500
# Role that should own the restarted sequences (placeholder -- set to a real role).
SEQUENCE_OWNER = "my_owner_name"
# Suffix PostgreSQL gives to sequences created for serial columns.
SEQUENCE_SUFFIX = "_seq"

# Reflect both databases. The target reflection is discarded again by
# clear() below, but it verifies up front that the target is reachable.
tgt_metadata.reflect()
src_metadata.reflect()

# Create the source tables on the target, in foreign-key dependency order.
for table in src_metadata.sorted_tables:
    table.create(bind=tgt_engine)

# refresh metadata before you can copy data
tgt_metadata.clear()
tgt_metadata.reflect()

# The set of sequences does not change while data is copied, so list it once
# instead of once per table.
with tgt_engine.connect() as tgt_conn:
    sequence_list_data = tgt_conn.execute("SELECT c.relname FROM pg_class c WHERE c.relkind = 'S';").fetchall()
sequence_list = [sequence[0] for sequence in sequence_list_data]

# Copy all data from src to target
for table in tgt_metadata.sorted_tables:
    src_table = src_metadata.tables.get(table.name)
    if src_table is None:
        # Table existed on the target beforehand and has no source counterpart.
        continue

    insert_stmt = table.insert()

    with src_engine.connect() as src_conn:
        source_table_count = src_conn.execute(f"select count(*) from {table.name}").fetchall()
    total_rows = source_table_count[0][0]

    batch = []
    current_row_count = 0
    for row in src_table.select().execute():
        batch.append(row._asdict())
        if len(batch) == BATCH_SIZE:
            insert_stmt.execute(batch)
            # Accumulate so the progress line reports the running total.
            current_row_count += len(batch)
            print(f"table = {table.name}, inserted {current_row_count} out of {total_rows}")
            batch = []

    # Flush the final, partially filled batch.
    if batch:
        insert_stmt.execute(batch)
        current_row_count += len(batch)
        print(f"table = {table.name}, inserted {current_row_count} out of {total_rows}")

    ###################### code for sequence restart: start ##############################
    # Sequence names usually follow {tableName}_{columnName}_seq, for example
    # course_student_id_seq. A candidate is accepted only when the part between
    # the table prefix and the "_seq" suffix is a real column of this table, so
    # a table named "order" does not pick up "order_items_id_seq".
    # NOTE: this is still a naming heuristic; PostgreSQL's
    # pg_get_serial_sequence() is the authoritative lookup.
    table_prefix = f"{table.name}_"
    sequence_to_alter = ""
    sequence_column_name = ""
    for sequence in sequence_list:
        if not (sequence.startswith(table_prefix) and sequence.endswith(SEQUENCE_SUFFIX)):
            continue
        candidate_column = sequence[len(table_prefix):-len(SEQUENCE_SUFFIX)]
        if candidate_column in table.c:
            sequence_to_alter = sequence
            sequence_column_name = candidate_column
            break

    if sequence_to_alter:
        with tgt_engine.connect().execution_options(autocommit=True) as tgt_conn:
            # getting last generated sequence id for the table
            last_id = tgt_conn.execute(f"select max({sequence_column_name}) from {table.name}").fetchall()[0][0]
            # max() is NULL for an empty table; the sequence then restarts at 1.
            next_id = (last_id or 0) + 1
            # restarting the sequence with last_id + 1, so that new rows
            # continue after the highest copied id
            tgt_conn.execute(f"ALTER SEQUENCE {sequence_to_alter} RESTART WITH {next_id};")
            # setting the owner for the sequence to owner needed to be
            tgt_conn.execute(f"ALTER SEQUENCE {sequence_to_alter} OWNER TO {SEQUENCE_OWNER}")
    ###################### code for sequence restart: end ##############################

    print(f'source table "{table.name}": {source_table_count}')
    with tgt_engine.connect() as tgt_conn:
        target_table_count = tgt_conn.execute(f"select count(*) from {table.name}").fetchall()
    print(f'target table "{table.name}": {target_table_count}')
In case of any query or correction, please do let me know.
Hope this helps someone!