I would like to add a column to my df_ordored that identifies windows based on the status column. I want to generate an ID for all logs between "open" and "close" (both included), as follows:
df_ordored =
+----+---------+------+
|date|word     |status|
+----+---------+------+
|1   |un       |      |
|2   |itnane   |open  |
|3   |tres     |      |
|4   |four     |close |
|4.1 |four     |other |
|5   |fünf     |open  |
|6   |Liù      |null  |
|7   |Sette    |any   |
|8   |vosem    |      |
|9   |Shinchaku|close |
+----+---------+------+
df_expected =
+----+---------+------+--+
|date|word     |status|id|
+----+---------+------+--+
|1   |un       |      |  |
|2   |itnane   |open  |a |
|3   |tres     |      |a |
|4   |four     |close |a |
|4.1 |four     |other |  |
|5   |fünf     |open  |b |
|6   |Liù      |null  |b |
|7   |Sette    |any   |b |
|8   |vosem    |      |b |
|9   |Shinchaku|close |b |
+----+---------+------+--+
Is it possible to do this with the DataFrame/Dataset API, without collecting the data on the driver?
Answer:
You can use a window function to generate incremental integers as IDs of open/close sequences:
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql import types as T

# Window to compute cumulative sums
cumsum_window = (
    Window
    # .partitionBy('something')  # if a suitable column exists, partition by it: this improves performance on large DataFrames
    .orderBy('date')
    .rangeBetween(Window.unboundedPreceding, 0)
)
# find row with status open
df = df.withColumn('is_open', F.when(F.col('status') == 'open', 1).otherwise(0))
# mark both open and close, with 1 and -1 respectively
df = df.withColumn('is_open_close', F.when(F.col('status') == 'close', -1).otherwise(F.col('is_open')))
# A sequence is made of the rows between an open and its close (both included).
# So, a row belongs to a sequence if one of the following holds:
#   1. its status is close
#   2. the cumulative sum of the is_open_close column is 1 at that row
# IS_OPEN_CLOSE: 0 0 0 ... 1 (open) 0 0 ... 0 -1 (close) 0 1 (open) ...
# CUMSUM: 0 0 0 ... 1 (open) 1 1 ... 1 0 (close) 0 1 (open) ...
df = df.withColumn('is_in_sequence', (F.col('status') == 'close') | (F.sum('is_open_close').over(cumsum_window).cast(T.BooleanType())))
# Compute an ID for the rows in a sequence as the ordinal of their corresponding
# open in the is_open column. Use the cumulative sum of is_open to compute it.
# IS_OPEN: 0 0 0 ... 1 (open) 0 0 ... 0 0 (close) 0 1 (open) ...
# CUMSUM: 0 0 0 ... 1 (open) 1 1 ... 1 1 (close) 1 2 (open) ...
# Assign the just created ID only to rows belonging to a sequence
df = df.withColumn('sequence_id', F.when(F.col('is_in_sequence'), F.sum('is_open').over(cumsum_window)))
# remove temporary columns
df = df.drop('is_open', 'is_open_close', 'is_in_sequence')
This is what you get:
+----+---------+------+-----------+
|date|     word|status|sequence_id|
+----+---------+------+-----------+
|   1|       un|  null|       null|
|   2|   itnane|  open|          1|
|   3|     tres|  null|          1|
|   4|     four| close|          1|
| 4.1|     four| other|       null|
|   5|     fünf|  open|          2|
|   6|      Liù|  null|          2|
|   7|    Sette|   any|          2|
|   8|    vosem|  null|          2|
|   9|Shinchaku| close|          2|
+----+---------+------+-----------+
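To see why the two cumulative sums produce these IDs, here is a minimal pure-Python sketch of the same logic, with no Spark involved. It is only an illustration: the list names (statuses, open_count, balance) are made up for this sketch, and itertools.accumulate stands in for F.sum(...).over(cumsum_window).

```python
from itertools import accumulate

# Status column of the example above (None stands for a null status).
statuses = [None, "open", None, "close", "other", "open", None, "any", None, "close"]

# Same role as the is_open column: 1 on "open", 0 otherwise.
is_open = [1 if s == "open" else 0 for s in statuses]
# Same role as the is_open_close column: 1 on "open", -1 on "close", 0 otherwise.
is_open_close = [-1 if s == "close" else o for s, o in zip(statuses, is_open)]

# Running totals, standing in for the cumulative sums over the window.
open_count = list(accumulate(is_open))
balance = list(accumulate(is_open_close))

# A row is in a sequence if it is a "close" row or the running balance is non-zero;
# its ID is the number of "open" rows seen so far.
sequence_id = [
    n if (s == "close" or b != 0) else None
    for s, n, b in zip(statuses, open_count, balance)
]

print(sequence_id)
```

The balance drops back to 0 on the "close" row itself, which is why the explicit check on the close status is needed to keep that row inside its sequence.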
If you use a partition column in the window, sequence_id is unique only within each partition, so a sequence is identified by the pair made of the partition value and the sequence_id. You can easily turn this pair into an overall unique ID by combining the two. For instance, suppose that your DataFrame is the following one:
+-------------------+---------+------+
|               date|     word|status|
+-------------------+---------+------+
|2022-01-10 12:00:00|       un|  null|
|2022-01-10 13:00:00|   itnane|  open|
|2022-01-10 14:00:00|     tres|  null|
|2022-01-10 15:00:00|     four| close|
|2022-01-10 16:00:00|     four| other|
|2022-01-10 17:00:00|     fünf|  open|
|2022-01-10 18:00:00|      Liù|  null|
|2022-01-10 18:00:00|    Sette|   any|
|2022-01-10 20:00:00|    vosem|  null|
|2022-01-10 21:00:00|Shinchaku| close|
|2022-01-13 09:00:00|       ve|  null|
|2022-01-13 10:00:00|      col|  open|
|2022-01-13 11:00:00|     bias|  null|
|2022-01-13 12:00:00|       no| close|
+-------------------+---------+------+
and you want to partition by day, then you can use the following window:
cumsum_window = (
    Window
    .partitionBy(F.date_trunc('day', 'date'))
    .orderBy('date')
    .rangeBetween(Window.unboundedPreceding, 0)
)
and add the following code as the final step:
df = df.withColumn(
    'unique_sequence_id',
    F.when(
        F.col('sequence_id').isNotNull(),
        F.concat_ws('_', F.date_format('date', 'yyyy-MM-dd'), 'sequence_id')
    )
)
This is the result:
+-------------------+---------+------+-----------+------------------+
|               date|     word|status|sequence_id|unique_sequence_id|
+-------------------+---------+------+-----------+------------------+
|2022-01-10 12:00:00|       un|  null|       null|              null|
|2022-01-10 13:00:00|   itnane|  open|          1|      2022-01-10_1|
|2022-01-10 14:00:00|     tres|  null|          1|      2022-01-10_1|
|2022-01-10 15:00:00|     four| close|          1|      2022-01-10_1|
|2022-01-10 16:00:00|     four| other|       null|              null|
|2022-01-10 17:00:00|     fünf|  open|          2|      2022-01-10_2|
|2022-01-10 18:00:00|      Liù|  null|          2|      2022-01-10_2|
|2022-01-10 18:00:00|    Sette|   any|          2|      2022-01-10_2|
|2022-01-10 20:00:00|    vosem|  null|          2|      2022-01-10_2|
|2022-01-10 21:00:00|Shinchaku| close|          2|      2022-01-10_2|
|2022-01-13 09:00:00|       ve|  null|       null|              null|
|2022-01-13 10:00:00|      col|  open|          1|      2022-01-13_1|
|2022-01-13 11:00:00|     bias|  null|          1|      2022-01-13_1|
|2022-01-13 12:00:00|       no| close|          1|      2022-01-13_1|
+-------------------+---------+------+-----------+------------------+
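The per-partition behaviour can also be sketched in plain Python, as a rough illustration rather than a replacement for the Spark code. Here itertools.groupby plays the role of partitionBy on the day, the counters restart in each group, and the day is prepended to make the ID unique overall. The rows are a shortened subset of the table above, and the helper day_of is a name invented for this sketch.

```python
from itertools import accumulate, groupby

# (timestamp, status) pairs, already sorted by timestamp; a subset of the rows above.
rows = [
    ("2022-01-10 12:00:00", None),
    ("2022-01-10 13:00:00", "open"),
    ("2022-01-10 15:00:00", "close"),
    ("2022-01-10 17:00:00", "open"),
    ("2022-01-10 21:00:00", "close"),
    ("2022-01-13 09:00:00", None),
    ("2022-01-13 10:00:00", "open"),
    ("2022-01-13 12:00:00", "close"),
]


def day_of(row):
    """Hypothetical helper: the 'yyyy-MM-dd' prefix of the timestamp string."""
    return row[0][:10]


unique_ids = []
for day, group in groupby(rows, key=day_of):  # one group per day, like partitionBy
    statuses = [status for _, status in group]
    # Both running totals restart from zero in every partition.
    open_count = list(accumulate(1 if s == "open" else 0 for s in statuses))
    balance = list(accumulate({"open": 1, "close": -1}.get(s, 0) for s in statuses))
    for s, n, b in zip(statuses, open_count, balance):
        in_sequence = s == "close" or b != 0
        # Combine partition value and per-partition ID, as concat_ws does above.
        unique_ids.append(f"{day}_{n}" if in_sequence else None)

print(unique_ids)
```

Because the counters restart in each partition, a sequence that is opened on one day and closed on another would not be matched correctly with this choice of partition key.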