I'm trying to figure out the number of working days between two dates. The table (dfDates) is laid out as follows:
Key | StartDateKey | EndDateKey |
---|---|---|
1 | 20171227 | 20180104 |
2 | 20171227 | 20171229 |
I have another table (dfDimDate) with all the relevant date keys and whether the date key is a working day or not:
DateKey | WorkDayFlag |
---|---|
20171227 | 1 |
20171228 | 1 |
20171229 | 1 |
20171230 | 0 |
20171231 | 0 |
20180101 | 0 |
20180102 | 1 |
20180103 | 1 |
20180104 | 1 |
I'm expecting a result as so:
Key | WorkingDays |
---|---|
1 | 6 |
2 | 3 |
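For reference, here's a minimal way to build the two sample tables in PySpark (assuming an active SparkSession named `spark`; the variable names simply mirror the ones above):

```python
from pyspark.sql import SparkSession

# Assumed: a SparkSession is available as `spark` (created here if not).
spark = SparkSession.builder.getOrCreate()

dfDates = spark.createDataFrame(
    [(1, 20171227, 20180104), (2, 20171227, 20171229)],
    ['Key', 'StartDateKey', 'EndDateKey'],
)

dfDimDate = spark.createDataFrame(
    [(20171227, 1), (20171228, 1), (20171229, 1), (20171230, 0), (20171231, 0),
     (20180101, 0), (20180102, 1), (20180103, 1), (20180104, 1)],
    ['DateKey', 'WorkDayFlag'],
)
```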
So far (I realise this isn't complete and won't get me the result above), I've written this:
```python
workingdays = []
for i in range(0, dfDates.count()):
    value = dfDimDate.filter(
        (dfDimDate.DateKey >= dfDates.collect()[i][1])
        & (dfDimDate.DateKey <= dfDates.collect()[i][2])
    ).agg({'WorkDayFlag': 'sum'})
    workingdays.append(value.collect())
```
However, only null values are being returned. I've also noticed this is very slow; it took 54 seconds before it errored.
I think I understand what the error is about, but I'm not sure how to fix it. I'm also not sure how to optimise the command so it runs faster. I'm looking for a solution in PySpark or Spark SQL (whichever is easiest).
Many thanks,
Carolina
Edit: the error below was resolved thanks to a suggestion from @samkart, who said to put the `agg` after the `filter`:

```
AnalysisException: Resolved attribute(s) DateKey#17075 missing from sum(WorkDayFlag)#22142L in operator !Filter ((DateKey#17075 <= 20171228) AND (DateKey#17075 >= 20171227)).;
```
CodePudding user response:
A possible and simple solution:
```python
from pyspark.sql import functions as F

# Join each dfDates row with every dfDimDate row whose DateKey falls in its range,
# then sum WorkDayFlag per Key to get the number of working days.
dfDates \
    .join(dfDimDate, dfDimDate.DateKey.between(dfDates.StartDateKey, dfDates.EndDateKey)) \
    .groupBy(dfDates.Key) \
    .agg(F.sum(dfDimDate.WorkDayFlag).alias('WorkingDays'))
```
That is, first join the two datasets in order to link each dfDates row with all the dfDimDate rows whose date falls in its range (`dfDates.StartDateKey <= dfDimDate.DateKey <= dfDates.EndDateKey`).
Then simply group the joined dataset by `Key` and sum `WorkDayFlag` to get the number of working days in each range.
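As a quick sanity check against the sample data, assigning the expression above to a variable (called `result` here purely for illustration) and showing it should reproduce the expected counts:

```python
from pyspark.sql import functions as F

result = (
    dfDates
    .join(dfDimDate, dfDimDate.DateKey.between(dfDates.StartDateKey, dfDates.EndDateKey))
    .groupBy(dfDates.Key)
    .agg(F.sum(dfDimDate.WorkDayFlag).alias('WorkingDays'))
)

result.show()
# Expected output (row order may vary):
# +---+-----------+
# |Key|WorkingDays|
# +---+-----------+
# |  1|          6|
# |  2|          3|
# +---+-----------+
```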
In the solution you proposed, you are performing the calculation directly on the driver, so you are not taking advantage of the parallelism that Spark offers. This should be avoided when possible, especially for large datasets.
Apart from that, you are calling `collect` repeatedly inside the for-loop, even on the same data, which slows things down further.
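Since the question mentions that Spark SQL would also be acceptable, the same logic can be expressed as a SQL query over temporary views. A minimal sketch (the view names `dates` and `dim_date`, and the `spark` session variable, are just assumptions for this example):

```python
# Expose the DataFrames to Spark SQL under temporary view names.
dfDates.createOrReplaceTempView('dates')
dfDimDate.createOrReplaceTempView('dim_date')

# Join every dimension date falling inside each range, then sum the flag per Key.
spark.sql("""
    SELECT d.Key,
           SUM(w.WorkDayFlag) AS WorkingDays
    FROM dates d
    JOIN dim_date w
      ON w.DateKey BETWEEN d.StartDateKey AND d.EndDateKey
    GROUP BY d.Key
""")
```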