Home > Enterprise >  How to calculate values for a column in a row based on previous row's column's value for a
How to calculate values for a column in a row based on previous row's column's value for a

Time:08-18

I have a column 'val' whose value gets calculated at each row and then the next row takes in that value and applies some logic on it, and then value for that row also gets updated. It can be shown as follows:-

val(x) = f(val(x-1), col_a(x), col_b(x)) where x is the row number (indexed at 0)

val(0) = f(col_a(0), col_b(0)) {some fixed value calculated based on two columns}

val(0) represents the first value in a partition.

[ f here represents some arbitrary function]

I tried using lag function as follows (for a sample dataframe):-

windowSpec  = Window.partitionBy("department")
 ------------- ---------- ------ ------ ------ 
|employee_name|department|   a  |   b  |  val |   
 ------------- ---------- ------ ------ ------ 
|James        |Sales     |3000  |2500  |5500  | #val(0) =  (a(0)   b(0)) = 5500 [first value within a partition]
|Michael      |Sales     |4600  |1650  |750   | #val(1) = (a(1)   b(1) - val(0)) =  750
|Robert       |Sales     |4100  |1100  |4450  | #val(2) = (a(2)   b(2) - val(1)) =  4450 
|Maria        |Finance   |3000  |7000  |xxxx  | #....... and so on, this is how I want the calculations to take place.
|James        |Finance   |3000  |5000  |xxxx  |
|Scott        |Marketing |3300  |4300  |xxxx  |
|Jen          |Marketing |3900  |3700  |xxxx  |

df = df.withColumn("val",col("a")   col("b") - lag("val",1).over(windowSpec)) #I tried this but it does not have the desired result.

How can I implement this in PySpark?

CodePudding user response:

Tracking the previously calculated value from the same column is hard to do in spark -- I'm not saying it's impossible, and there certainly are ways (hacks) to achieve it. One way to do is using array of structs and aggregate function.

Two assumptions in your data

  • There is an ID column that has the sort order of the data - spark does not retain dataframe sorting due to its distributed nature
  • There is a grouping key for the processing to be optimized
# input data with aforementioned assumptions
data_sdf.show()

#  --- --- ------- --------- ---- ---- 
# | gk|idx|   name|     dept|   a|   b|
#  --- --- ------- --------- ---- ---- 
# | gk|  1|  James|    Sales|3000|2500|
# | gk|  2|Michael|    Sales|4600|1650|
# | gk|  3| Robert|    Sales|4100|1100|
# | gk|  4|  Maria|  Finance|3000|7000|
# | gk|  5|  James|  Finance|3000|5000|
# | gk|  6|  Scott|Marketing|3300|4300|
# | gk|  7|    Jen|Marketing|3900|3700|
#  --- --- ------- --------- ---- ---- 
# create structs with all columns and collect it to an array
# use the array of structs to do the val calcs
# NOTE - keep the ID field at the beginning for the `array_sort` to work as reqd
arr_of_structs_sdf = data_sdf. \
    withColumn('allcol_struct', func.struct(*data_sdf.columns)). \
    groupBy('gk'). \
    agg(func.array_sort(func.collect_list('allcol_struct')).alias('allcol_struct_arr'))

# function to create struct schema string
struct_fields = lambda x: ', '.join([str(x) '.' k ' as ' k for k in data_sdf.columns])

# use `aggregate` to do the val calc
arr_of_structs_sdf. \
    withColumn('new_allcol_struct_arr',
               func.expr('''
                         aggregate(slice(allcol_struct_arr, 2, size(allcol_struct_arr)),
                                   array(struct({0}, (allcol_struct_arr[0].a allcol_struct_arr[0].b) as val)),
                                   (x, y) -> array_union(x, 
                                                         array(struct({1}, ((y.a y.b)-element_at(x, -1).val) as val))
                                                         )
                                   )
                         '''.format(struct_fields('allcol_struct_arr[0]'), struct_fields('y'))
                         )
               ). \
    selectExpr('inline(new_allcol_struct_arr)'). \
    show(truncate=False)

#  --- --- ------- --------- ---- ---- ---- 
# |gk |idx|name   |dept     |a   |b   |val |
#  --- --- ------- --------- ---- ---- ---- 
# |gk |1  |James  |Sales    |3000|2500|5500|
# |gk |2  |Michael|Sales    |4600|1650|750 |
# |gk |3  |Robert |Sales    |4100|1100|4450|
# |gk |4  |Maria  |Finance  |3000|7000|5550|
# |gk |5  |James  |Finance  |3000|5000|2450|
# |gk |6  |Scott  |Marketing|3300|4300|5150|
# |gk |7  |Jen    |Marketing|3900|3700|2450|
#  --- --- ------- --------- ---- ---- ---- 
  • Related