I am trying to run a simple ETL process using AWS Glue.
The process is simple: use a JDBC connector to read 20 tables from a database, and then sink them into S3. Everything works fine; the only issue is the amount of time required to run the job (2 hours).
The main bottleneck is caused by some very large tables (16 to 20 million records), and by the fact that I have to extract the number of rows and the field list. The Glue job uses Python 3, Spark 3, and 2 workers (of which 1 is the driver).
I first read the table:
df = sparkSession.read.format("jdbc") \
    .option("url", connection_url) \
    .option("dbtable", table) \
    .option("driver", DRIVER) \
    .load()
Then I convert it to a Glue DynamicFrame (as it is easier for me to run operations on it):
df = DynamicFrame.fromDF(df, glueContext, "df")
Then I proceed to calculate number of rows:
n_rows = df.count()
And here the pain starts: for some tables (the biggest ones) it takes 10 to 20 minutes just to return this value. I have done some research and (I think) I understand the concept of lazy evaluation and computation in Spark, but it seems to me that this operation should take far less time anyway, so I am surely doing something wrong. After that, I proceed to generate a field list:
fields = [df.schema().fields[x].name for x in range(0, len(df.schema().fields))]
Which, again, takes 10 to 20 minutes to run. Eventually, I sink the DynamicFrame:
glueContext.write_dynamic_frame.from_options(
    frame=df,
    connection_type="s3",
    connection_options={"path": path,
                        "partitionKeys": [partition]},
    format="parquet")
Which, again, takes a long time for these large tables.
It is worth mentioning that I also extract from tables that contain only a few rows. I mention this because a possible solution I have read about is to repartition as soon as I read the table, but it would make zero sense to repartition a DataFrame of 3 rows.
The only way of doing that systematically would be to count the rows first and then repartition based on n_rows, but the count already takes forever. Also, I have read that the number of partitions should be somewhat related to the number of workers. I have 1 worker, so 1 partition seems logical to me.
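For reference, the systematic approach I had in mind is roughly this (just a sketch, with an arbitrary threshold):
n_rows = df.count()                          # this alone already takes 10-20 minutes
num_partitions = max(1, n_rows // 1000000)   # e.g. aim for ~1M records per partition
df = df.repartition(num_partitions)          # DynamicFrame.repartition(numPartitions)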
My question is: what am I doing wrong? Should I just increase the number of workers and repartition accordingly at the moment of reading? Or what other solutions are available? Thanks a lot for any advice!
CodePudding user response:
We faced the same challenge in one of our migration processes and optimized it based on the following methods.
Optimization : 01
As you have mentioned, n_rows = df.count() is a costly operation; try to avoid it in your code if you can.
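If you do need the row count as metadata, one option is to push the count down to the database instead of having Spark scan the table. A sketch, assuming the same connection_url / DRIVER / table variables as in the question and a source database that accepts a subquery as dbtable:
# Sketch: let the database compute the count; only one aggregated row is returned.
count_df = sparkSession.read.format("jdbc") \
    .option("url", connection_url) \
    .option("dbtable", f"(select count(*) as n from {table}) cnt") \
    .option("driver", DRIVER) \
    .load()
n_rows = count_df.collect()[0]["n"]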
Optimization : 02 [generate a field list]
We tried to get the schema from the source by sampling a single record:
src_connect_string = {'url': "jdbc:teradata://connectionstring,TMODE=TERA",
                      'user': "username",
                      'password': "mypassword",
                      'query': "select top 1 * from tablename",  # Teradata uses TOP n; LIMIT is not supported
                      'driver': "com.teradata.jdbc.TeraDriver"}
df_td_src = spark.read.format("jdbc").options(**src_connect_string).load()
src_td_columns = df_td_src.schema.names
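In the setup from the question, a similar effect should be possible without an extra query: the schema of the Spark DataFrame returned by the JDBC read comes from table metadata, so taking the field list before converting to a DynamicFrame should not trigger a table scan. A sketch, reusing the question's variables:
spark_df = sparkSession.read.format("jdbc") \
    .option("url", connection_url) \
    .option("dbtable", table) \
    .option("driver", DRIVER) \
    .load()
fields = spark_df.schema.names               # metadata only, no full scan expected
df = DynamicFrame.fromDF(spark_df, glueContext, "df")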
Optimization : 03
Find out where exactly the time is being spent, in the read process or the write process. Based on that, you can run that process concurrently. For example, since our writing process was taking longer, we made the writes concurrent. Sample code for your reference:
jdbcurl = f"jdbc:teradata://{server}/database={db}, TMODE=TERA"
driver = "com.teradata.jdbc.TeraDriver"
query = query.replace("startdt", "'" + start_date + "'").replace("enddt", "'" + end_date + "'")
print(f"Query - {query}")
data_df = spark.read \
.format('jdbc') \
.options(url= jdbcurl, user= user,password= pw, query=query, driver= driver,numPartitions=100) \
.option('customSchema', schema[1:-1]) \
.option('ConnectionRetries', '3') \
.option('ConnectionRetryInterval', '2000') \
.option("fetchSize",100000) \
.load()
# display(data_df)
from concurrent import futures
from datetime import timedelta, date, datetime
from pyspark.sql import functions as f
date_range = ['2017-01-28']
def writeS3(curr_date):
    print(f"Starting S3 write for date - {curr_date}")
    data_df1 = data_df.withColumn("date1", f.from_unixtime(f.unix_timestamp(data_df.LD_TS), "yyyy-MM-dd"))
    display(data_df1)
    print(curr_date)
    save_df = data_df1.filter(f"date1='{curr_date}'").drop('date1')
    save_df.write.parquet(f"s3://location")
jobs = []
results_done = []
total_days = 30
with futures.ThreadPoolExecutor(max_workers=total_days + 1) as e:
    print(f"{raw_bucket}/{db}/{table}/")
    for curr_date in date_range:
        print(f"Starting S3 write for date - {curr_date}")
        jobs.append(e.submit(writeS3, curr_date))
        # result_done = job.result()
        # print(f"Job Completed - {result_done}")
print("Task complete")
CodePudding user response:
I would definitely increase the number of workers when working with 16-20 million records. You really want to leverage the parallel processing power of Spark.
Also, .count() is an action that forces Spark to execute the plan. If you want to keep working with that DataFrame afterwards, you can use .cache() to improve performance.
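A minimal sketch of that idea, assuming the table fits in the executors' memory/disk and reusing the variable names from the question:
spark_df = sparkSession.read.format("jdbc") \
    .option("url", connection_url) \
    .option("dbtable", table) \
    .option("driver", DRIVER) \
    .load() \
    .cache()                                 # MEMORY_AND_DISK by default
n_rows = spark_df.count()                    # materializes the cache
df = DynamicFrame.fromDF(spark_df, glueContext, "df")   # downstream work reuses the cached data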
CodePudding user response:
I believe you are not utilizing the parallel JDBC read mechanism, which is controlled by the numPartitions option.
You have to arrive at an optimal numPartitions number:
- Based on the allocated executor cores: one executor core executes one partition.
- Data partitions that will be executed in parallel in an executor should fit fully in memory to avoid spillage.
df = spark.read. \
    format("jdbc"). \
    option("url", "URL"). \
    option("user", "<username>"). \
    option("password", "<password>"). \
    option("dbtable", "<table>"). \
    option("partitionColumn", "partitionColumn"). \
    option("lowerBound", "<lowest partition number>"). \
    option("upperBound", "<largest partition number>"). \
    option("numPartitions", "<number of partitions>"). \
    load()
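If the bounds are not known up front, one option (a sketch; "id" is a hypothetical numeric, indexed column, replace it with a real one) is to push a min/max aggregation down to the database and feed the result into lowerBound/upperBound:
bounds = spark.read.format("jdbc") \
    .option("url", "URL") \
    .option("user", "<username>") \
    .option("password", "<password>") \
    .option("dbtable", "(select min(id) as lo, max(id) as hi from <table>) b") \
    .load() \
    .collect()[0]
lower_bound, upper_bound = str(bounds["lo"]), str(bounds["hi"])   # pass these as lowerBound/upperBound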