Home > Software engineering > Spark DataFrame inside a loop gets slower on each iteration
Spark DataFrame inside a loop gets slower on each iteration

Time:07-02

I'm new to Spark and I'm still trying to figure out the right way to iterate over a DataFrame multiple times.

I'm trying to loop over a Spark DataFrame 10 times, each time filtering for a different date to get the matching results, but each iteration takes longer than the previous one. I tried calling unpersist(), but it didn't help.

Hope someone can help me.

import findspark
findspark.init()
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from itertools import combinations
import datetime

# Local Spark session on all available cores, with off-heap memory enabled.
spark = (
    SparkSession.builder
    .appName("Practice")
    .master("local[*]")
    .config("spark.executor.memory", "70g")
    .config("spark.driver.memory", "50g")
    .config("spark.memory.offHeap.enabled", True)
    .config("spark.memory.offHeap.size", "16g")
    .getOrCreate()
)

# Raw string: the path contains a backslash that must not be read as an escape.
df = spark.read.parquet(r'spark-big-data\parquet_small_example.parquet')

# One entry per loop pass: the lowest price found, or 0 when nothing matched.
res = []

for date in range(10):
    # NOTE: `df` is rebound on every pass, so each iteration starts from the
    # previous iteration's filtered (and possibly exploded) result instead of
    # the parquet data, and the chain of transformations behind `df` keeps
    # growing from one pass to the next.

    # Flatten the first ("fs") and second ("ss") request segments into columns.
    df = df.withColumn('fs_origin', df.request.Segments.getItem(0)['Origin'])
    df = df.withColumn('fs_destination', df.request.Segments.getItem(0)['Destination'])
    df = df.withColumn('fs_date', df.request.Segments.getItem(0)['FlightTime'])

    df = df.withColumn('ss_origin', df.request.Segments.getItem(1)['Origin'])
    df = df.withColumn('ss_destination', df.request.Segments.getItem(1)['Destination'])
    df = df.withColumn('ss_date', df.request.Segments.getItem(1)['FlightTime'])

    # Combine the year/month/day columns into one "y-m-d" string.
    df = df.withColumn('full_date', F.concat_ws('-', df.year, df.month, df.day))

    # Keep TLV -> NYC -> TLV requests for this pass's outbound date and the
    # fixed return date.
    df = df.filter(
        (df["fs_origin"] == 'TLV')
        & (df["fs_destination"] == 'NYC')
        & (df["ss_origin"] == 'NYC')
        & (df['ss_destination'] == 'TLV')
        & (df['fs_date'] == '2021-02-' + str(date) + 'T00:00:00')
        & (df['ss_date'] == '2021-02-16' + 'T00:00:00')
    )

    if df.count() == 0:
        res.append(0)

    else:
        # Newest full_date first, so row 0 carries the latest day.
        df = df.sort(F.unix_timestamp("full_date", "yyyy-M-d").desc())

        latest_day = df.collect()[0]['full_date']

        df = df.filter(df['full_date'] == latest_day)

        # One row per element of response.results.
        df = df.withColumn("exploded_data", F.explode("response.results"))

        df = df.withColumn(
            "price",
            F.col("exploded_data").getItem('PriceInfo').getItem('Price')  # either get by name or by index e.g. getItem(0) etc
        )

        # Lowest price among the latest day's results.
        res.append(df.sort(df.price.asc()).collect()[0]['price'])

    df.unpersist()
    spark.catalog.clearCache()


CodePudding user response:

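Can you try the version below? In your loop `df` is reassigned on every pass, so each iteration works on the previous iteration's filtered result and Spark has to evaluate an ever-longer chain of transformations. Here the derived columns and the route filter are built once and persisted, and each date gets its own `df_date`: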

import findspark
findspark.init()
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from itertools import combinations
import datetime

# Local Spark session on all available cores, with off-heap memory enabled.
spark = (
    SparkSession.builder
    .appName("Practice")
    .master("local[*]")
    .config("spark.executor.memory", "70g")
    .config("spark.driver.memory", "50g")
    .config("spark.memory.offHeap.enabled", True)
    .config("spark.memory.offHeap.size", "16g")
    .getOrCreate()
)

# Raw string: the path contains a backslash that must not be read as an escape.
df = spark.read.parquet(r'spark-big-data\parquet_small_example.parquet')

# Date-independent work is done once, outside the loop: flatten the first
# ("fs") and second ("ss") request segments and build the "y-m-d" string.
df = df.withColumn('fs_origin', df.request.Segments.getItem(0)['Origin'])
df = df.withColumn('fs_destination', df.request.Segments.getItem(0)['Destination'])
df = df.withColumn('fs_date', df.request.Segments.getItem(0)['FlightTime'])
df = df.withColumn('ss_origin', df.request.Segments.getItem(1)['Origin'])
df = df.withColumn('ss_destination', df.request.Segments.getItem(1)['Destination'])
df = df.withColumn('ss_date', df.request.Segments.getItem(1)['FlightTime'])
df = df.withColumn('full_date', F.concat_ws('-', df.year, df.month, df.day))

# Keep only TLV -> NYC -> TLV requests and mark the result for caching so
# every loop pass reuses it.
df = df.filter(
    (df["fs_origin"] == 'TLV')
    & (df["fs_destination"] == 'NYC')
    & (df["ss_origin"] == 'NYC')
    & (df['ss_destination'] == 'TLV')
).persist()

# Run an action now so the persisted data is materialized before the loop.
df.count()

# One entry per loop pass: the lowest price found, or 0 when nothing matched.
res = []

for date in range(10):
    # `df` is never rebound here; each pass derives its own frame from it.
    # NOTE(review): str(date) is not zero-padded (e.g. '2021-02-1T00:00:00')
    # and range(10) starts at day 0 -- confirm this matches how FlightTime
    # is stored in the data.
    df_date = df.filter(
        (df['fs_date'] == '2021-02-' + str(date) + 'T00:00:00')
        & (df['ss_date'] == '2021-02-16' + 'T00:00:00')
    )

    # first() brings back only the top row instead of collecting every match;
    # it returns None when no row matched, which replaces the count() check.
    newest = df_date.sort(F.unix_timestamp("full_date", "yyyy-M-d").desc()).first()
    if newest is None:
        res.append(0)
        continue

    latest_day = newest['full_date']

    # Restrict to the latest day, then produce one row per element of
    # response.results with its price pulled out.
    priced = (
        df_date.filter(df_date['full_date'] == latest_day)
        .withColumn("exploded_data", F.explode("response.results"))
        .withColumn(
            "price",
            F.col("exploded_data").getItem('PriceInfo').getItem('Price'),  # either get by name or by index e.g. getItem(0) etc
        )
    )

    # explode() drops rows whose results array is empty, so `priced` can be
    # empty even though a request matched; record 0 in that case as well.
    cheapest = priced.sort(priced.price.asc()).first()
    res.append(0 if cheapest is None else cheapest['price'])

# Release the cached data once all dates have been processed.
df.unpersist()
spark.catalog.clearCache()
  • Related