How to add hours as variable to timestamp in Pyspark


The dataframe schema is ["id", "t_create", "hours"], with types string, timestamp, int.

A sample row looks like: ["abc", "2022-07-01 12:23:21.343998", 5]

I want to add the hours to t_create to get a new column t_update: "2022-07-01 17:23:21.343998"

Here is my code:

from pyspark.sql.functions import expr

df_cols = ["id", "t_create", "hours"]
df = spark.read.format("delta").load("blablah path")
df = df.withColumn("t_update", df.t_create + expr(f"INTERVAL 5 HOURS"))

This works fine. However, the number of hours needs to come from the hours column rather than a literal. I couldn't figure out how to get that variable into the expr / f-string / INTERVAL expression; I tried things like:

df = df.withColumn("t_update", df.t_create + expr(f"INTERVAL {df.hours} HOURS"))
df = df.withColumn("t_update", df.t_create + expr(f"INTERVAL {col(df.hours)} HOURS"))

etc. None of them work. Need help here.

Another way is to write a UDF and wrap the whole interval string into the UDF's return value:

@udf
def udf_interval(hours):
    return "INTERVAL " + str(hours) + " HOURS"

Then:

df = df.withColumn("t_update", df.t_create + expr(udf_interval(df.hours)))

Now I get TypeError: Column is not iterable, presumably because expr expects a plain Python string while udf_interval(df.hours) returns a Column.

I'm stuck. Help with either the UDF or non-UDF route would be appreciated. Thanks!
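As a side note, a UDF route can work if the UDF performs the addition itself and returns a timestamp, rather than returning a SQL fragment for expr to parse. A minimal sketch (the add_hours name is illustrative, assuming t_create is a timestamp column and hours an int column as in the question):

from datetime import timedelta

from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def add_hours(ts, hours):
    # Do the arithmetic in Python and return a timestamp directly,
    # so no SQL string interpolation is needed.
    if ts is None or hours is None:
        return None
    return ts + timedelta(hours=hours)

df = df.withColumn("t_update", add_hours(df.t_create, df.hours))

A plain Python UDF bypasses Catalyst optimizations, though, so the built-in interval arithmetic shown in the answers below is generally preferable.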

CodePudding user response:

You can do this without fiddly unix_timestamp conversions by utilising make_interval within Spark SQL. Its signature is MAKE_INTERVAL(years, months, weeks, days, hours, mins, secs) and it is available from Spark 3.0, so the hours column goes in the fifth argument slot.

SparkSQL - TO_TIMESTAMP & MAKE_INTERVAL

spark.sql("""
WITH INP AS (
SELECT
    "abc" as id,
    TO_TIMESTAMP("2022-07-01 12:23:21.343998", "yyyy-MM-dd HH:mm:ss.SSSSSS") as t_create,
    5 as t_hour
)

SELECT
    id,
    t_create,
    t_hour,
    t_create + MAKE_INTERVAL(0, 0, 0, 0, t_hour, 0, 0) as t_update
FROM INP
""").show(truncate=False)

+---+--------------------------+------+--------------------------+
|id |t_create                  |t_hour|t_update                  |
+---+--------------------------+------+--------------------------+
|abc|2022-07-01 12:23:21.343998|5     |2022-07-01 17:23:21.343998|
+---+--------------------------+------+--------------------------+

PySpark API

from io import StringIO

import pandas as pd
from pyspark.sql import functions as F

s = StringIO("""
id,t_create,t_hour
abc,2022-07-01 12:23:21.343998,5
""")

df = pd.read_csv(s, delimiter=',')

sparkDF = spark.createDataFrame(df) \
    .withColumn('t_create',
                F.to_timestamp(F.col('t_create'),
                               'yyyy-MM-dd HH:mm:ss.SSSSSS')) \
    .withColumn('t_update',
                F.expr('t_create + MAKE_INTERVAL(0, 0, 0, 0, t_hour, 0, 0)'))

sparkDF.show(truncate=False)

+---+--------------------------+------+--------------------------+
|id |t_create                  |t_hour|t_update                  |
+---+--------------------------+------+--------------------------+
|abc|2022-07-01 12:23:21.343998|5     |2022-07-01 17:23:21.343998|
+---+--------------------------+------+--------------------------+
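Applied to the dataframe from the question, the whole answer reduces to one withColumn call (a sketch reusing the question's placeholder path and its t_create / hours columns):

from pyspark.sql import functions as F

df = spark.read.format("delta").load("blablah path")
# MAKE_INTERVAL reads the per-row value of the hours column directly,
# so no string interpolation is required.
df = df.withColumn("t_update",
                   F.expr("t_create + MAKE_INTERVAL(0, 0, 0, 0, hours, 0, 0)"))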

CodePudding user response:

A simple way would be to cast the timestamp to bigint (or to decimal if dealing with fractions of a second) and add the number of seconds to it. Here's an example where I've created a column for every calculation step for clarity - you can merge all the calculations into a single column.

from pyspark.sql import functions as func

spark.sparkContext.parallelize([("2022-07-01 12:23:21.343998",)]).toDF(['ts_str']). \
    withColumn('ts', func.col('ts_str').cast('timestamp')). \
    withColumn('hours_to_add', func.lit(5)). \
    withColumn('ts_as_decimal', func.col('ts').cast('decimal(20, 10)')). \
    withColumn('seconds_to_add_as_decimal',
               func.col('hours_to_add').cast('decimal(20, 10)') * 3600
               ). \
    withColumn('new_ts_as_decimal',
               func.col('ts_as_decimal') + func.col('seconds_to_add_as_decimal')
               ). \
    withColumn('new_ts', func.col('new_ts_as_decimal').cast('timestamp')). \
    show(truncate=False)

# +--------------------------+--------------------------+------------+---------------------+-------------------------+---------------------+--------------------------+
# |ts_str                    |ts                        |hours_to_add|ts_as_decimal        |seconds_to_add_as_decimal|new_ts_as_decimal    |new_ts                    |
# +--------------------------+--------------------------+------------+---------------------+-------------------------+---------------------+--------------------------+
# |2022-07-01 12:23:21.343998|2022-07-01 12:23:21.343998|5           |1656678201.3439980000|18000.0000000000         |1656696201.3439980000|2022-07-01 17:23:21.343998|
# +--------------------------+--------------------------+------------+---------------------+-------------------------+---------------------+--------------------------+
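Merged into a single column on the question's dataframe, the same idea looks like this (a sketch, assuming columns t_create and hours as in the question):

from pyspark.sql import functions as func

# Cast to decimal to preserve the microseconds, add hours * 3600 seconds,
# then cast back to timestamp.
df = df.withColumn(
    't_update',
    (func.col('t_create').cast('decimal(20, 10)')
     + func.col('hours') * 3600).cast('timestamp')
)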