I'm new to Spark and I'm trying to figure out the right way to iterate over a DataFrame multiple times.
I'm looping over a Spark DataFrame 10 times, once per date, to collect the matching results, but each iteration takes longer than the one before. I tried unpersist() but it didn't help.
Hope someone can help me.
import findspark
findspark.init()
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from itertools import combinations
import datetime
spark = SparkSession.builder.appName("Practice").master("local[*]").config("spark.executor.memory", "70g").config("spark.driver.memory", "50g").config("spark.memory.offHeap.enabled",True).config("spark.memory.offHeap.size","16g").getOrCreate()
df = spark.read.parquet(r'spark-big-data\parquet_small_example.parquet')
res = []
for date in range(10):
    # extract first- and second-segment fields from the nested request struct
    df = df.withColumn('fs_origin', df.request.Segments.getItem(0)['Origin'])
    df = df.withColumn('fs_destination', df.request.Segments.getItem(0)['Destination'])
    df = df.withColumn('fs_date', df.request.Segments.getItem(0)['FlightTime'])
    df = df.withColumn('ss_origin', df.request.Segments.getItem(1)['Origin'])
    df = df.withColumn('ss_destination', df.request.Segments.getItem(1)['Destination'])
    df = df.withColumn('ss_date', df.request.Segments.getItem(1)['FlightTime'])
    df = df.withColumn('full_date', F.concat_ws('-', df.year, df.month, df.day))
    df = df.filter((df["fs_origin"] == 'TLV') & (df["fs_destination"] == 'NYC') & (df["ss_origin"] == 'NYC') & (df['ss_destination'] == 'TLV') & (df['fs_date'] == '2021-02-' + str(date) + 'T00:00:00') & (df['ss_date'] == '2021-02-16' + 'T00:00:00'))
    if df.count() == 0:
        res.append(0)
    else:
        # keep only the rows from the latest full_date, then take the cheapest price
        df = df.sort(F.unix_timestamp("full_date", "yyyy-M-d").desc())
        latest_day = df.collect()[0]['full_date']
        df = df.filter(df['full_date'] == latest_day)
        df = df.withColumn("exploded_data", F.explode("response.results"))
        df = df.withColumn(
            "price",
            F.col("exploded_data").getItem('PriceInfo').getItem('Price')  # either get by name or by index, e.g. getItem(0)
        )
        res.append(df.sort(df.price.asc()).collect()[0]['price'])
    df.unpersist()
    spark.catalog.clearCache()
CodePudding user response:
Can you try this? The segment columns and the origin/destination filter don't depend on the loop variable, so they are moved out of the loop and the filtered DataFrame is persisted once; each iteration then only applies the date filter to the cached data instead of rebuilding the whole plan:
import findspark
findspark.init()
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from itertools import combinations
import datetime
spark = SparkSession.builder.appName("Practice").master("local[*]").config("spark.executor.memory", "70g").config("spark.driver.memory", "50g").config("spark.memory.offHeap.enabled",True).config("spark.memory.offHeap.size","16g").getOrCreate()
df = spark.read.parquet(r'spark-big-data\parquet_small_example.parquet')
# derive the segment columns and apply the route filter once, outside the loop
df = df.withColumn('fs_origin', df.request.Segments.getItem(0)['Origin'])
df = df.withColumn('fs_destination', df.request.Segments.getItem(0)['Destination'])
df = df.withColumn('fs_date', df.request.Segments.getItem(0)['FlightTime'])
df = df.withColumn('ss_origin', df.request.Segments.getItem(1)['Origin'])
df = df.withColumn('ss_destination', df.request.Segments.getItem(1)['Destination'])
df = df.withColumn('ss_date', df.request.Segments.getItem(1)['FlightTime'])
df = df.withColumn('full_date', F.concat_ws('-', df.year, df.month, df.day))
df = df.filter((df["fs_origin"] == 'TLV') & (df["fs_destination"] == 'NYC') & (df["ss_origin"] == 'NYC') & (df['ss_destination'] == 'TLV')).persist()
df.count()  # action to materialize the persisted DataFrame
res = []
for date in range(10):
    # only the date filter changes per iteration, so it runs against the cached data
    df_date = df.filter((df['fs_date'] == '2021-02-' + str(date) + 'T00:00:00') & (df['ss_date'] == '2021-02-16' + 'T00:00:00'))
    if df_date.count() == 0:
        res.append(0)
    else:
        df_date = df_date.sort(F.unix_timestamp("full_date", "yyyy-M-d").desc())
        latest_day = df_date.collect()[0]['full_date']
        df_date = df_date.filter(df_date['full_date'] == latest_day)
        df_date = df_date.withColumn("exploded_data", F.explode("response.results"))
        df_date = df_date.withColumn(
            "price",
            F.col("exploded_data").getItem('PriceInfo').getItem('Price')  # either get by name or by index, e.g. getItem(0)
        )
        res.append(df_date.sort(df_date.price.asc()).collect()[0]['price'])
df.unpersist()
spark.catalog.clearCache()
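If you want to drop the Python loop entirely, you can get the same result list from a single aggregation pass over the persisted DataFrame. The following is only a minimal sketch, assuming the same schema and the same (non-zero-padded) date strings as in the question; the names dates, w, ts, max_ts, agg and min_by_date are illustrative, not part of the original code. It keeps, per fs_date, only the rows from the latest full_date (via a window), then takes the minimum price per date:
from pyspark.sql.window import Window

dates = ['2021-02-' + str(d) + 'T00:00:00' for d in range(10)]  # same format as the question builds in the loop

w = Window.partitionBy('fs_date')
agg = (
    df.filter(df['fs_date'].isin(dates) & (df['ss_date'] == '2021-02-16T00:00:00'))
      .withColumn('ts', F.unix_timestamp('full_date', 'yyyy-M-d'))
      .withColumn('max_ts', F.max('ts').over(w))
      .filter(F.col('ts') == F.col('max_ts'))        # keep only rows from the latest full_date per fs_date
      .withColumn('exploded_data', F.explode('response.results'))
      .withColumn('price', F.col('exploded_data')['PriceInfo']['Price'])
      .groupBy('fs_date')
      .agg(F.min('price').alias('min_price'))
)
min_by_date = {row['fs_date']: row['min_price'] for row in agg.collect()}
res = [min_by_date.get(d, 0) for d in dates]         # 0 when a date has no matching rows
Either way, the key point is the same: do the per-row transformations and the constant filter once, cache the result, and keep the per-date work as small as possible so the query plan doesn't grow with every iteration.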