Home > database >  Spark - Generate window with specific keys
Spark - Generate window with specific keys

Time:10-11

I would like to add a column to my df_ordored that identifies windows based on the status column. I want to generate an id for all logs between "open" and "close", as follows:

df_ordored = 
 ---- --------- ------ 
|date|word     |status|
 ---- --------- ------ 
|1   |un       |      |
|2   |itnane   |open  |
|3   |tres     |      |
|4   |four     |close |
|4.1 |four     |other |
|5   |fünf     |open  |
|6   |Liù      |null  |
|7   |Sette    |any   |
|8   |vosem    |      |
|9   |Shinchaku|close |
 ---- --------- ------ 
df_expected =
 ---- --------- ------ -- 
|date|word     |status|id|
 ---- --------- ------ -- 
|1   |un       |      |  |
|2   |itnane   |open  |a |
|3   |tres     |      |a |
|4   |four     |close |a |
|4.1 |four     |other |  |
|5   |fünf     |open  |b |
|6   |Liù      |null  |b |
|7   |Sette    |any   |b |
|8   |vosem    |      |b |
|9   |Shinchaku|close |b |
 ---- --------- ------ -- 

Is it possible to do this in dataframe/dataset abstraction and without collecting data on driver ?

CodePudding user response:

You can use a window function to generate incremental integers as IDs of open/close sequences:

# Window to compute cumulative sums
cumsum_window = (
    Window
    # .partitionBy('something')  # if you can use a column to partition the data, is a good idea to use it to improve performance in the case of DataFrames with a lot of data/rows
    .orderBy('date')
    .rangeBetween(Window.unboundedPreceding, 0)
)

# find row with status open
df = df.withColumn('is_open', F.when(F.col('status') == 'open', 1).otherwise(0))

# mark both open and close, with 1 and -1 respectively
df = df.withColumn('is_open_close', F.when(F.col('status') == 'close', -1).otherwise(F.col('is_open')))

# A sequence is composed by rows between open and close (included).
# So, row belongs to a sequence if one of the following holds:
#    1. have close status or 1 in the cumulative sum 
#    2. its cumulative sum of the column is_open_close is 1
#    IS_OPEN_CLOSE: 0 0 0 ... 1 (open) 0 0 ... 0 -1 (close) 0 1 (open) ...
#    CUMSUM:        0 0 0 ... 1 (open) 1 1 ... 1  0 (close) 0 1 (open) ...
df = df.withColumn('is_in_sequence', (F.col('status') == 'close') | (F.sum('is_open_close').over(cumsum_window).cast(T.BooleanType())))

# Compute an id for rows in a sequence as the ordinal of their correspondent
# open in the is_open column. Use the cumulative sum of is_open to compute it.
#    IS_OPEN: 0 0 0 ... 1 (open) 0 0 ... 0  0 (close) 0 1 (open) ...
#    CUMSUM:  0 0 0 ... 1 (open) 1 1 ... 1  1 (close) 1 2 (open) ...
# Assign the just created ID only to rows belonging to a sequence
df = df.withColumn('sequence_id', F.when(F.col('is_in_sequence'), F.sum('is_open').over(cumsum_window)))

# remove temporary columns
df = df.drop('is_open')
df = df.drop('is_open_close')
df = df.drop('is_in_sequence')

This is what you get:

 ---- --------- ------ ----------- 
|date|     word|status|sequence_id|
 ---- --------- ------ ----------- 
|   1|       un|  null|       null|
|   2|   itnane|  open|          1|
|   3|     tres|  null|          1|
|   4|     four| close|          1|
| 4.1|     four| other|       null|
|   5|     fünf|  open|          2|
|   6|      Liù|  null|          2|
|   7|    Sette|   any|          2|
|   8|    vosem|  null|          2|
|   9|Shinchaku| close|          2|
 ---- --------- ------ ----------- 

If you are using a partition column in the window, obviously the ID of a sequence will be the pair composed by the sequence_id and that column. You can easily translate it into an overall unique ID by combining them. For instance, suppose that your DataFrame is the following one:

 ------------------- --------- ------ 
|               date|     word|status|
 ------------------- --------- ------ 
|2022-01-10 12:00:00|       un|  null|
|2022-01-10 13:00:00|   itnane|  open|
|2022-01-10 14:00:00|     tres|  null|
|2022-01-10 15:00:00|     four| close|
|2022-01-10 16:00:00|     four| other|
|2022-01-10 17:00:00|     fünf|  open|
|2022-01-10 18:00:00|      Liù|  null|
|2022-01-10 18:00:00|    Sette|   any|
|2022-01-10 20:00:00|    vosem|  null|
|2022-01-10 21:00:00|Shinchaku| close|
|2022-01-13 09:00:00|       ve|  null|
|2022-01-13 10:00:00|      col|  open|
|2022-01-13 11:00:00|     bias|  null|
|2022-01-13 12:00:00|       no| close|
 ------------------- --------- ------ 

and you want to partition by day, then you can use the following window:

cumsum_window = (
    Window
    .partitionBy(F.date_trunc('day', 'date'))
    .orderBy('date')
    .rangeBetween(Window.unboundedPreceding, 0)
)

and add the following piece of code as final step:

df = df.withColumn(
    'unique_sequence_id',
    F.when(
        F.col('sequence_id').isNotNull(),
        F.concat_ws('_', F.date_format('date', 'yyyy-MM-dd'), 'sequence_id')
    )
)

This is the result:

 ------------------- --------- ------ ----------- ------------------ 
|               date|     word|status|sequence_id|unique_sequence_id|
 ------------------- --------- ------ ----------- ------------------ 
|2022-01-10 12:00:00|       un|  null|       null|              null|
|2022-01-10 13:00:00|   itnane|  open|          1|      2022-01-10_1|
|2022-01-10 14:00:00|     tres|  null|          1|      2022-01-10_1|
|2022-01-10 15:00:00|     four| close|          1|      2022-01-10_1|
|2022-01-10 16:00:00|     four| other|       null|              null|
|2022-01-10 17:00:00|     fünf|  open|          2|      2022-01-10_2|
|2022-01-10 18:00:00|      Liù|  null|          2|      2022-01-10_2|
|2022-01-10 18:00:00|    Sette|   any|          2|      2022-01-10_2|
|2022-01-10 20:00:00|    vosem|  null|          2|      2022-01-10_2|
|2022-01-10 21:00:00|Shinchaku| close|          2|      2022-01-10_2|
|2022-01-13 09:00:00|       ve|  null|       null|              null|
|2022-01-13 10:00:00|      col|  open|          1|      2022-01-13_1|
|2022-01-13 11:00:00|     bias|  null|          1|      2022-01-13_1|
|2022-01-13 12:00:00|       no| close|          1|      2022-01-13_1|
 ------------------- --------- ------ ----------- ------------------ 
  • Related