I have a Python program written in a PySpark environment. The various PySpark transformations take hardly 45 seconds to execute, but the final DataFrame that writes the rows to a target location in Parquet format takes around 5 minutes and 35 seconds. Below is the record count:
No of records in the file: 7143779
Below is the code snippet that writes the Parquet output:
final_df = func.union_dataframes([df1.select(<cols>), df2.select(<cols>)])
cur_time_str = func.get_current_timestamp_for_curate_container()
if time_frame == "20000":
    cur_path = <destination_path> + cur_time_str + "abc"
else:
    cur_path = <destination_path> + cur_time_str + "_" + time_frame + "_xyz"
func.write_df_as_parquet_file(final_df, cur_path, logger)
Below is the function we call to write the Parquet file:
def write_df_as_parquet_file(df, path, logger):
    try:
        df.write.mode('overwrite').parquet(path)
        logger.debug(
            f'File written successfully at {path}, no of records in the file: {df.count()}')
        print(
            f'File written successfully at {path}, no of records in the file: {df.count()}')
    except Exception as exc:
        return_code = 'File writing exception: ' + path + '\n' + 'Exception: ' + str(exc)
        print(return_code)
        logger.error(return_code)
        raise
Is there a way to decrease the time taken by this flow, either in the above function or elsewhere?
Thank you.
CodePudding user response:
When you call df.count() inside write_df_as_parquet_file, PySpark recomputes the whole DataFrame, because you never cached the result. You can likely reduce the run time by re-reading the Parquet file after saving it and counting that instead:
df.write.mode('overwrite').parquet(path)
df = spark.read.parquet(path)
logger.debug(
    f'File written successfully at {path}, no of records in the file: {df.count()}')
print(
    f'File written successfully at {path}, no of records in the file: {df.count()}')
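An alternative is a minimal sketch like the one below, assuming the DataFrame fits comfortably in executor memory (MEMORY_AND_DISK spills to disk if it does not): persist the DataFrame so the write populates the cache, then count once and reuse the value, so neither the log message nor the print triggers a second full recomputation.
from pyspark import StorageLevel

def write_df_as_parquet_file(df, path, logger):
    # Persist the lineage so the count below does not recompute the transformations;
    # partitions are cached as the write action materialises them.
    df.persist(StorageLevel.MEMORY_AND_DISK)
    try:
        df.write.mode('overwrite').parquet(path)
        # Count once and reuse the value for both the log and stdout.
        record_count = df.count()
        msg = f'File written successfully at {path}, no of records in the file: {record_count}'
        logger.debug(msg)
        print(msg)
    except Exception as exc:
        return_code = 'File writing exception: ' + path + '\n' + 'Exception: ' + str(exc)
        print(return_code)
        logger.error(return_code)
        raise
    finally:
        df.unpersist()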
CodePudding user response:
You report an I/O throughput of roughly 21 K records/sec (7,143,779 records in about 335 seconds), and you are looking for higher throughput. First verify that that is feasible. You didn't mention how many bytes are transferred (per record or in total), so we don't know how many bytes/sec the I/O subsystem is handling. Use dd to copy your file and compare its throughput to your app's throughput. Then verify that your app is I/O-bound, just as dd is.
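As a rough way to get the missing bytes/sec figure, here is a minimal sketch that sums the size of the written Parquet files and divides by the elapsed write time. It assumes the output sits on a locally mounted filesystem; the directory path is hypothetical, and the 335-second and record-count figures come from the question above. For cloud object storage, use the storage provider's tooling to get the total size instead.
import os

def parquet_throughput(output_dir, elapsed_seconds, record_count):
    # Sum the size of every file Spark wrote under the output directory.
    total_bytes = 0
    for root, _dirs, files in os.walk(output_dir):
        for name in files:
            total_bytes += os.path.getsize(os.path.join(root, name))
    print(f'{total_bytes / 1e6:.1f} MB written in {elapsed_seconds} s '
          f'-> {total_bytes / elapsed_seconds / 1e6:.1f} MB/s, '
          f'{total_bytes / record_count:.0f} bytes/record')

# Example with the numbers from the question (path is hypothetical):
# parquet_throughput('/mnt/curate/output', 335, 7143779)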
At this point in your analysis, you may have concluded that to reduce elapsed time you need to write fewer bytes.
Good news! Parquet offers several lossless compression algorithms. The downside is that compression might make you CPU-bound. Also, reading becomes more expensive, and you will likely read many more times than you write. Try snappy or its competitors, determine which compression approach best fits your use case, and report your findings here.
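For reference, the codec can be set directly on the DataFrameWriter via its compression parameter. The sketch below times a few common codecs against each other; note that snappy is already the default in recent Spark versions, and the per-codec output paths are hypothetical.
# Write the same DataFrame with different codecs to see which one trades
# CPU for I/O most favourably on your cluster; time each run externally.
for codec in ['snappy', 'gzip', 'zstd']:
    final_df.write.mode('overwrite').parquet(cur_path + '_' + codec, compression=codec)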