Divide window values by a reference row


I would like some guidance on the following problem:

I have the following data in a Spark DataFrame. I would like to create a window of n days preceding and succeeding a reference record, and then divide each value in the window by the reference value. However, I have not figured out how to do this kind of operation; everything I find covers only mean, count, or sum operations over a window.

Original data looks like this:

| symbol_id | date       | close    | is_reference |
|----------|------------|----------|--------------|
| XXXX     | 2000-01-19 | 809.9644 | FALSE        |
| XXXX     | 2000-01-20 | 784.274  | FALSE        |
| XXXX     | 2000-01-21 | 774.2831 | FALSE        |
| XXXX     | 2000-01-24 | 760.0106 | FALSE        |
| XXXX     | 2000-01-25 | 750.7335 | FALSE        |
| XXXX     | 2000-01-26 | 750.7335 | TRUE         |
| XXXX     | 2000-01-27 | 742.17   | FALSE        |
| XXXX     | 2000-01-28 | 749.3063 | FALSE        |
| XXXX     | 2000-01-31 | 750.02   | FALSE        |
| XXXX     | 2000-02-01 | 762.8653 | FALSE        |
| XXXX     | 2000-02-02 | 749.3063 | FALSE        |

Expected output looks like this:

| symbol_id | date       | close    | is_reference | reference_change  |
|----------|------------|----------|--------------|-------------------|
| XXXX     | 2000-01-19 | 809.9644 | FALSE        | 1.07889737170381  |
| XXXX     | 2000-01-20 | 784.274  | FALSE        | 1.04467697258748  |
| XXXX     | 2000-01-21 | 774.2831 | FALSE        | 1.03136878799201  |
| XXXX     | 2000-01-24 | 760.0106 | FALSE        | 1.0123573811479   |
| XXXX     | 2000-01-25 | 750.7335 | FALSE        | 1                 |
| XXXX     | 2000-01-26 | 750.7335 | TRUE         | 1                 |
| XXXX     | 2000-01-27 | 742.17   | FALSE        | 0.988593155893536 |
| XXXX     | 2000-01-28 | 749.3063 | FALSE        | 0.99809892591712  |
| XXXX     | 2000-01-31 | 750.02   | FALSE        | 0.999049596161621 |
| XXXX     | 2000-02-01 | 762.8653 | FALSE        | 1.01615992892285  |
| XXXX     | 2000-02-02 | 749.3063 | FALSE        | 0.99809892591712  |

I'm currently partitioning by symbol_id using the following snippet:

val window = Window.partitionBy(SYMBOL_ID)
    .orderBy(col(DATE)) // ascending, so the frame below covers rows before and after
    .rowsBetween(-5, 5) // rangeBetween looks better, but I'm just trying rowsBetween for now

And I'm trying to do something like this for the reference_change column:

df
    .withColumn("close_movement", $"close"/lit(col("close")
    .where(col("is_reference") === true)).over(window)) // This doesn't compile, but it's the closest to what I have in mind.

So, in the end, each close in the window should be divided by the close WHERE is_reference = true, producing the reference_change column shown in the expected output.
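In other words, the target column is plain division: reference_change = close / reference_close. A quick Spark-free sanity check of the arithmetic against a few rows of the expected output (hypothetical Python sketch, not part of the Spark pipeline):

```python
# Each close in the window divided by the reference close
# (the 2000-01-26 reference row from the sample data).
reference_close = 750.7335

window_closes = {
    "2000-01-19": 809.9644,
    "2000-01-25": 750.7335,
    "2000-01-27": 742.17,
    "2000-02-02": 749.3063,
}

reference_change = {d: c / reference_close for d, c in window_closes.items()}
for d, v in reference_change.items():
    print(d, v)
```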

Thank you for your help!

CodePudding user response:

I would just use a simple join:

val ref = df.filter($"is_reference")
df.join(ref, df.col("symbol_id") === ref.col("symbol_id") &&
             abs(datediff(df.col("date"), ref.col("date"))) <= 5) // datediff, not date_diff
  .select(df.col("symbol_id"), df.col("date"), df.col("close"), df.col("is_reference"),
          (df.col("close") / ref.col("close")).as("reference_change"))
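The join logic can be mocked up without Spark to confirm it: filter the reference rows, pair each row with the reference for the same symbol within 5 days, and divide. Note that datediff compares calendar days, so a row like 2000-01-19 (7 calendar days before the reference) falls outside this condition, unlike the row-based window in the question. A hypothetical Python sketch over a slice of the sample data:

```python
from datetime import date

# Spark-free mock of the join: pair each row with the reference row for
# the same symbol when the dates are within 5 calendar days, then divide
# close by the reference close (mirrors abs(datediff(...)) <= 5).
rows = [
    ("XXXX", date(2000, 1, 24), 760.0106, False),
    ("XXXX", date(2000, 1, 25), 750.7335, False),
    ("XXXX", date(2000, 1, 26), 750.7335, True),
    ("XXXX", date(2000, 1, 27), 742.17, False),
    ("XXXX", date(2000, 1, 28), 749.3063, False),
]
refs = [r for r in rows if r[3]]  # is_reference == true

result = {}
for sym, d, close, _ in rows:
    for rsym, rd, rclose, _ in refs:
        if sym == rsym and abs((d - rd).days) <= 5:
            result[d] = close / rclose
```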