Pyspark: How to compare two Timestamps to show most recently updated records in dataframe or table?


I am writing a script for a daily incremental load process using PySpark and a Hive table that has already had its initial load. Each morning a job runs the script against that table.

I've been trying to use PySpark to create a timestamp filter that compares two timestamps, dw_mod_ts and max(dw_mod_ts), to show records updated since the last load, and to save the result back to the dataframe or to another dataframe.

I've tried the following syntax:

    dw_mod_ts = base_df.select('dw_mod_ts')
    max_dw_load_ts = base_df.select(max_('dw_mod_ts'))

    inc_df = inc_df.filter(dw_mod_ts >= max_dw_load_ts)

But I keep getting type errors and syntax errors stating that a dataframe cannot be compared to a str, even though I've cast both variables and columns as `TimestampType`.

    inc_df = inc_df.filter(inc_df("dw_mod_ts").cast(DataTypes.DateType) >= max_('dw_mod_ts').cast(DataTypes.DateType))

I also keep getting an error stating that the `>=` operator cannot be used with the current syntax.

I don't have much experience working with PySpark, so any help or suggestions are appreciated.

CodePudding user response:

Assuming the timestamps compare correctly as strings, first compute the max_dw_mod_ts value, then pass it to filter to get the final result.

    from pyspark.sql import functions as F

    # take the latest dw_mod_ts, then filter on it
    max_dw_mod_ts = df.groupBy().agg(F.max('dw_mod_ts')).collect()[0][0]
    df = df.filter(f'dw_mod_ts >= "{max_dw_mod_ts}"')
    df.show()
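
A sketch closer to the question's setup, assuming `base_df` holds the already-loaded Hive data and `inc_df` the incoming records, both with a `dw_mod_ts` column of `TimestampType`: take the max from `base_df` and compare it against the column as a literal, which avoids comparing a dataframe to a dataframe (the source of the original errors).

    from pyspark.sql import functions as F

    # latest load timestamp already present in the target table
    max_dw_mod_ts = base_df.agg(F.max('dw_mod_ts')).collect()[0][0]

    # column-to-literal comparison keeps records modified on or after the last load
    inc_df = inc_df.filter(F.col('dw_mod_ts') >= F.lit(max_dw_mod_ts))
    inc_df.show()

If the column is actually stored as a string, casting both sides with `.cast('timestamp')` keeps the comparison type-consistent.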