I'm new to PySpark; I hope someone could help me.
I have a DataFrame with a bunch of flight search results:
+------+-----------+----------+----------+-----+
|origin|destination|      from|        to|price|
+------+-----------+----------+----------+-----+
|   TLV|        NYC|2022-01-01|2022-01-05| 1000|
|   TLV|        ROM|2022-03-01|2022-04-05|  480|
|   TLV|        NYC|2022-01-02|2022-01-04|  990|
|   TLV|        NYC|2022-02-01|2022-03-15| 1200|
|   TLV|        NYC|2022-01-02|2022-01-05| 1100|
|   TLV|        BLR|2022-01-01|2022-01-05| 1480|
|   TLV|        NYC|2022-01-02|2022-01-05| 1010|
+------+-----------+----------+----------+-----+
I want to get all the flight prices from the DataFrame, based on origin-destination and dates.
I have a list with some date combinations like so:
date_combinations = [
    ("2022-01-01", "2022-01-02"), ("2022-01-01", "2022-01-03"),
    ("2022-01-01", "2022-01-04"), ("2022-01-01", "2022-01-05"),
    ("2022-01-02", "2022-01-03"), ("2022-01-02", "2022-01-04"),
    ("2022-01-02", "2022-01-05"), ("2022-01-03", "2022-01-04"),
    ("2022-01-03", "2022-01-05"), ("2022-01-04", "2022-01-05"),
]
What I'm currently doing is filtering the dataframe inside a for loop for every date combination:
results = []
for date in date_combinations:
    df_date = df.filter((df['from'] == date[0]) & (df['to'] == date[1]))
    if df_date.count() == 0:
        results.append([date, 0])
    else:
        results.append([date, df_date.collect()[0]['price']])
Output:
[('2022-01-01', '2022-01-02'), 0]
[('2022-01-01', '2022-01-03'), 0]
[('2022-01-01', '2022-01-04'), 0]
[('2022-01-01', '2022-01-05'), 1000]
[('2022-01-02', '2022-01-03'), 0]
[('2022-01-02', '2022-01-04'), 990]
[('2022-01-02', '2022-01-05'), 1100]
[('2022-01-03', '2022-01-04'), 0]
[('2022-01-03', '2022-01-05'), 0]
[('2022-01-04', '2022-01-05'), 0]
The output is OK, but I'm sure there is a much more efficient way of doing it than a for loop (which will take forever on large datasets).
Thanks!
CodePudding user response:
You can create a DataFrame from your list of dates and join both DataFrames:
df = spark.createDataFrame(
    [
        ('TLV', 'NYC', '2022-01-01', '2022-01-05', '1000'),
        ('TLV', 'ROM', '2022-03-01', '2022-04-05', '480'),
        ('TLV', 'NYC', '2022-01-02', '2022-01-04', '990'),
        ('TLV', 'NYC', '2022-02-01', '2022-03-15', '1200'),
        ('TLV', 'NYC', '2022-01-02', '2022-01-05', '1100'),
        ('TLV', 'BLR', '2022-01-01', '2022-01-05', '1480'),
        ('TLV', 'NYC', '2022-01-02', '2022-01-05', '1010'),
    ],
    ['origin', 'destination', 'from', 'to', 'price']
)
df.show()
+------+-----------+----------+----------+-----+
|origin|destination|      from|        to|price|
+------+-----------+----------+----------+-----+
|   TLV|        NYC|2022-01-01|2022-01-05| 1000|
|   TLV|        ROM|2022-03-01|2022-04-05|  480|
|   TLV|        NYC|2022-01-02|2022-01-04|  990|
|   TLV|        NYC|2022-02-01|2022-03-15| 1200|
|   TLV|        NYC|2022-01-02|2022-01-05| 1100|
|   TLV|        BLR|2022-01-01|2022-01-05| 1480|
|   TLV|        NYC|2022-01-02|2022-01-05| 1010|
+------+-----------+----------+----------+-----+
date_combinations = [
    ("2022-01-01", "2022-01-02"), ("2022-01-01", "2022-01-03"),
    ("2022-01-01", "2022-01-04"), ("2022-01-01", "2022-01-05"),
    ("2022-01-02", "2022-01-03"), ("2022-01-02", "2022-01-04"),
    ("2022-01-02", "2022-01-05"), ("2022-01-03", "2022-01-04"),
    ("2022-01-03", "2022-01-05"), ("2022-01-04", "2022-01-05"),
]
df_date_combinations = spark.createDataFrame(date_combinations, ['from','to'])
df\
    .join(df_date_combinations, ['from', 'to'])\
    .show()
+----------+----------+------+-----------+-----+
|      from|        to|origin|destination|price|
+----------+----------+------+-----------+-----+
|2022-01-01|2022-01-05|   TLV|        NYC| 1000|
|2022-01-01|2022-01-05|   TLV|        BLR| 1480|
|2022-01-02|2022-01-04|   TLV|        NYC|  990|
|2022-01-02|2022-01-05|   TLV|        NYC| 1100|
|2022-01-02|2022-01-05|   TLV|        NYC| 1010|
+----------+----------+------+-----------+-----+
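Note that this inner join keeps only the date combinations that have a matching flight. If you also want the missing combinations back with a price of 0, as in your loop output, you could join from the dates side with a left join and fill the nulls. A quick sketch under that assumption, reusing df and df_date_combinations from above:

from pyspark.sql import functions as F

# left join keeps every date combination; coalesce turns missing prices into 0
result = (
    df_date_combinations
    .join(df, ['from', 'to'], 'left')
    .withColumn('price', F.coalesce(F.col('price'), F.lit(0)))
    .orderBy('from', 'to')
)
result.show()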
CodePudding user response:
I would first convert date_combinations to a DataFrame (using parallelize, or by selecting the columns and dropping duplicates if it comes from a dataset). The idea is to do a left join between your dates and the data table (as we will call it). First, we clean the data table and drop duplicates on (from, to), because otherwise the left join would also create duplicate rows for matching records:
val mainTableFiltered = data.select("from", "to", "price").dropDuplicates("from", "to")
We then left-join the dates with this cleaned table on from and to, so we do not lose any date combinations:
dateCombinations.join(mainTableFiltered, Seq("from", "to"), "left")
Unmatched records will end up with a null price, so we replace the nulls with 0:
.withColumn("price", when(col("price").isNull, 0).otherwise(col("price")))
And we finally order by from and to to get the same results as in your example:
.orderBy("from", "to")
Full code:
import org.apache.spark.sql.functions.{col, when}

val mainTableFiltered = data.select("from", "to", "price").dropDuplicates("from", "to")

dateCombinations.join(mainTableFiltered, Seq("from", "to"), "left")
  .withColumn("price", when(col("price").isNull, 0).otherwise(col("price")))
  .orderBy("from", "to")
Final output:
+----------+----------+-----+
|      from|        to|price|
+----------+----------+-----+
|2022-01-01|2022-01-02|    0|
|2022-01-01|2022-01-03|    0|
|2022-01-01|2022-01-04|    0|
|2022-01-01|2022-01-05| 1000|
|2022-01-02|2022-01-03|    0|
|2022-01-02|2022-01-04|  990|
|2022-01-02|2022-01-05| 1100|
|2022-01-03|2022-01-04|    0|
|2022-01-03|2022-01-05|    0|
|2022-01-04|2022-01-05|    0|
+----------+----------+-----+
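Since the question is in PySpark, here is a rough Python equivalent of the above (an untested sketch; it assumes df holds the search results and df_date_combinations holds the date pairs, as in the first answer):

from pyspark.sql import functions as F

# keep one price per (from, to) pair so the left join cannot fan out
main_table_filtered = df.select('from', 'to', 'price').dropDuplicates(['from', 'to'])

result = (
    df_date_combinations
    .join(main_table_filtered, ['from', 'to'], 'left')  # keep every date combination
    .withColumn('price', F.when(F.col('price').isNull(), 0).otherwise(F.col('price')))
    .orderBy('from', 'to')
)
result.show()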