I am new to AWS Glue and to AWS in general. I need to build an ETL framework for a project. This is the high-level diagram. Instead of creating 400 Glue pipelines, can I create a single template-style pipeline that is driven by reference data held in an Aurora PostgreSQL/MySQL database? I am familiar with Python. Does anyone have ideas on this, or any references or code examples?
CodePudding user response:
- We had a config master table in our MySQL DB. For convenience, we used source_table_name as the identifier to fetch the appropriate table names, column definitions and queries for each step: CREATE STG TABLE, LOAD DATA INTO STG TABLE, INSERT/UPDATE INTO TARGET TABLE, etc.
- We also split the INSERT/UPDATE into two separate columns in the config master, since we were using ON DUPLICATE KEY to update existing records.
- Get the source table name by processing the Lambda event, which contains the landing file name.
- Fetch all the required data from the config master for that source table name. It would be something like the following:
# Look up the config row(s) for this source; mydb holds the schema name.
sql_query = "SELECT * FROM {0}.CONFIG_MASTER WHERE src_tbl_name = %s".format(mydb)
# A one-element tuple needs the trailing comma; (source_fname) is just a string.
cur.execute(sql_query, (source_fname,))
result = cur.fetchall()
for row in result:
    stg_table_name = row[1]
    tgt_table_name = row[2]
    create_stg_table_qry = row[3]
    load_data_stg_table_qry = row[4]
    insert_tgt_table_qry = row[5]
    insert_tgt_table_qry_part_1 = row[6]
    insert_tgt_table_qry_part_2 = row[7]
conn.commit()
cur.close()
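For the step of getting the source table name from the Lambda event, a minimal sketch could look like the one below. It assumes the Lambda is triggered by an S3 object-created notification and that landing files are named `<source_table>_<suffix>.<ext>`. The function name `source_table_from_event` and the naming convention are hypothetical, so adjust them to your own layout.

```python
import os
from urllib.parse import unquote_plus


def source_table_from_event(event):
    """Return the source table name for the first S3 record in a Lambda event.

    Assumption (not from the original answer): landing files are named
    <source_table>_<suffix>.<ext>, e.g. landing/customers_20240101.csv.
    """
    # S3 notifications URL-encode the object key, so decode it first.
    key = unquote_plus(event["Records"][0]["s3"]["object"]["key"])
    file_name = os.path.basename(key)        # customers_20240101.csv
    stem = os.path.splitext(file_name)[0]    # customers_20240101
    # Drop the trailing _<suffix>; keep the whole stem if there is none.
    return stem.rsplit("_", 1)[0] if "_" in stem else stem
```

The returned value is what you would pass as the parameter of the config master lookup.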
Pass the appropriate parameters to the generic functions, as below:
create_stg_table(stg_table_name, create_stg_table_qry, load_data_stg_table_qry)
loaddata(tgt_table_name, insert_tgt_table_qry_part_1, insert_tgt_table_qry_part_2, stg_table_name)
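To try the config-driven idea locally (one config row per source, one generic routine for all of them), here is a small self-contained sketch. It is not the code we ran: it uses an in-memory `sqlite3` database purely as a stand-in for Aurora MySQL, so the placeholders are `?` instead of `%s` and the S3 load is replaced by a plain insert. The names `run_pipeline`, `customers` and `stg_customers` are made up for the illustration.

```python
import sqlite3

# Stand-in for the Aurora connection; sqlite3 is used only so the sketch runs locally.
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # read columns by name instead of row[1], row[2], ...

conn.executescript("""
CREATE TABLE config_master (
    src_tbl_name TEXT PRIMARY KEY,
    stg_table_name TEXT,
    tgt_table_name TEXT,
    create_stg_table_qry TEXT,
    insert_tgt_table_qry TEXT
);
CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT);
""")
conn.execute(
    "INSERT INTO config_master VALUES (?, ?, ?, ?, ?)",
    ("customers", "stg_customers", "customers",
     "(id INTEGER, name TEXT)",
     "INSERT INTO customers (id, name) SELECT id, name FROM stg_customers"),
)


def run_pipeline(conn, source_name, rows):
    """Run the generic stage-then-load steps for one source, driven by its config row."""
    cfg = conn.execute(
        "SELECT * FROM config_master WHERE src_tbl_name = ?", (source_name,)
    ).fetchone()
    if cfg is None:
        raise KeyError("no config row for source " + source_name)
    stg = cfg["stg_table_name"]
    # Identifiers cannot be bound as parameters, so they come from the trusted config table.
    conn.execute("DROP TABLE IF EXISTS " + stg)
    conn.execute("CREATE TABLE {0} {1}".format(stg, cfg["create_stg_table_qry"]))
    # Stand-in for LOAD DATA FROM S3: insert the rows directly.
    if rows:
        marks = ", ".join("?" * len(rows[0]))
        conn.executemany("INSERT INTO {0} VALUES ({1})".format(stg, marks), rows)
    conn.execute(cfg["insert_tgt_table_qry"])
    conn.commit()


run_pipeline(conn, "customers", [(1, "Ann"), (2, "Bob")])
loaded = conn.execute("SELECT id, name FROM customers ORDER BY id").fetchall()
```

Adding a new source then means adding a row to the config table, not writing another pipeline.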
The generic functions would look something like the ones below. This is for Aurora RDS; please make changes as needed.
def create_stg_table(stg_table_name, create_stg_table_qry, load_data_stg_table_qry):
    cur, conn = connect()
    # Recreate the staging table from the definition stored in the config master.
    createStgTable1 = "DROP TABLE IF EXISTS {0}.{1}".format(mydb, stg_table_name)
    createStgTable2 = "CREATE TABLE {0}.{1} {2}".format(mydb, stg_table_name, create_stg_table_qry)
    # Load statement left elided here; presumably completed from load_data_stg_table_qry.
    loadQry = "LOAD DATA FROM S3 PREFIX 's3://' REPLACE INTO TABLE ...".format()
    cur.execute(createStgTable1)
    cur.execute(createStgTable2)
    cur.execute(loadQry)
    conn.commit()
    conn.close()
def loaddata(tgt_table_name, insert_tgt_table_qry_part_1, insert_tgt_table_qry_part_2, stg_table_name):
    cur, conn = connect()
    # Placeholder: build the real INSERT from the two query parts and the staging table.
    insertQry = "INSERT INTO target table, from the staging table query here"
    print(insertQry)
    cur.execute(insertQry)
    conn.commit()
    conn.close()
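As for how the two INSERT/UPDATE parts could be stitched together for ON DUPLICATE KEY, here is one possible sketch. What exactly goes into each part is my assumption for this example: part 1 holds the column list plus the SELECT list, and part 2 holds the update assignments. The helper name `build_upsert` is hypothetical.

```python
def build_upsert(db, tgt_table_name, part_1, part_2, stg_table_name):
    """Assemble a MySQL INSERT ... SELECT ... ON DUPLICATE KEY UPDATE statement.

    Assumed layout of the config columns (not confirmed by the original answer):
      part_1: column list and select list, e.g. "(id, name) SELECT id, name"
      part_2: update assignments,          e.g. "name = VALUES(name)"
    """
    return (
        "INSERT INTO {db}.{tgt} {p1} FROM {db}.{stg} "
        "ON DUPLICATE KEY UPDATE {p2}"
    ).format(db=db, tgt=tgt_table_name, p1=part_1, p2=part_2, stg=stg_table_name)


insert_qry = build_upsert(
    "mydb", "customers",
    "(id, name) SELECT id, name",
    "name = VALUES(name)",
    "stg_customers",
)
```

The table and schema names come from the config table, not from user input, which is why plain string formatting is tolerable here.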
Hope this gives an idea.
Thanks