I'm trying to create a gold table notebook in Databricks, however it would take 9 days to fully reprocess the historical data (43GB, 35k parquet files). I tried scaling up the cluster but it doesn't go above 5000 records/second. The bottleneck seems to be the applyInPandas()
function. I'm wondering if I could replace pandas with anything else to make the gold notebook execute faster.
Silver table has 60 columns (read_id
, reader_id
, tracker_timestamp
, event_type
, ebook_id
, page_id
, agent_ip
, agent_device_type
, ...). Each row of data represents read event of an ebook. E.g 'page turn', 'click on image', 'click on link',... All of the events that have occurred in the single session have the same read.id
. In the gold table I'm trying to group those events in sessions and calculate the number of times each event has occurred in the single session. So instead of 100 rows of data for a read session in silver table I would end up just with a single aggregated row in gold table.
Input is the silver delta table:
import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql.functions import pandas_udf
input = (spark
.readStream
.format("delta")
.option("withEventTimeOrder", "true")
.option("maxFilesPerTrigger", 100)
.load(f"path_to_silver_bucket")
)
I use withWatermark
and session_window
functions to ensure I end up grouping all of the events from the single read session. (read session automatically ends 30 minutes after the last reader activity)
group = input.withWatermark("tracker_timestamp", "10 minutes").groupBy("read_id", F.session_window(input.tracker_timestamp, "30 minutes"))
In the next step I use the applyInPandas
function like so:
sessions = group.applyInPandas(processing_function, schema=processing_function_output_schema)
Definition of the processing_function
used in applyInPandas
:
def processing_function(df):
surf_time_ms = df.query('event_type == "surf"')['duration'].sum()
immerse_time_ms = df.query('event_type == "immersion"')['duration'].sum()
min_timestamp = df['tracker_timestamp'].min()
max_timestamp = df['tracker_timestamp'].max()
shares = len(df.query('event_type == "share"'))
leads = len(df.query('event_type == "lead_store"'))
is_read = len(df.query('event_type == "surf"')) > 0
distinct_pages = df['page_id'].nunique()
data = {
"read_id": df['read_id'].values[0],
"surf_time_ms": surf_time_ms,
"immerse_time_ms": immerse_time_ms,
"min_timestamp": min_timestamp,
"max_timestamp": max_timestamp,
"shares": shares,
"leads": leads,
"is_read": is_read,
"number_of_events": len(df),
"distinct_pages": distinct_pages
}
for field in not_calculated_string_fields:
data[field] = df[field].values[0]
new_df = pd.DataFrame(data=data, index=['read_id'])
for x in all_events:
new_df[f"count_{x}"] = df.query(f"type == '{x}'").count()
for x in duration_events:
duration = df.query(f"event_type == '{x}'")['duration']
duration_sum = duration.sum()
new_df[f"duration_{x}_ms"] = duration_sum
if duration_sum > 0:
new_df[f"mean_duration_{x}_ms"] = duration.mean()
else:
new_df[f"mean_duration_{x}_ms"] = 0
return new_df
And finally, I'm writing the calculated row to the gold table like so:
for_partitioning = (sessions
.withColumn("tenant", F.col("story_tenant"))
.withColumn("year", F.year(F.col("min_timestamp")))
.withColumn("month", F.month(F.col("min_timestamp"))))
checkpoint_path = "checkpoint-path"
gold_path = f"gold-bucket"
(for_partitioning
.writeStream
.format('delta')
.partitionBy('year', 'month', 'tenant')
.option("mergeSchema", "true")
.option("checkpointLocation", checkpoint_path)
.outputMode("append")
.start(gold_path))
Can anybody think of a more efficient way to do a UDF in PySpark than applyInPandas
for the above example? I simply cannot afford to wait 9 days to reprocess 43GB of data...
I've tried playing around with different input and output options (e.g. .option("maxFilesPerTrigger", 100)
) but the real problem seems to be applyInPandas
.
CodePudding user response:
You could rewrite your processing_function
into native Spark if you really wanted.
"read_id": df['read_id'].values[0]
F.first('read_id').alias('read_id')
"surf_time_ms": df.query('event_type == "surf"')['duration'].sum()
F.sum(F.when(F.col('event_type') == 'surf', F.col('duration'))).alias('surf_time_ms')
"immerse_time_ms": df.query('event_type == "immersion"')['duration'].sum()
F.sum(F.when(F.col('event_type') == 'immersion', F.col('duration'))).alias('immerse_time_ms')
"min_timestamp": df['tracker_timestamp'].min()
F.min('tracker_timestamp').alias('min_timestamp')
"max_timestamp": df['tracker_timestamp'].max()
F.max('tracker_timestamp').alias('max_timestamp')
"shares": len(df.query('event_type == "share"'))
F.count(F.when(F.col('event_type') == 'share', F.lit(1))).alias('shares')
"leads": len(df.query('event_type == "lead_store"'))
F.count(F.when(F.col('event_type') == 'lead_store', F.lit(1))).alias('leads')
"is_read": len(df.query('event_type == "surf"')) > 0
(F.count(F.when(F.col('event_type') == 'surf', F.lit(1))) > 0).alias('is_read')
"number_of_events": len(df)
F.count(F.lit(1)).alias('number_of_events')
"distinct_pages": df['page_id'].nunique()
F.countDistinct('page_id').alias('distinct_pages')
for field in not_calculated_string_fields:
data[field] = df[field].values[0]
*[F.first(field).alias(field) for field in not_calculated_string_fields]
for x in all_events:
new_df[f"count_{x}"] = df.query(f"type == '{x}'").count()
The above can probably be skipped? As far as my tests go, new columns get NaN values, because .count()
returns a Series object instead of one simple value.
for x in duration_events:
duration = df.query(f"event_type == '{x}'")['duration']
duration_sum = duration.sum()
new_df[f"duration_{x}_ms"] = duration_sum
if duration_sum > 0:
new_df[f"mean_duration_{x}_ms"] = duration.mean()
else:
new_df[f"mean_duration_{x}_ms"] = 0
*[F.sum(F.when(F.col('event_type') == x, F.col('duration'))).alias(f"duration_{x}_ms") for x in duration_events]
*[F.mean(F.when(F.col('event_type') == x, F.col('duration'))).alias(f"mean_duration_{x}_ms") for x in duration_events]
So, instead of
def processing_function(df):
...
...
sessions = group.applyInPandas(processing_function, schema=processing_function_output_schema)
you could use efficient native Spark:
sessions = group.agg(
F.first('read_id').alias('read_id'),
F.sum(F.when(F.col('event_type') == 'surf', F.col('duration'))).alias('surf_time_ms'),
F.sum(F.when(F.col('event_type') == 'immersion', F.col('duration'))).alias('immerse_time_ms'),
F.min('tracker_timestamp').alias('min_timestamp'),
F.max('tracker_timestamp').alias('max_timestamp'),
F.count(F.when(F.col('event_type') == 'share', F.lit(1))).alias('shares'),
F.count(F.when(F.col('event_type') == 'lead_store', F.lit(1))).alias('leads'),
(F.count(F.when(F.col('event_type') == 'surf', F.lit(1))) > 0).alias('is_read'),
F.count(F.lit(1)).alias('number_of_events'),
F.countDistinct('page_id').alias('distinct_pages'),
*[F.first(field).alias(field) for field in not_calculated_string_fields],
# skipped count_{x}
*[F.sum(F.when(F.col('event_type') == x, F.col('duration'))).alias(f"duration_{x}_ms") for x in duration_events],
*[F.mean(F.when(F.col('event_type') == x, F.col('duration'))).alias(f"mean_duration_{x}_ms") for x in duration_events],
)