I am writing a script for a daily incremental load process using PySpark against a Hive table that has already been initially loaded with data. Each morning a job runs the script against that table.
I've been trying to use PySpark to create a timestamp filter that compares two timestamps, mod_date_ts
and max(mod_date_ts),
to show records updated since the last load and save the result to the same dataframe or another dataframe.
I've tried the following syntax:
dw_mod_ts = base_df.select('dw_mod_ts')
max_dw_load_ts = base_df.select(max_('dw_mod_ts'))
inc_df = inc_df.filter(dw_mod_ts >= max_dw_load_ts)
But I keep getting type errors and syntax errors stating that a dataframe cannot be compared to a string, even though I've cast both variables and columns as
TimestampType.
inc_df = inc_df.filter(inc_df("dw_mod_ts").cast(DataTypes.DateType) >= max_('dw_mod_ts').cast(DataTypes.DateType))
I also keep getting an error stating that the >=
operator cannot be used with the current syntax.
I don't have much experience working with PySpark, so any help or suggestions are appreciated.
CodePudding user response:
The comparison can be done in string form. First compute the max_dw_mod_ts
value, collecting it to the driver as a plain Python value, and then pass it to filter
to get the final result.
import pyspark.sql.functions as F

# Collect the max timestamp to the driver as a scalar, not a DataFrame.
max_dw_mod_ts = df.groupBy().agg(F.max('dw_mod_ts')).collect()[0][0]
df = df.filter(f'dw_mod_ts >= "{max_dw_mod_ts}"')
df.show()