I would like some guidance on the following problem:
I have the following data in a Spark DataFrame. I would like to create a window of n days preceding and succeeding a reference record, and then divide each value in the window by the reference value. However, I have not figured out how to do this kind of operation; everything I find covers only mean, count, or sum operations over the window.
Original data looks like this:
| symbol_id | date | close | is_reference |
|----------|------------|----------|--------------|
| XXXX | 2000-01-19 | 809.9644 | FALSE |
| XXXX | 2000-01-20 | 784.274 | FALSE |
| XXXX | 2000-01-21 | 774.2831 | FALSE |
| XXXX | 2000-01-24 | 760.0106 | FALSE |
| XXXX | 2000-01-25 | 750.7335 | FALSE |
| XXXX | 2000-01-26 | 750.7335 | TRUE |
| XXXX | 2000-01-27 | 742.17 | FALSE |
| XXXX | 2000-01-28 | 749.3063 | FALSE |
| XXXX | 2000-01-31 | 750.02 | FALSE |
| XXXX | 2000-02-01 | 762.8653 | FALSE |
| XXXX | 2000-02-02 | 749.3063 | FALSE |
Expected output looks like this:
| symbol_id | date | close | is_reference | reference_change |
|----------|------------|----------|--------------|-------------------|
| XXXX | 2000-01-19 | 809.9644 | FALSE | 1.07889737170381 |
| XXXX | 2000-01-20 | 784.274 | FALSE | 1.04467697258748 |
| XXXX | 2000-01-21 | 774.2831 | FALSE | 1.03136878799201 |
| XXXX | 2000-01-24 | 760.0106 | FALSE | 1.0123573811479 |
| XXXX | 2000-01-25 | 750.7335 | FALSE | 1 |
| XXXX | 2000-01-26 | 750.7335 | TRUE | 1 |
| XXXX | 2000-01-27 | 742.17 | FALSE | 0.988593155893536 |
| XXXX | 2000-01-28 | 749.3063 | FALSE | 0.99809892591712 |
| XXXX | 2000-01-31 | 750.02 | FALSE | 0.999049596161621 |
| XXXX | 2000-02-01 | 762.8653 | FALSE | 1.01615992892285 |
| XXXX | 2000-02-02 | 749.3063 | FALSE | 0.99809892591712 |
I'm currently partitioning by symbol_id using the following snippet:
val window = Window.partitionBy(SYMBOL_ID)
  .orderBy(col(DATE).desc)
  .rowsBetween(5, 0) // rangeBetween looks better, but I am just trying rowsBetween for now
And I am trying to do something like this for the reference_change column:
df
  .withColumn("close_movement", $"close" / lit(col("close")
    .where(col("is_reference") === true)).over(window)) // This command is wrong, but it is the closest to what I have in mind.
So in the end I want to take the close WHERE is_reference = true and divide each close in the window by it, as in the reference_change column of the expected output.
Thank you for your help!
Answer:
I would just use a simple join:
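import org.apache.spark.sql.functions.{abs, col, datediff}
// Alias both sides: ref is derived from df, so df.col(...) vs ref.col(...) is ambiguous in a self-join
val d = df.as("d")
val ref = df.filter(col("is_reference")).as("r")
// Inner join: keep rows of the same symbol within 5 calendar days of a reference row
d.join(ref, col("d.symbol_id") === col("r.symbol_id") &&
    abs(datediff(col("d.date"), col("r.date"))) <= 5)
  .select(col("d.symbol_id"), col("d.date"), col("d.close"), col("d.is_reference"),
    (col("d.close") / col("r.close")).as("reference_change"))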
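If the window should count rows (trading days) rather than calendar days, as the rowsBetween attempt in the question suggests, a window function can carry the reference close to its neighbours. The following is only a minimal sketch, assuming is_reference is a boolean column and that at most one reference row falls inside any ±n-row frame; the object ReferenceChangeSketch and the helper withReferenceChange are hypothetical names, not part of Spark.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, max, when}

object ReferenceChangeSketch {

  // Hypothetical helper: divides each close by the close of the reference row
  // found within n rows before or after it (same symbol_id, ordered by date).
  def withReferenceChange(df: DataFrame, n: Int): DataFrame = {
    val w = Window.partitionBy("symbol_id").orderBy("date").rowsBetween(-n, n)
    // when(...) without otherwise yields null for non-reference rows, and max
    // ignores nulls, so this picks the reference close inside the frame.
    // Assumption: at most one reference row per frame; with several, max
    // would pick the largest close, which is probably not what you want.
    val refClose = max(when(col("is_reference"), col("close"))).over(w)
    df.withColumn("reference_change", col("close") / refClose)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("reference-change-sketch")
      .getOrCreate()
    import spark.implicits._

    // A few rows from the question; date is kept as an ISO string here,
    // which sorts chronologically.
    val df = Seq(
      ("XXXX", "2000-01-24", 760.0106, false),
      ("XXXX", "2000-01-25", 750.7335, false),
      ("XXXX", "2000-01-26", 750.7335, true),
      ("XXXX", "2000-01-27", 742.17, false),
      ("XXXX", "2000-01-28", 749.3063, false)
    ).toDF("symbol_id", "date", "close", "is_reference")

    withReferenceChange(df, 5).orderBy("date").show(false)

    spark.stop()
  }
}
```

Rows with no reference row inside their frame get a null reference_change instead of being dropped, which is a difference from the inner join above.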