I have a dataset that contains every date between 2010 and 2040 in this format:
1/1/2010
1/2/2010
1/3/2010
...
...
...
12/31/2040
I am using Spark to transform the data, and I'm trying to apply a filter that keeps only the dates in the range [today - 2 years, open-ended into the future].
I have tried every date-manipulation function Spark offers, including:
df_calendar.filter(datediff(to_date(col("date"),"m/d/yyyy"),current_date()).gt(-730))
df_calendar.select("*").withColumn("datediff",datediff(to_date(col("date"),"m/d/yyyy"),current_date())).filter(col("datediff")>(-730))
val today = df_calendar.select(date_sub(current_date(),730))
df_calendar.filter((to_date(col("date"),"m/d/yyyy") > today ))
But I always end up with the same result: the dataset returns all the values starting from 1/1/2021, as if it goes back "2 years" in calendar years rather than in days. I also tried the year() function, and it returns the same result. I'm confused by the result I get each time and would really appreciate help with this one.
CodePudding user response:
I'm not quite sure why your code doesn't work. The same kind of filter works when the column holds actual date values:
>>> from datetime import date, timedelta
>>> import pyspark.sql.functions as F
>>>
>>> seed_date = date(2010, 1, 1)
>>> two_years_back = date.today() - timedelta(days=2*365)
>>>
>>> df1 = spark.createDataFrame(data=[[ [seed_date + timedelta(days=i) for i in range(0, 11323)] ]])
>>> df_calendar = df1.withColumn('date', F.explode(df1['_1'])).drop('_1')
>>>
>>> calendar_list = df_calendar.collect()
>>> calendar_list[0]
Row(date=datetime.date(2010, 1, 1))
>>> calendar_list[-1]
Row(date=datetime.date(2040, 12, 31))
>>>
>>> filtered_df = df_calendar.filter((F.col('date') > two_years_back) \
... & (F.col('date') < date.today()))
>>> filtered_list = filtered_df.collect()
>>>
>>> filtered_list[0]
Row(date=datetime.date(2020, 2, 26))
>>> filtered_list[-1]
Row(date=datetime.date(2022, 2, 23))
>>>
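EDIT: So I guess the problem with your code is probably the pattern. In Spark's datetime patterns, lowercase m means minute-of-hour and uppercase M means month-of-year, so "m/d/yyyy" never reads the month. The month then presumably falls back to January for every row, which would explain why your results start at 1/1/2021. With 'M/d/yyyy' the string dates parse as expected: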
>>>
>>> from datetime import date, timedelta
>>> import pyspark.sql.functions as F
>>>
>>> seed_date = date(2010, 1, 1)
>>> two_years_back = date.today() - timedelta(days=2*365)
>>>
>>> df1 = spark.createDataFrame(data=[[ [(seed_date + timedelta(days=i)).strftime('%-m/%-d/%Y') for i in range(0, 11323)] ]])
>>> df_calendar = df1.withColumn('date', F.explode(df1['_1'])).drop('_1')
>>>
>>> calendar_list = df_calendar.collect()
>>> calendar_list[0]
Row(date='1/1/2010')
>>> calendar_list[-1]
Row(date='12/31/2040')
>>>
>>> filtered_df = df_calendar.filter((F.to_date(F.col("date"), 'M/d/yyyy') < date.today()) \
... & (F.to_date(F.col("date"), 'M/d/yyyy') > two_years_back))
>>> filtered_list = filtered_df.collect()
>>>
>>> filtered_list[0]
Row(date='2/26/2020')
>>> filtered_list[-1]
Row(date='2/23/2022')
>>>
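To see why a minute pattern could produce exactly the symptom in the question, here is a minimal sketch in plain Python, with no Spark involved. It is only an analogy: Python's strptime uses %M for minute and %m for month, which mirrors Spark's lowercase m and uppercase M, but Spark's parser may differ in its details. The helper names calendar_strings and keep_recent are hypothetical, and "today" is fixed at 2022-02-24, an assumption chosen to match the output shown above.

```python
from datetime import date, datetime, timedelta


def calendar_strings(start, end):
    """Return every date from start to end (inclusive) formatted like 1/2/2010."""
    days = (end - start).days + 1
    return [
        f"{d.month}/{d.day}/{d.year}"
        for d in (start + timedelta(days=i) for i in range(days))
    ]


def keep_recent(strings, pattern, today, days_back=730):
    """Keep the strings whose parsed date is later than today - days_back."""
    cutoff = today - timedelta(days=days_back)
    return [s for s in strings if datetime.strptime(s, pattern).date() > cutoff]


# Assumed "today", fixed so the result is reproducible.
today = date(2022, 2, 24)
strings = calendar_strings(date(2010, 1, 1), date(2040, 12, 31))

# %M is minute: the first field is never read as a month,
# so every parsed date lands in January of its year.
wrong = keep_recent(strings, "%M/%d/%Y", today)

# %m is month: the dates parse as intended.
right = keep_recent(strings, "%m/%d/%Y", today)

print(wrong[0], right[0])  # 1/1/2021 2/26/2020
```

With the minute pattern, every 2020 date collapses into January 2020, which is before the cutoff, so the first surviving row is 1/1/2021. The sketch applies only a lower bound, so the result stays open-ended into the future, as the question asks.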