I'm trying to collect groups of rows into sliding windows represented as vectors.
Given the example input:
--- ----- -----
| id|Label|group|
--- ----- -----
| A| T| 1|
| B| T| 1|
| C| F| 2|
| D| F| 2|
| E| F| 3|
| F| T| 3|
| G| F| 3|
| H| T| 3|
--- ----- -----
An expected output would be:
windows_size = 3
stride = 1
id_padding = ''
label_padding = 'f'
----- ------------- -------------
|group| Windows| Labels|
----- ------------- -------------
| 1| [A, B, '']| [T, T, f]|
| 2| [C, D, '']| [F, F, f]|
| 3| [E, F, G]| [F, T, F]|
| 3| [F, G, H]| [T, F, T]|
----- ------------- -------------
My latest attempt produces tumbling windows without padding. Here's my code:
from pyspark.sql import functions as F
from pyspark.sql import Window
data = [
("A", "T", 1),
("B", "T", 1),
("C", "F", 2),
("D", "F", 2),
("E", "F", 3),
("F", "T", 3),
("G", "F", 3),
("H", "T", 3),
]
df = spark.createDataFrame(data, ['id', 'label', 'group'])
grouping = 3
w2 = Window.partitionBy('group').orderBy('id')
df = df.withColumn("rows",((F.row_number().over(w2)-1) / grouping).astype('int') )
df.groupBy('group', 'rows')\
.agg(F.collect_list('id').alias("Windows"), F.collect_list('Label').alias("Labels"))\
.drop('rows') \
.orderBy('group').show()
I tried looking for variations of this, maybe by performing a SQL query like in this case or with some built-in SQL function such as ROWS N PRECEDING, but I didn't manage to do what I want. Most results from the web focus on temporal sliding windows, but I'm trying to do it over rows instead.
Any help would be greatly appreciated.
EDIT:
I think I found a solution for the padding thanks to this answer.
I still need to organize the rows in sliding windows though...
CodePudding user response:
One possible solution (not the most elegant one, but still functional) is the following.
In the window definition, it uses .rowsBetween
to create a sliding window of the specified size
; 0
indicates the current row.
import pyspark.sql.functions as F
from pyspark.sql.window import Window
# parameters
size = 3
id_padding = '\'\''
label_padding = 'f'
# windows
w = Window.partitionBy('group')
w_ordered = Window.partitionBy('group').orderBy('id')
w_ordered_limited = Window.partitionBy('group').orderBy('id').rowsBetween(0, size - 1)
(df.select(
'group',
F.collect_list('id').over(w_ordered_limited).alias('Windows'),
F.collect_list('Label').over(w_ordered_limited).alias('Groups'),
F.count('group').over(w).alias('n'),
F.row_number().over(w_ordered).alias('n_row')
)
# pad arrays and then slice them to the desired `size`
.withColumn('Windows', F.when(F.col('n') < size, F.slice(F.concat('Windows', F.array_repeat(F.lit(id_padding), size - 1)), 1, size))
.otherwise(F.col('Windows')))
.withColumn('Groups', F.when(F.col('n') < size, F.slice(F.concat('Groups', F.array_repeat(F.lit(label_padding), size - 1)), 1, size))
.otherwise(F.col('Groups')))
# filter out useless rows
.filter( ((F.col('n') < size) & (F.col('n_row') == 1))
| ((F.col('n') >= size) & (F.size('Windows') == size)))
.drop('n', 'n_row')
).show()
----- ---------- ---------
|group| Windows| Groups|
----- ---------- ---------
| 1|[A, B, '']|[T, T, f]|
| 2|[C, D, '']|[F, F, f]|
| 3| [E, F, G]|[F, T, F]|
| 3| [F, G, H]|[T, F, T]|
----- ---------- ---------
I suggest you to go through the solution step-by-step, one code line at a time, to understand the logic behind it.
CodePudding user response:
To expand on @Ric S answer, I needed to account for the stride as well.
My solution to that was to play around with the condition and transformations based on the stride value and its ratio with the win_size:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
# parameters
size = 5
stride = 2
id_padding = '\'\''
label_padding = 'f'
# windows
w = Window.partitionBy('group')
w_ordered = Window.partitionBy('group').orderBy('id')
w_ordered_limited = Window.partitionBy('group').orderBy('id').rowsBetween(0, size - 1)
if stride == 1:
filter_cond = (F.col('n') >= size) & (F.size('Windows') == size)
else:
filter_cond = (F.col('n') >= size) & (F.col('n_row') < size) & (F.col('n_row') % stride == 1) & (F.size('Windows') == size)
if size % stride != 0:
transf = lambda dfcol : F.slice(F.concat(dfcol, F.array_repeat(F.lit(id_padding), size - 1)), 1, size)
else:
transf = lambda dfcol : F.when(F.col('n') < size, F.slice(F.concat(dfcol, F.array_repeat(F.lit(id_padding), size - 1)), 1, size)) \
.otherwise(F.col(dfcol))
(df.select(
'group',
F.collect_list('id').over(w_ordered_limited).alias('Windows'),
F.collect_list('Label').over(w_ordered_limited).alias('Groups'),
F.count('group').over(w).alias('n'),
F.row_number().over(w_ordered).alias('n_row')
)
# pad arrays and then slice them to the desired `size`
.withColumn('Windows', transf("Windows"))
.withColumn('Groups', transf("Groups"))
# filter out useless rows
.filter( ((F.col('n') < size) & (F.col('n_row') == 1))
| (filter_cond))
.drop('n', 'n_row')
).show()
Whose output, given a slightly bigger input than the one given in the question, is:
----- ------------------ ------------------
|group| Windows| Groups|
----- ------------------ ------------------
| 1|[A, B, '', '', '']|[T, T, '', '', '']|
| 2|[C, D, '', '', '']|[F, F, '', '', '']|
| 3| [E, F, G, H, I]| [F, T, F, T, T]|
| 3| [G, H, I, J, '']| [F, T, T, T, '']|
----- ------------------ ------------------