RowNumber with Reset

Time:09-27

I am trying to produce the Expected column shown here (lagState and rank are the columns my current code generates):

+---+-----+--------+--------+--------+----+
| ID|State|    Time|Expected|lagState|rank|
+---+-----+--------+--------+--------+----+
|  1|    P|20220722|       1|    null|   1|
|  1|    P|20220723|       2|       P|   2|
|  1|    P|20220724|       3|       P|   3|
|  1|    P|20220725|       4|       P|   4|
|  1|    D|20220726|       1|       P|   1|
|  1|    O|20220727|       1|       D|   1|
|  1|    D|20220728|       1|       O|   1|
|  1|    P|20220729|       2|       D|   1|
|  1|    P|20220730|       3|       P|   9|
|  1|    P|20220731|       4|       P|  10|
+---+-----+--------+--------+--------+----+
# imports and session
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window as w

spark = SparkSession.builder.getOrCreate()

# create df
df = spark.createDataFrame([
    [1, 'P', 20220722, 1],
    [1, 'P', 20220723, 2],
    [1, 'P', 20220724, 3],
    [1, 'P', 20220725, 4],
    [1, 'D', 20220726, 1],
    [1, 'O', 20220727, 1],
    [1, 'D', 20220728, 1],
    [1, 'P', 20220729, 2],
    [1, 'P', 20220730, 3],
    [1, 'P', 20220731, 4],
], ['ID', 'State', 'Time', 'Expected'])

# lag: previous row's State within each ID, ordered by Time
df = df.withColumn('lagState', F.lag('State').over(w.partitionBy('ID').orderBy('Time')))

# rn: rank() here is computed over the whole ID partition
df = df.withColumn('rank',
                   F.when(F.col('State') == F.col('lagState'),
                          F.rank().over(w.partitionBy('ID').orderBy('Time', 'State')))
                    .otherwise(1))

# view
df.show()

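The problem is that the rank does not reset at the tail of the DataFrame: when the state returns to P on 20220729, the following rows get 9 and 10 instead of restarting the count for the new run of P.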
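Why the tail does not reset can be reproduced in plain Python, without Spark. This is a sketch that mimics the attempt above, assuming that `rank()` ordered by `Time, State` equals the 1-based row position (true here because every `Time` is distinct). The `when` branch only chooses between that global position and 1, so when a run of P resumes late in the partition it inherits positions 9 and 10. The `rows` list and `attempted_rank` helper are hypothetical names, with the data copied from the table above.

```python
# Hypothetical plain-Python reproduction of the question's attempt (no Spark).
rows = [  # (time, state) for ID 1, already sorted by time
    (20220722, "P"), (20220723, "P"), (20220724, "P"), (20220725, "P"),
    (20220726, "D"), (20220727, "O"), (20220728, "D"),
    (20220729, "P"), (20220730, "P"), (20220731, "P"),
]


def attempted_rank(rows):
    """Mimic F.when(State == lagState, rank()).otherwise(1)."""
    result = []
    prev_state = None  # F.lag returns null for the first row
    for position, (_, state) in enumerate(rows, start=1):
        # rank() over the whole partition equals the row position (times are distinct)
        result.append(position if state == prev_state else 1)
        prev_state = state
    return result


print(attempted_rank(rows))  # [1, 2, 3, 4, 1, 1, 1, 1, 9, 10]
```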

Answer:

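from pyspark.sql import functions as func
from pyspark.sql.window import Window as wd

# data_sdf: the input DataFrame (assumed to hold the question's data: id, state, time, expected)
data_sdf. \
    withColumn('st_notsame',  # 1 if state differs from the previous row, else 0
               func.coalesce(func.col('state') != func.lag('state').over(wd.partitionBy('id').orderBy('time')),
                             func.lit(False)).cast('int')
               ). \
    withColumn('rank_temp',  # running sum: one value per run of identical states
               func.sum('st_notsame').over(wd.partitionBy('id').orderBy('time')
                                           .rowsBetween(wd.unboundedPreceding, wd.currentRow))
               ). \
    withColumn('rank',  # numbering restarts within each run
               func.row_number().over(wd.partitionBy('id', 'rank_temp').orderBy('time'))
               ). \
    show()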

# +---+-----+--------+--------+----------+---------+----+
# | id|state|    time|expected|st_notsame|rank_temp|rank|
# +---+-----+--------+--------+----------+---------+----+
# |  1|    P|20220722|       1|         0|        0|   1|
# |  1|    P|20220723|       2|         0|        0|   2|
# |  1|    P|20220724|       3|         0|        0|   3|
# |  1|    P|20220725|       4|         0|        0|   4|
# |  1|    D|20220726|       1|         1|        1|   1|
# |  1|    O|20220727|       1|         1|        2|   1|
# |  1|    D|20220728|       1|         1|        3|   1|
# |  1|    P|20220729|       2|         1|        4|   1|
# |  1|    P|20220730|       3|         0|        4|   2|
# |  1|    P|20220731|       4|         0|        4|   3|
# +---+-----+--------+--------+----------+---------+----+

Your Expected column looks slightly off. I believe the rank for 20220729 should be 1, because the state changes from D to P on that row and a new run starts there.
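The intended numbering is a counter that restarts whenever the state changes. A minimal plain-Python sketch of that rule follows; it uses `itertools.groupby`, which merges only adjacent equal keys, to split the time-ordered rows into runs. This is a stand-in for checking the arithmetic, not the answer's Spark code, and `rows` / `run_row_number` are hypothetical names with the data copied from the question's table. It confirms that 20220729 starts a new run and gets 1.

```python
from itertools import groupby

rows = [  # (time, state) for id 1, already sorted by time
    (20220722, "P"), (20220723, "P"), (20220724, "P"), (20220725, "P"),
    (20220726, "D"), (20220727, "O"), (20220728, "D"),
    (20220729, "P"), (20220730, "P"), (20220731, "P"),
]


def run_row_number(rows):
    """Number rows 1, 2, ... within each run of consecutive identical states."""
    numbers = []
    # groupby only merges *adjacent* equal keys, which is exactly a "run"
    for _, run in groupby(rows, key=lambda row: row[1]):
        numbers.extend(range(1, len(list(run)) + 1))
    return numbers


result = dict(zip((t for t, _ in rows), run_row_number(rows)))
print(result[20220729])      # 1
print(run_row_number(rows))  # [1, 2, 3, 4, 1, 1, 1, 1, 2, 3]
```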

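  • First, flag each row whose state differs from the previous row's state as 1, and rows that repeat the previous state as 0 (the first row, which has no previous state, also gets 0). This makes a running sum possible.
  • Then take a running sum of that flag per id, over a window with unbounded lookback (all preceding rows up to the current one). The result, rank_temp, is constant within a run of identical states and increases by 1 at each change.
  • Finally, use rank_temp as an additional partition column for row_number(), so the numbering restarts at 1 for each run.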
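The three steps above can be traced by hand in plain Python. This sketch mirrors the answer's columns (`st_notsame`, `rank_temp`, `rank`) for a single id, with a dictionary of counters standing in for `row_number()` over `partitionBy('id', 'rank_temp')`. It illustrates the gaps-and-islands idea only and is not Spark code; the variable names and data are copied from the answer's output table.

```python
from itertools import accumulate

rows = [  # (time, state) for id 1, already sorted by time
    (20220722, "P"), (20220723, "P"), (20220724, "P"), (20220725, "P"),
    (20220726, "D"), (20220727, "O"), (20220728, "D"),
    (20220729, "P"), (20220730, "P"), (20220731, "P"),
]
states = [state for _, state in rows]

# Step 1: 1 when the state differs from the previous row, else 0
# (the first row has no previous row and gets 0, like coalesce(..., False))
st_notsame = [0] + [int(cur != prev) for prev, cur in zip(states, states[1:])]

# Step 2: the running sum gives every run its own group id
rank_temp = list(accumulate(st_notsame))

# Step 3: row_number() within each group (rows are already time-ordered)
counters = {}
rank = []
for group in rank_temp:
    counters[group] = counters.get(group, 0) + 1
    rank.append(counters[group])

print(st_notsame)  # [0, 0, 0, 0, 1, 1, 1, 1, 0, 0]
print(rank_temp)   # [0, 0, 0, 0, 1, 2, 3, 4, 4, 4]
print(rank)        # [1, 2, 3, 4, 1, 1, 1, 1, 2, 3]
```

Because rank_temp never decreases, each value identifies exactly one run, which is why it works as a partition key even when the same state (here P) appears in two separate runs.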