Last unique entries above current row in Spark


I have a Spark dataframe with the following data:

val df = sc.parallelize(Seq(
  (1, "A", "2022-01-01", 30, 0), 
  (1, "A", "2022-01-02", 20, 30),
  (1, "B", "2022-01-03", 50, 20),
  (1, "A", "2022-01-04", 10, 70),
  (1, "B", "2022-01-05", 30, 60), 
  (1, "A", "2022-01-06", 0,  40), 
  (1, "C", "2022-01-07", 100,30), 
  (2, "D", "2022-01-08", 5, 0)
)).toDF("id", "event", "eventTimestamp", "amount", "expected")

display(df)
id  event  eventTimestamp  amount  expected
1   A      2022-01-01      30      0
1   A      2022-01-02      20      30
1   B      2022-01-03      50      20
1   A      2022-01-04      10      70
1   B      2022-01-05      30      60
1   A      2022-01-06      0       40
1   C      2022-01-07      100     30
2   D      2022-01-08      5       0

For each row I want to find the sum of the last entries (above the current row) of each unique event within the same id. The desired outcome is in the column "expected".

E.g. for the order "C" I'd like to get the latest amounts for "A" (0) and "B" (30): 0 + 30 = 30.

I tried the following query; however, it sums up the amounts of all previous orders, including duplicates (I'm not sure if it's possible to apply a filter on the sum to take only distinct values):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val days = (x: Int) => x * 86400

// all rows with the same id, up to (and excluding) the current day
val idWindow = Window.partitionBy("id")
  .orderBy(col("eventTimestamp").cast("timestamp").cast("long"))
  .rangeBetween(Window.unboundedPreceding, -days(1))

val res = df.withColumn("totalAmount", sum($"amount").over(idWindow))

Please note that the rangeBetween functionality is important for my use-case and should be preserved.

CodePudding user response:

The trick is to convert the amounts to diffs within each (id, event) pair, which allows you to compute a moving sum in the next step. Because the diffs telescope, the running sum of diffs for a given (id, event) always equals that event's latest amount, so the moving sum over the whole id maintains the sum of the latest amounts of each unique event.

// window definitions implied by the description above:
// diffs per (id, event), running sum per id
val wIdEvent = Window.partitionBy("id", "event").orderBy($"eventTimestamp")
val wId = Window.partitionBy("id").orderBy($"eventTimestamp")

df
  .withColumn("diff", coalesce($"amount" - lag($"amount", 1).over(wIdEvent), $"amount"))
  .withColumn("sum", sum($"diff").over(wId))
  .withColumn("final", coalesce(lag($"sum", 1).over(wId), lit(0)))
  .orderBy($"eventTimestamp").show

+---+-----+--------------+------+--------+----+---+-----+
| id|event|eventTimestamp|amount|expected|diff|sum|final|
+---+-----+--------------+------+--------+----+---+-----+
|  1|    A|    2022-01-01|    30|       0|  30| 30|    0|
|  1|    A|    2022-01-02|    20|      30| -10| 20|   30|
|  1|    B|    2022-01-03|    50|      20|  50| 70|   20|
|  1|    A|    2022-01-04|    10|      70| -10| 60|   70|
|  1|    B|    2022-01-05|    30|      60| -20| 40|   60|
|  1|    A|    2022-01-06|     0|      40| -10| 30|   40|
|  1|    C|    2022-01-07|   100|      30| 100|130|   30|
|  2|    D|    2022-01-08|     5|       0|   5|  5|    0|
+---+-----+--------------+------+--------+----+---+-----+
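If the range-based frame from the question has to be preserved, the same diff trick should combine with it directly: summing the diffs over all rows strictly before the current day already excludes the current row, so the extra lag over the running sum is no longer needed. A minimal sketch of that variant (untested; the window names wIdEvent and wIdRange are my own, and wIdRange simply reuses the question's frame):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val days = (x: Int) => x * 86400

// diffs per (id, event), exactly as in the answer above
val wIdEvent = Window.partitionBy("id", "event").orderBy($"eventTimestamp")

// the question's frame: all rows of the same id up to one day before the current one
val wIdRange = Window.partitionBy("id")
  .orderBy(col("eventTimestamp").cast("timestamp").cast("long"))
  .rangeBetween(Window.unboundedPreceding, -days(1))

df
  .withColumn("diff", coalesce($"amount" - lag($"amount", 1).over(wIdEvent), $"amount"))
  // summing the diffs over the range frame yields the sum of each event's latest
  // amount before the current day; coalesce covers the first row of each id
  .withColumn("final", coalesce(sum($"diff").over(wIdRange), lit(0)))
  .orderBy($"eventTimestamp")
  .show()

With the sample data this reproduces the "expected" column, e.g. for "C" the preceding diffs are 30 - 10 + 50 - 10 - 20 - 10 = 30.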