loop through multiple tables from source to s3 using glue (Python/Pyspark) through configuration fil

Time:10-30

I am looking to ingest multiple tables from a relational database into S3 using Glue. The table details are stored in a JSON configuration file. It would be helpful to have code that loops through the table names in that file and ingests each table into S3. The Glue script is written in Python (PySpark).

This is a sample of what the configuration file looks like:

{"main_key": {
      "source_type": "rdbms",
      "source_schema": "DATABASE",
      "source_table": "DATABASE.Table_1"
}}

CodePudding user response:

Just write a normal for loop over the entries in your configuration, then follow the Spark JDBC documentation to read each table in sequence.
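
As a rough sketch of that loop (not a tested Glue job): the config below copies the shape from the question, with a hypothetical second entry added to show more than one table. The function names, the `s3_prefix` parameter, and the `overwrite` save mode are my own assumptions; `spark` is expected to be an existing SparkSession.

```python
import json

# Same shape as the question's config; "second_key" is a hypothetical extra entry.
CONFIG_JSON = """
{
  "main_key": {"source_type": "rdbms", "source_schema": "DATABASE", "source_table": "DATABASE.Table_1"},
  "second_key": {"source_type": "rdbms", "source_schema": "DATABASE", "source_table": "DATABASE.Table_2"}
}
"""


def tables_to_ingest(config):
    """Return the source_table of every entry whose source_type is 'rdbms'."""
    return [
        entry["source_table"]
        for entry in config.values()
        if entry.get("source_type") == "rdbms"
    ]


def ingest_all(spark, config, jdbc_url, properties, s3_prefix):
    """Read each configured table over JDBC and write it under s3_prefix as Parquet."""
    written = []
    for table in tables_to_ingest(config):
        df = spark.read.jdbc(url=jdbc_url, table=table, properties=properties)
        path = f"{s3_prefix}/{table}"
        # Assumption: replacing earlier output is acceptable; drop .mode() otherwise.
        df.write.mode("overwrite").parquet(path)
        written.append(path)
    return written


config = json.loads(CONFIG_JSON)
# Inside a Glue job this might be called as (all values are placeholders):
# ingest_all(spark, config, jdbc_url, {"user": "...", "password": "..."}, "s3://bucket-name")
```

The tables are read one after another, so a failure on one table stops the rest unless you wrap the loop body in your own error handling.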

CodePudding user response:

This assumes your Glue job can connect to the database and that a Glue Connection has been added to it. Here's a sample extracted from my script that does something similar. It uses SQL Server, so you would need to update the JDBC URL format and driver to whatever works for your database, and fill in the implementation details for fetching the config file, looping through its items, etc.

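import sys
from datetime import datetime

from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Assumption: the connection settings are passed as job parameters
# (--hostname, --port, --db_name); these parameter names are placeholders.
args = getResolvedOptions(sys.argv, ['hostname', 'port', 'db_name'])
jdbc_url = f"jdbc:sqlserver://{args['hostname']}:{args['port']};databaseName={args['db_name']}"

# Placeholder credentials; replace with your own.
connection_details = {
    "user": 'db_user',
    "password": 'db_password',
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
}

# Placeholder helper (not shown here): returns the JSON config file as a dict.
tables_config = get_tables_config_from_s3_as_dict()

# Hive-style partition suffix for today's date: year=YYYY/month=MM/day=DD
today = datetime.today()
write_date_partition = f'year={today:%Y}/month={today:%m}/day={today:%d}'

for table_config in tables_config.values():
    table = table_config['source_table']

    # Read the whole table over JDBC, then write it to S3 as Parquet.
    df = spark.read.jdbc(url=jdbc_url, table=table, properties=connection_details)
    write_path = f's3a://bucket-name/{table}/{write_date_partition}'
    # Note: the default save mode raises an error if write_path already exists.
    df.write.parquet(write_path)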
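
The helper `get_tables_config_from_s3_as_dict` is not shown in the sample above. One way it might look, as a minimal sketch: the `bucket`, `key`, and `s3_client` parameters are assumptions of mine (the call in the sample takes no arguments), and it relies on boto3 being available in the job's Python environment.

```python
import json


def get_tables_config_from_s3_as_dict(bucket, key, s3_client=None):
    """Download a JSON object from S3 and parse it into a dict.

    bucket, key and s3_client are hypothetical parameters for illustration.
    """
    if s3_client is None:
        # Imported lazily so the function can be exercised with a stub client.
        import boto3
        s3_client = boto3.client("s3")
    response = s3_client.get_object(Bucket=bucket, Key=key)
    # response["Body"] is a stream; read() returns the raw bytes of the file.
    return json.loads(response["Body"].read())


# Example call with placeholder names:
# tables_config = get_tables_config_from_s3_as_dict("my-config-bucket", "glue/tables.json")
```

The Glue job's IAM role would need read access to the object for this to work.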