How to filter a variable date in Spark?


I have a dataset that contains all the dates between 2010 and 2040, in this format:

1/1/2010 
1/2/2010
1/3/2010
...
...
...
12/31/2040

I am using Spark to transform this data, and I'm trying to apply a filter that only keeps dates in the range [today - 2 years, +∞), i.e., from two years ago onward, with no upper bound.

I have tried seemingly every date-manipulation function Spark offers, including:

df_calendar.filter(datediff(to_date(col("date"),"m/d/yyyy"),current_date()).gt(-730))

df_calendar.select("*").withColumn("datediff",datediff(to_date(col("date"),"m/d/yyyy"),current_date())).filter(col("datediff")>(-730))

val today = df_calendar.select(date_sub(current_date(),730))
df_calendar.filter((to_date(col("date"),"m/d/yyyy") > today ))

But I always end up with the same result: the dataset returns all the values starting from 1/1/2021, as if it went back "2 years" but not in terms of days. Note that I also tried using the year() function, and it returned the same result. I'm seriously confused by the result I get each time; I really need your help on this one.

CodePudding user response:

Not quite sure why your code doesn't work.

>>> from datetime import date, timedelta
>>> import pyspark.sql.functions as F
>>> 
>>> seed_date = date(2010, 1, 1)
>>> two_years_back = date.today() - timedelta(days=2*365)
>>> 
>>> df1 = spark.createDataFrame(data=[[ [seed_date + timedelta(days=i) for i in range(0, 11323)] ]])
>>> df_calendar = df1.withColumn('date', F.explode(df1['_1'])).drop('_1')
>>> 
>>> calendar_list = df_calendar.collect()
>>> calendar_list[0]
Row(date=datetime.date(2010, 1, 1))
>>> calendar_list[-1]
Row(date=datetime.date(2040, 12, 31))
>>> 
>>> filtered_df = df_calendar.filter((F.col('date') > two_years_back) \
...                                   & (F.col('date') < date.today()))
>>> filtered_list = filtered_df.collect()
>>>
>>> filtered_list[0]
Row(date=datetime.date(2020, 2, 26))
>>> filtered_list[-1]
Row(date=datetime.date(2022, 2, 23))
>>> 

EDIT: I guess the problem with your code is the date pattern: in Spark's datetime patterns, lowercase m means minute-of-hour, while uppercase M means month. With "m/d/yyyy" the month field is never parsed, so every parsed date collapses to January of its year, which is exactly why your results snap to 1/1/2021 rather than to a day-precise cutoff two years back. (My first session above sidestepped this by building the column as a real date type, so no pattern was involved.)
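To see the pattern bug in isolation, here is a small sketch of mine (not part of the original session; it assumes the same active spark session). It pins the parser to legacy mode, because Spark 3's default parser generally refuses to resolve a date from the m pattern (returning null or raising, depending on spark.sql.legacy.timeParserPolicy) instead of mis-parsing silently:

from pyspark.sql import functions as F

# The legacy parser mis-parses silently, which reproduces the symptom.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

df = spark.createDataFrame([("6/15/2021",)], ["date"])
df.select(
    F.to_date("date", "m/d/yyyy").alias("wrong"),   # expect 2021-01-15: month defaults to January
    F.to_date("date", "M/d/yyyy").alias("right"),   # expect 2021-06-15
).show()

And here is the full corrected session: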

>>> 
>>> from datetime import date, timedelta
>>> import pyspark.sql.functions as F
>>> 
>>> seed_date = date(2010, 1, 1)
>>> two_years_back = date.today() - timedelta(days=2*365)
>>> 
>>> df1 = spark.createDataFrame(data=[[ [(seed_date + timedelta(days=i)).strftime('%-m/%-d/%Y') for i in range(0, 11323)] ]])
>>> df_calendar = df1.withColumn('date', F.explode(df1['_1'])).drop('_1')
>>> 
>>> calendar_list = df_calendar.collect()
>>> calendar_list[0]
Row(date='1/1/2010')
>>> calendar_list[-1]
Row(date='12/31/2040')
>>> 
>>> filtered_df = df_calendar.filter((F.to_date(F.col("date"), 'M/d/yyyy') < date.today()) \
...                                   & (F.to_date(F.col("date"), 'M/d/yyyy') > two_years_back))
>>> filtered_list = filtered_df.collect()
>>> 
>>> filtered_list[0]
Row(date='2/26/2020')
>>> filtered_list[-1]
Row(date='2/23/2022')
>>> 
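One more note: both sessions above also cap the range at today, whereas the question asks for an open-ended upper bound. A minimal sketch for that variant (my addition, not from the sessions above; add_months gives a calendar-accurate two years, while the question's 730-day cutoff is a close approximation):

from pyspark.sql import functions as F

# Keep everything from two years ago onward; no upper bound.
two_years_ago = F.add_months(F.current_date(), -24)
open_ended_df = df_calendar.filter(
    F.to_date(F.col("date"), "M/d/yyyy") >= two_years_ago
)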