How to avoid using a for loop in Spark (Python)


I'm new to PySpark; I hope someone could help me.

I have a DataFrame with a bunch of flight search results:

+------+-----------+----------+----------+-----+
|origin|destination|      from|        to|price|
+------+-----------+----------+----------+-----+
|   TLV|        NYC|2022-01-01|2022-01-05| 1000|
|   TLV|        ROM|2022-03-01|2022-04-05|  480|
|   TLV|        NYC|2022-01-02|2022-01-04|  990|
|   TLV|        NYC|2022-02-01|2022-03-15| 1200|
|   TLV|        NYC|2022-01-02|2022-01-05| 1100|
|   TLV|        BLR|2022-01-01|2022-01-05| 1480|
|   TLV|        NYC|2022-01-02|2022-01-05| 1010|
+------+-----------+----------+----------+-----+

I want to get all the flight prices from the DataFrame, based on origin-destination and dates.

I have a list with some date combinations like so:

date_combinations = [
    ("2022-01-01", "2022-01-02"), ("2022-01-01", "2022-01-03"),
    ("2022-01-01", "2022-01-04"), ("2022-01-01", "2022-01-05"),
    ("2022-01-02", "2022-01-03"), ("2022-01-02", "2022-01-04"),
    ("2022-01-02", "2022-01-05"), ("2022-01-03", "2022-01-04"),
    ("2022-01-03", "2022-01-05"), ("2022-01-04", "2022-01-05"),
]
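
For what it's worth, this list covers every pair of dates between 2022-01-01 and 2022-01-05 (earlier date first); if that is the intent, it could be generated instead of written out by hand. A small, purely illustrative sketch:

from itertools import combinations

dates = ["2022-01-01", "2022-01-02", "2022-01-03", "2022-01-04", "2022-01-05"]
# all pairs with the earlier date first, matching the hand-written list above
date_combinations = list(combinations(dates, 2))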

What I'm currently doing is filtering the dataframe inside a for loop for every date combination:

results = []
for date in date_combinations:
    df_date = df.filter((df['from'] == date[0]) & (df['to'] == date[1]))
    if df_date.count() == 0:
        results.append([date, 0])
    else:
        results.append([date, df_date.collect()[0]['price']])

Output:

[('2022-01-01', '2022-01-02'), 0]
[('2022-01-01', '2022-01-03'), 0]
[('2022-01-01', '2022-01-04'), 0]
[('2022-01-01', '2022-01-05'), 1000]
[('2022-01-02', '2022-01-03'), 0]
[('2022-01-02', '2022-01-04'), 990]
[('2022-01-02', '2022-01-05'), 1100]
[('2022-01-03', '2022-01-04'), 0]
[('2022-01-03', '2022-01-05'), 0]
[('2022-01-04', '2022-01-05'), 0]

The output is OK, but I'm sure there is a much more efficient way of doing this than a for loop (which on large datasets will take forever).

Thanks!

CodePudding user response:

You can create a DataFrame from your list of dates and join the two DataFrames:

df = spark.createDataFrame(
    [
     ('TLV','NYC','2022-01-01','2022-01-05','1000')
    ,('TLV','ROM','2022-03-01','2022-04-05','480')
    ,('TLV','NYC','2022-01-02','2022-01-04','990')
    ,('TLV','NYC','2022-02-01','2022-03-15','1200')
    ,('TLV','NYC','2022-01-02','2022-01-05','1100')
    ,('TLV','BLR','2022-01-01','2022-01-05','1480')
    ,('TLV','NYC','2022-01-02','2022-01-05','1010')
    ],
    ['origin','destination','from','to','price']
)

df.show()

+------+-----------+----------+----------+-----+
|origin|destination|      from|        to|price|
+------+-----------+----------+----------+-----+
|   TLV|        NYC|2022-01-01|2022-01-05| 1000|
|   TLV|        ROM|2022-03-01|2022-04-05|  480|
|   TLV|        NYC|2022-01-02|2022-01-04|  990|
|   TLV|        NYC|2022-02-01|2022-03-15| 1200|
|   TLV|        NYC|2022-01-02|2022-01-05| 1100|
|   TLV|        BLR|2022-01-01|2022-01-05| 1480|
|   TLV|        NYC|2022-01-02|2022-01-05| 1010|
+------+-----------+----------+----------+-----+

date_combinations = [
    ("2022-01-01", "2022-01-02"), ("2022-01-01", "2022-01-03"),
    ("2022-01-01", "2022-01-04"), ("2022-01-01", "2022-01-05"),
    ("2022-01-02", "2022-01-03"), ("2022-01-02", "2022-01-04"),
    ("2022-01-02", "2022-01-05"), ("2022-01-03", "2022-01-04"),
    ("2022-01-03", "2022-01-05"), ("2022-01-04", "2022-01-05"),
]

df_date_combinations = spark.createDataFrame(date_combinations, ['from','to'])

df\
    .join(df_date_combinations, ['from','to'])\
    .show()

+----------+----------+------+-----------+-----+
|      from|        to|origin|destination|price|
+----------+----------+------+-----------+-----+
|2022-01-01|2022-01-05|   TLV|        NYC| 1000|
|2022-01-01|2022-01-05|   TLV|        BLR| 1480|
|2022-01-02|2022-01-04|   TLV|        NYC|  990|
|2022-01-02|2022-01-05|   TLV|        NYC| 1100|
|2022-01-02|2022-01-05|   TLV|        NYC| 1010|
+----------+----------+------+-----------+-----+

CodePudding user response:

I would first convert date_combinations to a DataFrame (using parallelize, or by selecting the two columns and dropping duplicates if the combinations come from a dataset).

The idea is to do a left join between your date combinations and the data table (we will call it data).

First, we clean the data table and drop duplicate (from, to) pairs, because otherwise the left join would also create duplicate rows for matching records:

val mainTableFiltered = data.select("from", "to", "price").dropDuplicates("from", "to")

We then join the date combinations with this cleaned table on from and to, as a left join, so we do not lose any combinations:

dateCombinations.join(mainTableFiltered, Seq("from", "to"), "left")

Combinations that do not match any record will have a null price, so we replace the nulls with 0:

.withColumn("price", when(col("price").isNull, 0).otherwise(col("price")))

Finally, we order by from and to to get the same ordering as in your example:

.orderBy("from", "to")

Full code:

val mainTableFiltered = data.select("from", "to", "price").dropDuplicates("from", "to")
dateCombinations.join(mainTableFiltered, Seq("from", "to"), "left")
  .withColumn("price", when(col("price").isNull, 0).otherwise(col("price")))
  .orderBy("from", "to")
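
The above is Scala; since the question is about PySpark, a rough Python equivalent of the same approach could look like this (a sketch, assuming df is the flights DataFrame and date_combinations is the list from the question):

from pyspark.sql import functions as F

# one row per requested (from, to) combination
df_dates = spark.createDataFrame(date_combinations, ['from', 'to'])

# drop duplicate (from, to) pairs so the left join does not multiply rows
main_filtered = df.select('from', 'to', 'price').dropDuplicates(['from', 'to'])

result = (
    df_dates
    .join(main_filtered, ['from', 'to'], 'left')  # keep every combination
    .withColumn('price', F.when(F.col('price').isNull(), 0).otherwise(F.col('price')))
    .orderBy('from', 'to')
)
result.show()

As with the collect()[0] in the original loop, when several flights share the same dates only one arbitrary price per (from, to) pair survives the dropDuplicates step.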

Final output:

+----------+----------+-----+
|      from|        to|price|
+----------+----------+-----+
|2022-01-01|2022-01-02|    0|
|2022-01-01|2022-01-03|    0|
|2022-01-01|2022-01-04|    0|
|2022-01-01|2022-01-05| 1000|
|2022-01-02|2022-01-03|    0|
|2022-01-02|2022-01-04|  990|
|2022-01-02|2022-01-05| 1100|
|2022-01-03|2022-01-04|    0|
|2022-01-03|2022-01-05|    0|
|2022-01-04|2022-01-05|    0|
+----------+----------+-----+