I have a dataframe and I'm trying to filter rows based on whether end_date is >= or < a certain date.
However, I'm getting a "not callable" error.
line 148, in <module>
    df_s1 = df_x.filter(df_x["end_date"].ge(lit("2022-08-17")))
TypeError: 'Column' object is not callable
Here is my code:
df_x = df_x.join(df_di_meet, trim(df_x.application_id) == trim(df_di_meet.application_id), "left")\
    .select(df_x["*"], df_di_meet["end_date"])

# ... Cast end_date to timestamp ... end_date format looks like 2013-12-20 23:59:00.0000000
df_x = df_x.withColumn("end_date",(col("end_date").cast("timestamp")))
# ... Here df_s1 >= 2022-08-17
df_s1 = df_x.filter(df_x["end_date"].ge(lit("2022-08-17")))
# ... Here df_s2 < 2022-08-17
df_s2 = df_x.filter(df_x["end_date"].lt(lit("2022-08-17")))
What I'm actually trying to do is check some additional logic as well, like the code below, but since it wasn't working inside a when clause I decided to break the dataframe down and check each piece separately. Is there an easier way, or how could I get the code below to work?
df_x = df_x.withColumn("REV_STAT_TYP_DES", when((df_x.review_statmnt_type_desc == lit("")) & (df_x("end_date").ge(lit("2022-08-17"))), "Not Released")
when((df_x.review_statmnt_type_desc == lit("")) & ((df_x("end_date").lt(lit("2022-08-17"))) | (df_x.end_date == lit(""))), "Not Available")
.otherwise(None))
CodePudding user response:
df_x("end_date") --> This is wrong way of accessing a spark dataframe column. That's why python is assuming it as a callable and you are getting that error.
df_x["end_date"] --> This is how you should access the column (or df_x.end_date)
UPDATE:
I only just noticed that .ge() or .le() style methods don't work on Spark dataframe Column objects. You can use any of the ways of filtering below:
from pyspark.sql.functions import col
df_s1 = df_x.filter(df_x["end_date"] >='2022-08-17')
# OR
df_s1 = df_x.filter(df_x.end_date>='2022-08-17')
# OR
df_s1 = df_x.filter(col('end_date')>='2022-08-17')
# OR
df_s1 = df_x.filter("end_date>='2022-08-17'")
# OR
# you can use df_x.where() instead of df_x.filter
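Putting those two fixes together (bracket/col() access plus the >= and < operators instead of .ge()/.lt()), a minimal sketch of the when() version from the question could look like this - assuming end_date has already been cast to timestamp as in your code, and using isNull() instead of comparing against an empty string:

from pyspark.sql.functions import col, when, lit

# Sketch only: col("end_date") instead of df_x("end_date"), >= / < instead of .ge()/.lt()
df_x = df_x.withColumn(
    "REV_STAT_TYP_DES",
    when((col("review_statmnt_type_desc") == "") & (col("end_date") >= lit("2022-08-17")), "Not Released")
    .when((col("review_statmnt_type_desc") == "") & ((col("end_date") < lit("2022-08-17")) | col("end_date").isNull()), "Not Available")
    .otherwise(None)
)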
CodePudding user response:
You probably got confused between pandas and PySpark. Anyway, this is how you do it:
DataFrame
df = spark.createDataFrame([("2022-08-16", 1), ("2019-06-24", 2), ("2022-08-19", 3)]).toDF("date", "increment")
PySpark
from pyspark.sql.functions import col, lit, to_date

df_x = df.withColumn('date', to_date('date'))
df_x.filter(col('date') > to_date(lit("2022-08-17"))).show()
Pandas
import pandas as pd

df_x = df.toPandas()
df_s1 = df_x.assign(date=pd.to_datetime(df_x['date'])).query("date.gt('2022-08-17')", engine='python')
or
df_x[df_x['date'] > '2022-08-17']
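With the sample dataframe above, either version keeps only the 2022-08-19 row, since the comparison used here is a strict >.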
CodePudding user response:
Use SQL-style free-form case/when syntax in the expr() function. That way it is also portable.
from pyspark.sql.functions import expr

df_x = (df_x.withColumn("REV_STAT_TYP_DES",
                        expr("""case
                                    when review_statmnt_type_desc = '' and end_date >= '2022-08-17' then 'Not Released'
                                    when review_statmnt_type_desc = '' and (end_date < '2022-08-17' or end_date is null) then 'Not Available'
                                    else null
                                end""")))
CodePudding user response:
There are well-known recommendations for making difficult code look cleaner; one of them is that conditional statements are better understood and maintained when they are separated into their own variables. Look at how I've added isnull to some of the variables - it would have been a lot more difficult if they were not refactored into separate variables.
from pyspark.sql import functions as F
no_review = (F.col("review_statmnt_type_desc") == "") | F.isnull("review_statmnt_type_desc")
no_end_date = (F.col("end_date") == "") | F.isnull("end_date")
not_released = no_review & (F.col("end_date") >= F.lit("2022-08-17"))
not_available = no_review & ((F.col("end_date") < F.lit("2022-08-17")) | no_end_date)
Also, you don't need the otherwise clause if it returns null (that's the default behaviour).
df_x = df_x.withColumn(
"REV_STAT_TYP_DES",
F.when(not_released, "Not Released")
.when(not_available, "Not Available")
)
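A quick way to sanity-check the conditions is to reuse them on a tiny hand-made dataframe - a sketch with hypothetical sample rows (the column expressions above are not bound to df_x, so they work on any dataframe with those columns):

# Hypothetical sample rows, one per expected outcome
sample = spark.createDataFrame(
    [("", "2022-09-01 00:00:00"), ("", None), ("approved", "2022-01-01 00:00:00")],
    ["review_statmnt_type_desc", "end_date"],
).withColumn("end_date", F.col("end_date").cast("timestamp"))

sample.withColumn(
    "REV_STAT_TYP_DES",
    F.when(not_released, "Not Released").when(not_available, "Not Available"),
).show()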