Imagine I have a dataframe as follows:
date | timestamp | value |
---|---|---|
2022-01-05 | 2022-01-05 06:00:00 | -0.3 |
2022-01-04 | 2022-01-04 04:00:00 | -0.6 |
2022-01-03 | 2022-01-03 15:00:00 | -0.1 |
2022-01-03 | 2022-01-03 10:00:00 | -0.15 |
2022-01-02 | 2022-01-02 14:00:00 | -0.3 |
2022-01-02 | 2022-01-02 12:00:00 | -0.1 |
2022-01-01 | 2022-01-01 12:00:00 | -0.2 |
I want to create a column holding the minimum value observed strictly before each row's timestamp.
So the outcome would be:
date | timestamp | value | min_value_until_now |
---|---|---|---|
2022-01-05 | 2022-01-05 06:00:00 | -0.3 | -0.6 |
2022-01-04 | 2022-01-04 04:00:00 | -0.6 | -0.3 |
2022-01-03 | 2022-01-03 15:00:00 | -0.1 | -0.3 |
2022-01-03 | 2022-01-03 10:00:00 | -0.15 | -0.3 |
2022-01-02 | 2022-01-02 14:00:00 | -0.3 | -0.2 |
2022-01-02 | 2022-01-02 12:00:00 | -0.1 | -0.2 |
2022-01-01 | 2022-01-01 12:00:00 | -0.2 | -0.2 |
On 2022-01-01 there is no historical data, so I can just substitute the row's own value, -0.2, which is the only point available at the beginning.
How can I do this? I tried windowing, but without success.
It is important to note that min_value_until_now should decrease monotonically over time.
Any help would be duly appreciated.
CodePudding user response:
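For reference, here is a minimal sketch that recreates the sample DataFrame from the question (`spark` is an assumed, pre-existing SparkSession):

from pyspark.sql import functions as F

# Sample data from the question; `date` is left as a string,
# since only the `timestamp` ordering matters for the window
df = spark.createDataFrame(
    [
        ("2022-01-05", "2022-01-05 06:00:00", -0.3),
        ("2022-01-04", "2022-01-04 04:00:00", -0.6),
        ("2022-01-03", "2022-01-03 15:00:00", -0.1),
        ("2022-01-03", "2022-01-03 10:00:00", -0.15),
        ("2022-01-02", "2022-01-02 14:00:00", -0.3),
        ("2022-01-02", "2022-01-02 12:00:00", -0.1),
        ("2022-01-01", "2022-01-01 12:00:00", -0.2),
    ],
    ["date", "timestamp", "value"],
).withColumn("timestamp", F.to_timestamp("timestamp"))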
Use the `min` function over a Window:
from pyspark.sql import functions as F, Window

# Window over all rows strictly before the current one, ordered by timestamp
w = Window.orderBy("timestamp").rowsBetween(Window.unboundedPreceding, -1)

df.withColumn(
    "min_value_until_now",
    # Running min of all earlier values; the first row has no history,
    # so coalesce falls back to the row's own value
    F.coalesce(F.min("value").over(w), F.col("value"))
).show()
#+----------+-------------------+-----+-------------------+
#|      date|          timestamp|value|min_value_until_now|
#+----------+-------------------+-----+-------------------+
#|2022-01-01|2022-01-01 12:00:00| -0.2|               -0.2|
#|2022-01-02|2022-01-02 12:00:00| -0.1|               -0.2|
#|2022-01-02|2022-01-02 14:00:00| -0.3|               -0.2|
#|2022-01-03|2022-01-03 10:00:00|-0.15|               -0.3|
#|2022-01-03|2022-01-03 15:00:00| -0.1|               -0.3|
#|2022-01-04|2022-01-04 04:00:00| -0.6|               -0.3|
#|2022-01-05|2022-01-05 06:00:00| -0.3|               -0.6|
#+----------+-------------------+-----+-------------------+
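If you prefer the newest-first ordering shown in the question's table, add a final sort (a small addition, not part of the original answer):

# Same computation, displayed newest-first to match the question's table
df.withColumn(
    "min_value_until_now",
    F.coalesce(F.min("value").over(w), F.col("value"))
).orderBy(F.desc("timestamp")).show()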
Note that using a non-partitioned Window can hurt performance, since Spark must move all the data to a single partition to compute it. You should add a `partitionBy` clause if you have an ID column on which you can partition.
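For instance, assuming a hypothetical `id` column that identifies independent series, the window definition would become:

# Hypothetical `id` column grouping independent series; the running min
# is then computed separately within each group
w = (
    Window.partitionBy("id")
    .orderBy("timestamp")
    .rowsBetween(Window.unboundedPreceding, -1)
)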