Pyspark Compare Dates From Two Dataframes If Statement


Table 1 contains the history of all employee information, but it only captures the data every 90 days. Table 2 contains the current information for all employees and is updated weekly with a timestamp. Every 90 days, Table 2 is appended to Table 1. I figured that by taking the timestamp in Table 1, adding 90 days to it, and comparing it to the timestamp in Table 2, I could use the logic below to decide when to execute the append, but I'm getting an error:

TypeError: '<' not supported between instances of 'DataFrame' and 'DataFrame'

Am I missing something?

# Let's say the max date in table 1 is 2023-01-15. Adding 90 days would put us on 2023-04-15
futr_date = spark.sql('SELECT date_add(MAX(tm_update), 90) AS future_date FROM tbl_one')

# Checking the date in the weekly refresh table, I have a timestamp of 2023-02-03
curr_date = spark.sql('SELECT DISTINCT tm_update AS current_date FROM tbl_two')

if curr_date > futr_date:
  print('execute block of code that transforms table 2 data and append to table 1')
else:
  print('ignore and check again next week')

CodePudding user response:

The SELECT statement does not return a value but a DataFrame, which is why you are getting that error. If you want a value, you need to collect it:

futr_date = spark.sql('SELECT date_add(MAX(tm_update), 90) AS future_date FROM tbl_one').collect()[0]
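Note that collect()[0] gives you a Row, not a plain date. Comparing single-field Rows happens to work, but if you would rather compare the dates themselves, you can index into the Row by the alias. A minimal sketch, using the future_date alias from the query above:

futr_row = spark.sql('SELECT date_add(MAX(tm_update), 90) AS future_date FROM tbl_one').collect()[0]
futr_date = futr_row['future_date']  # a plain datetime.date, e.g. datetime.date(2023, 4, 15)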

In the second SQL query you are using DISTINCT to get the date, which may return a list of values; I am not sure if that's what you want. Maybe you should use MIN here? With only one timestamp value it may not matter, but with more values it may cause issues.
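For example, something like this (a sketch, assuming you want the earliest timestamp from the weekly refresh):

curr_date = spark.sql('SELECT CAST(MIN(tm_update) AS date) AS current_date FROM tbl_two').collect()[0]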

As I said, I am not sure whether your logic is correct, but here is a working example which you can use as a starting point for further changes:

import time
import pyspark.sql.functions as F

# Sample data: (id, epoch-seconds) pairs
historicalData = [
    (1, time.mktime(time.strptime("24/10/2022", "%d/%m/%Y"))),
    (2, time.mktime(time.strptime("15/01/2023", "%d/%m/%Y"))),
    (3, time.mktime(time.strptime("04/11/2022", "%d/%m/%Y"))),
]
currentData = [
    (1, time.mktime(time.strptime("01/02/2023", "%d/%m/%Y"))),
    (2, time.mktime(time.strptime("02/02/2023", "%d/%m/%Y"))),
    (3, time.mktime(time.strptime("03/02/2023", "%d/%m/%Y"))),
]

# Build both tables, converting the epoch seconds to a timestamp column,
# and register them as temp views so they can be queried with spark.sql
oldDf = spark.createDataFrame(historicalData, schema=["id", "tm_update"]).withColumn(
    "tm_update", F.to_timestamp("tm_update")
)
oldDf.createOrReplaceTempView("tbl_one")
currentDf = spark.createDataFrame(currentData, schema=["id", "tm_update"]).withColumn(
    "tm_update", F.to_timestamp("tm_update")
)
currentDf.createOrReplaceTempView("tbl_two")

# Each query returns a single row; collect()[0] extracts it as a Row object
futr_date = spark.sql(
    "SELECT date_add(MAX(tm_update), 90) AS future_date FROM tbl_one"
).collect()[0]
curr_date = spark.sql(
    "SELECT cast(MIN(tm_update) as date) AS current_date FROM tbl_two"
).collect()[0]

print(futr_date)
print(curr_date)

if curr_date > futr_date:
    print("execute block of code that transforms table 2 data and append to table 1")
else:
    print("ignore and check again next week")

Output

Row(future_date=datetime.date(2023, 4, 15))
Row(current_date=datetime.date(2023, 2, 3))
ignore and check again next week
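If the comparison passes, the append itself could look something like the sketch below. This is only an illustration, not part of the original logic: it assumes tbl_one is backed by a table you can actually write to and that both tables share the same columns.

# Hypothetical sketch of the append branch, assuming tbl_one is a writable
# table and tbl_two has the same schema
if curr_date > futr_date:
    weekly_df = spark.table("tbl_two")
    # align columns by name, then append the weekly data to the history table
    weekly_df.select(*spark.table("tbl_one").columns) \
        .write.mode("append").saveAsTable("tbl_one")
else:
    print("ignore and check again next week")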