I am trying to fetch when a Delta table was last optimized using the code below, and I am getting the expected output. This code will run for all the tables present in the database.
from pyspark.sql.functions import col

table_name_or_path = "abcd"

df = spark.sql("desc history {}".format(table_name_or_path)) \
    .select("operation", "timestamp") \
    .filter("operation == 'OPTIMIZE'") \
    .orderBy(col("timestamp").desc())

if len(df.take(1)) != 0:
    last_optimize = df.select(col("timestamp").cast("string").alias("timestamp")).first().asDict()
    print(last_optimize["timestamp"])
    last_optimize = last_optimize["timestamp"]
else:
    last_optimize = ""
The above code takes some time and triggers a lot of Spark jobs.
I want to optimize the above code to get better performance.
Is there any way to write this in a more optimized way? That would be very helpful.
CodePudding user response:
In general, it often helps to cache the DataFrame before starting any calculation on it:
df = spark.sql("desc history {}".format(table_name_or_path)) \
    .select("operation", "timestamp") \
    .filter("operation == 'OPTIMIZE'") \
    .orderBy(col("timestamp").desc()) \
    .cache()
I would assume that caching the result of the orderBy step alone would already decrease the computational effort.
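For completeness, here is a minimal, untested sketch of how the cached DataFrame from above could be consumed, reusing the check from the question; the df.unpersist() call is my addition (not part of the original answer) to release the cache once the value has been read:

# the first action materialises the plan and populates the cache
if len(df.take(1)) != 0:
    # the second action reuses the cached rows instead of recomputing the query
    last_optimize = df.select(col("timestamp").cast("string").alias("timestamp")).first().asDict()["timestamp"]
else:
    last_optimize = ""

df.unpersist()  # free the cached data once we are done with it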
CodePudding user response:
It's better to avoid checks like if len(df.take(1)) != 0, because they may lead to recalculation of results when you call .first() later. Instead, just limit the number of rows using .limit(1) and check the collected result. Something like this (not tested):
from pyspark.sql.functions import col

table_name_or_path = "abcd"

df = spark.sql(f"desc history {table_name_or_path}") \
    .select("operation", "timestamp") \
    .filter("operation == 'OPTIMIZE'") \
    .orderBy(col("timestamp").desc()) \
    .limit(1)

data = df.collect()
if len(data) > 0:
    last_optimize = data[0].asDict()
    print(last_optimize["timestamp"])
    last_optimize = last_optimize["timestamp"]
else:
    last_optimize = ""
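Since the question mentions running this for every table in a database, the snippet above could also be wrapped in a small helper and applied per table. A rough sketch follows; the function name and the database name "my_database" are hypothetical, and it assumes every listed table is a Delta table (DESC HISTORY fails on non-Delta tables):

from pyspark.sql.functions import col

def last_optimize_timestamp(table_name):
    # returns the timestamp of the most recent OPTIMIZE, or "" if never optimized
    rows = spark.sql(f"desc history {table_name}") \
        .select("operation", "timestamp") \
        .filter("operation == 'OPTIMIZE'") \
        .orderBy(col("timestamp").desc()) \
        .limit(1) \
        .collect()
    return str(rows[0]["timestamp"]) if rows else ""

# hypothetical database name; listTables yields one entry per table in that database
for t in spark.catalog.listTables("my_database"):
    print(t.name, last_optimize_timestamp(f"my_database.{t.name}"))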