The DataFrame schema is ["id", "t_create", "hours"] with types string, timestamp, int.
A sample row looks like: ["abc", "2022-07-01 12:23:21.343998", 5]
I want to add the hours value to t_create and get a new column t_update, e.g. "2022-07-01 17:23:21.343998".
Here is my code:
from pyspark.sql.functions import col, expr, udf

df_cols = ["id", "t_create", "hours"]
df = spark.read.format("delta").load("blablah path")
df = df.withColumn("t_update", df.t_create + expr("INTERVAL 5 HOURS"))
This works with no problem. However, the number of hours should be a variable taken from the hours column, and I can't figure out how to get that column into the expr f-string and the INTERVAL function. I've tried things like:
df = df.withColumn("t_update", df.t_create expr(f"INTERVAL {df.hours} HOURS"))
df = df.withColumn("t_update", df.t_create expr(f"INTERVAL {col(df.hours)} HOURS"))
etc... They don't work. Need help here.
Another way is to write a UDF and wrap the whole interval string into the UDF's return value:
@udf
def udf_interval(hours):
    return "INTERVAL " + str(hours) + " HOURS"
Then:
df = df.withColumn("t_update", df.t_create expr(udf_interval(df.hours)))
Now I get TypeError: Column is not iterable.
I'm stuck. Help with either the UDF or the non-UDF approach is appreciated. Thanks!
CodePudding user response:
You can do this without the fiddly unix_timestamp conversion by using make_interval in Spark SQL.
SparkSQL - TO_TIMESTAMP & MAKE_INTERVAL
sql.sql("""
WITH INP AS (
SELECT
"abc" as id,
TO_TIMESTAMP("2022-07-01 12:23:21.343998","yyyy-MM-dd HH:mm:ss.SSSSSS") as t_create,
5 as t_hour
)
SELECT
id,
t_create,
t_hour,
t_create MAKE_INTERVAL(0,0,0,0,t_hour,0,0) HOURS as t_update
FROM INP
""").show(truncate=False)
+---+--------------------------+------+--------------------------+
|id |t_create                  |t_hour|t_update                  |
+---+--------------------------+------+--------------------------+
|abc|2022-07-01 12:23:21.343998|5     |2022-07-01 17:23:21.343998|
+---+--------------------------+------+--------------------------+
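If the data is already in a DataFrame, such as the delta-loaded df from the question, one way to reuse this SQL is through a temp view. A minimal sketch, assuming df has the columns id, t_create (timestamp) and hours (int):
# Assumes `df` is the DataFrame from the question.
df.createOrReplaceTempView("inp")
df_with_update = spark.sql("""
    SELECT id, t_create, hours,
           t_create + MAKE_INTERVAL(0, 0, 0, 0, hours, 0, 0) AS t_update
    FROM inp
""")
df_with_update.show(truncate=False)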
PySpark API
s = StringIO("""
id,t_create,t_hour
abc,2022-07-01 12:23:21.343998,5
"""
)
df = pd.read_csv(s,delimiter=',')
sparkDF = sql.createDataFrame(df)\
.withColumn('t_create'
,F.to_timestamp(F.col('t_create')
,'yyyy-MM-dd HH:mm:ss.SSSSSS'
)
).withColumn('t_update'
,F.expr('t_create MAKE_INTERVAL(0,0,0,0,t_hour,0,0) HOURS')
).show(truncate=False)
+---+--------------------------+------+--------------------------+
|id |t_create                  |t_hour|t_update                  |
+---+--------------------------+------+--------------------------+
|abc|2022-07-01 12:23:21.343998|5     |2022-07-01 17:23:21.343998|
+---+--------------------------+------+--------------------------+
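Applied to the DataFrame from the question, the same expression can go straight into withColumn. A minimal sketch, assuming df has a timestamp column t_create and an int column hours:
import pyspark.sql.functions as F

# Assumes `df` is the delta-loaded DataFrame from the question
# with columns id, t_create (timestamp) and hours (int).
df = df.withColumn('t_update', F.expr('t_create + MAKE_INTERVAL(0, 0, 0, 0, hours, 0, 0)'))
df.show(truncate=False)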
CodePudding user response:
A simple way would be to cast the timestamp to bigint (or decimal if dealing with fractions of a second) and add the number of seconds to it. Here's an example where I've created columns for every calculation for detailed understanding - you can merge all the calculations into a single column.
from pyspark.sql import functions as func

spark.sparkContext.parallelize([("2022-07-01 12:23:21.343998",)]).toDF(['ts_str']). \
    withColumn('ts', func.col('ts_str').cast('timestamp')). \
    withColumn('hours_to_add', func.lit(5)). \
    withColumn('ts_as_decimal', func.col('ts').cast('decimal(20, 10)')). \
    withColumn('seconds_to_add_as_decimal',
               func.col('hours_to_add').cast('decimal(20, 10)') * 3600
               ). \
    withColumn('new_ts_as_decimal',
               func.col('ts_as_decimal') + func.col('seconds_to_add_as_decimal')
               ). \
    withColumn('new_ts', func.col('new_ts_as_decimal').cast('timestamp')). \
    show(truncate=False)
# +--------------------------+--------------------------+------------+---------------------+-------------------------+---------------------+--------------------------+
# |ts_str                    |ts                        |hours_to_add|ts_as_decimal        |seconds_to_add_as_decimal|new_ts_as_decimal    |new_ts                    |
# +--------------------------+--------------------------+------------+---------------------+-------------------------+---------------------+--------------------------+
# |2022-07-01 12:23:21.343998|2022-07-01 12:23:21.343998|5           |1656678201.3439980000|18000.0000000000         |1656696201.3439980000|2022-07-01 17:23:21.343998|
# +--------------------------+--------------------------+------------+---------------------+-------------------------+---------------------+--------------------------+
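The intermediate columns above can be merged into a single expression. A minimal sketch of that merged form, assuming the question's df with columns t_create (timestamp) and hours (int):
from pyspark.sql import functions as func

# Assumes `df` is the question's DataFrame with t_create (timestamp) and hours (int).
# Cast to decimal to keep the microseconds, add hours * 3600 seconds, then cast back.
df = df.withColumn(
    't_update',
    (func.col('t_create').cast('decimal(20, 10)')
     + func.col('hours').cast('decimal(20, 10)') * 3600).cast('timestamp')
)
df.show(truncate=False)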