How to extract data from Oracle database with AWS Glue and other AWS services

Time:09-28

I am new to AWS Glue and other AWS services. I have a requirement to build an ETL framework for a project. This is the high-level diagram. Instead of creating 400 Glue pipelines, can I create a kind of template that is driven by reference data stored in Postgres Aurora or MySQL? I am familiar with Python. Does anyone have ideas on this, or any references or code examples?

[Image: high-level diagram (not available)]

Answer:

  1. We had a config master table in our MySQL database. For convenience, we used source_table_name as the identifier to fetch the appropriate table names and queries for creating the staging (STG) table, loading data into the staging table, inserting/updating the target tables, etc.
  2. We also split the INSERT/UPDATE into two separate columns in the config master, since we used ON DUPLICATE KEY UPDATE to update existing records.
  3. Get the source table name by processing the Lambda event, which contains the landing file name.
  4. Fetch all the required data from the config master for that source table name, something like the following:
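```python
# cur/conn come from connect(); mydb is the schema that holds CONFIG_MASTER.
# source_fname is the source table name derived from the Lambda event (step 3).
sql_query = "SELECT * FROM {0}.CONFIG_MASTER WHERE src_tbl_name = %s".format(mydb)
cur.execute(sql_query, (source_fname,))  # query parameters must be passed as a tuple
result = cur.fetchall()
for row in result:
    # Positional indexes follow the column order of CONFIG_MASTER.
    stg_table_name = row[1]
    tgt_table_name = row[2]
    create_stg_table_qry = row[3]
    load_data_stg_table_qry = row[4]
    insert_tgt_table_qry = row[5]
    insert_tgt_table_qry_part_1 = row[6]
    insert_tgt_table_qry_part_2 = row[7]
conn.commit()
cur.close()
```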
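Step 3 can be sketched as below, assuming the Lambda function is triggered by an S3 event notification on the landing bucket and that each landing file is named after its source table (e.g. `landing/customer_orders.csv` for `customer_orders`). That naming convention and the function name `source_tables_from_s3_event` are hypothetical; adjust the parsing to your actual file names.

```python
import os
from urllib.parse import unquote_plus


def source_tables_from_s3_event(event):
    """Map each S3 object in a Lambda event to a source table name.

    Hypothetical convention: the landing file is "<src_tbl_name>.<ext>".
    """
    tables = []
    for record in event.get("Records", []):
        # S3 URL-encodes object keys in event notifications (spaces become '+').
        key = unquote_plus(record["s3"]["object"]["key"])
        file_name = os.path.basename(key)
        table_name, _ext = os.path.splitext(file_name)
        tables.append(table_name)
    return tables


event = {"Records": [{"s3": {"object": {"key": "landing/customer_orders.csv"}}}]}
print(source_tables_from_s3_event(event))  # ['customer_orders']
```

Each returned name would then be used as `source_fname` in the CONFIG_MASTER lookup.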

Then pass the appropriate parameters to the generic functions:

```python
create_stg_table(stg_table_name, create_stg_table_qry, load_data_stg_table_qry)
loaddata(tgt_table_name, insert_tgt_table_qry_part_1, insert_tgt_table_qry_part_2, stg_table_name)
```
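Because every table differs only in its CONFIG_MASTER row, the statement-building part can be pulled into one pure function, which lets the template be tested without a database. This is a minimal sketch, assuming a hypothetical `ConfigRow` dataclass in place of the positional `row[n]` lookups, and assuming the two INSERT/UPDATE parts hold a column list and the ON DUPLICATE KEY UPDATE assignments (the answer does not spell out their contents). The S3 load statement is left out because its details are elided in the answer. Table names are interpolated rather than parameterized, so the config table must be trusted.

```python
from dataclasses import dataclass


@dataclass
class ConfigRow:
    # Field names mirror the variables unpacked from CONFIG_MASTER in the answer.
    stg_table_name: str
    tgt_table_name: str
    create_stg_table_qry: str         # column definitions for CREATE TABLE
    insert_tgt_table_qry_part_1: str  # assumed: comma-separated column list
    insert_tgt_table_qry_part_2: str  # assumed: ON DUPLICATE KEY UPDATE assignments


def build_statements(schema, cfg):
    """Return the MySQL statements for one table, in execution order."""
    drop_stg = "DROP TABLE IF EXISTS {0}.{1}".format(schema, cfg.stg_table_name)
    create_stg = "CREATE TABLE {0}.{1} {2}".format(
        schema, cfg.stg_table_name, cfg.create_stg_table_qry
    )
    upsert = (
        "INSERT INTO {0}.{1} ({2}) SELECT {2} FROM {0}.{3} "
        "ON DUPLICATE KEY UPDATE {4}"
    ).format(
        schema,
        cfg.tgt_table_name,
        cfg.insert_tgt_table_qry_part_1,
        cfg.stg_table_name,
        cfg.insert_tgt_table_qry_part_2,
    )
    return [drop_stg, create_stg, upsert]


cfg = ConfigRow(
    stg_table_name="stg_customer",
    tgt_table_name="customer",
    create_stg_table_qry="(id INT PRIMARY KEY, name VARCHAR(50))",
    insert_tgt_table_qry_part_1="id, name",
    insert_tgt_table_qry_part_2="name = VALUES(name)",
)
for stmt in build_statements("etl", cfg):
    print(stmt)
```

Adding a table to the framework then means adding a row, not writing new code.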

The generic functions would look something like the following. This version is for Aurora (MySQL-compatible) RDS; please adapt it as needed.

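```python
def create_stg_table(stg_table_name, create_stg_table_qry, load_data_stg_table_qry):
    cur, conn = connect()
    # Recreate the staging table using the DDL stored in CONFIG_MASTER.
    createStgTable1 = "DROP TABLE IF EXISTS {0}.{1}".format(mydb, stg_table_name)
    createStgTable2 = "CREATE TABLE {0}.{1} {2}".format(mydb, stg_table_name, create_stg_table_qry)
    # Aurora MySQL bulk load from S3. The prefix and table are elided here; fill them in,
    # or build the statement from load_data_stg_table_qry.
    loadQry = "LOAD DATA FROM S3 PREFIX 's3://' REPLACE INTO TABLE ..."
    try:
        cur.execute(createStgTable1)
        cur.execute(createStgTable2)
        cur.execute(loadQry)
        conn.commit()
    finally:
        conn.close()

def loaddata(tgt_table_name, insert_tgt_table_qry_part_1, insert_tgt_table_qry_part_2, stg_table_name):
    cur, conn = connect()
    # Assumed layout (not specified above): part_1 is the column list and
    # part_2 the ON DUPLICATE KEY UPDATE assignments, e.g. "name = VALUES(name)".
    insertQry = (
        "INSERT INTO {0}.{1} ({2}) SELECT {2} FROM {0}.{3} "
        "ON DUPLICATE KEY UPDATE {4}"
    ).format(mydb, tgt_table_name, insert_tgt_table_qry_part_1,
             stg_table_name, insert_tgt_table_qry_part_2)
    print(insertQry)
    try:
        cur.execute(insertQry)
        conn.commit()
    finally:
        conn.close()
```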
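To try the whole config-driven flow locally, the sketch below swaps Aurora MySQL for Python's built-in sqlite3 module. It is an illustration under stated assumptions, not the answer's code: the `config_master` columns (`insert_cols`, `conflict_cols`, `update_set`) and the functions `setup_demo` and `run_pipeline` are hypothetical, inserting the rows directly stands in for LOAD DATA FROM S3, and SQLite's `ON CONFLICT ... DO UPDATE` upsert (SQLite 3.24+) replaces MySQL's ON DUPLICATE KEY UPDATE. That substitution is why a conflict column has to be stored in the config here.

```python
import sqlite3


def setup_demo():
    """Create an in-memory DB with one config row and an existing target table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE config_master (src_tbl_name TEXT PRIMARY KEY, stg_table_name TEXT,"
        " tgt_table_name TEXT, create_stg_table_qry TEXT, insert_cols TEXT,"
        " conflict_cols TEXT, update_set TEXT)"
    )
    conn.execute(
        "INSERT INTO config_master VALUES (?, ?, ?, ?, ?, ?, ?)",
        ("customer", "stg_customer", "customer", "(id INTEGER PRIMARY KEY, name TEXT)",
         "id, name", "id", "name = excluded.name"),
    )
    conn.execute("CREATE TABLE customer (id INTEGER PRIMARY KEY, name TEXT)")
    conn.execute("INSERT INTO customer VALUES (1, 'old name')")
    conn.commit()
    return conn


def run_pipeline(conn, src_tbl_name, landed_rows):
    """Generic stage-then-upsert load driven only by the config_master row."""
    row = conn.execute(
        "SELECT stg_table_name, tgt_table_name, create_stg_table_qry, insert_cols,"
        " conflict_cols, update_set FROM config_master WHERE src_tbl_name = ?",
        (src_tbl_name,),
    ).fetchone()
    if row is None:
        raise ValueError("no config_master entry for " + src_tbl_name)
    stg, tgt, ddl, cols, conflict, update_set = row
    conn.execute("DROP TABLE IF EXISTS {0}".format(stg))
    conn.execute("CREATE TABLE {0} {1}".format(stg, ddl))
    # Stand-in for LOAD DATA FROM S3: insert the landed rows directly.
    placeholders = ", ".join("?" for _ in cols.split(","))
    conn.executemany(
        "INSERT INTO {0} ({1}) VALUES ({2})".format(stg, cols, placeholders), landed_rows
    )
    # SQLite upsert in place of MySQL's ON DUPLICATE KEY UPDATE.
    # "WHERE 1" avoids SQLite's INSERT ... SELECT ... ON CONFLICT parsing ambiguity.
    conn.execute(
        "INSERT INTO {0} ({1}) SELECT {1} FROM {2} WHERE 1 "
        "ON CONFLICT({3}) DO UPDATE SET {4}".format(tgt, cols, stg, conflict, update_set)
    )
    conn.commit()


conn = setup_demo()
run_pipeline(conn, "customer", [(1, "new name"), (2, "second")])
print(conn.execute("SELECT id, name FROM customer ORDER BY id").fetchall())
# [(1, 'new name'), (2, 'second')]
```

Row 1 is updated in place and row 2 is inserted; onboarding another table is one more `config_master` row.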

Hope this gives an idea.

Thanks
