Sum of weighted values


My original data frame looks like this one below, you can generate data frame by code at the end.

+-----+-------+---+
| name|ts_week|tag|
+-----+-------+---+
|  Bob|  week1|  a|
|  Bob|  week1|  b|
|  Bob|  week1|  c|
|  Bob|  week2|  a|
|  Bob|  week2|  b|
|  Bob|  week2|  d|
|  Bob|  week3|  c|
|  Bob|  week3|  d|
|  Bob|  week4|  a|
|  Bob|  week4|  d|
|Allen|  week1|  a|
|Allen|  week2|  c|
|Allen|  week3|  a|
|Allen|  week3|  b|
|Allen|  week4|   |
+-----+-------+---+

All tags in week 1 get 0.1 points, tags in week 2 get 0.2 points, tags in week 3 get 0.3 points, and tags in week 4 get 0.1 points (each time we only consider the tags within one month's time window, so whatever happens in weeks 5, 6, 7, 8 has nothing to do with this question). For example, Allen's tag 'a' appears in week 1 and week 3, so it scores 0.1 + 0.3 = 0.4.

And finally, we get the points for each name:

Bob:                 
a= 0.7
b= 0.3
c= 0.4
d= 0.7

Allen:
a= 0.4
b= 0.3
c= 0.2

Thus, Bob's tag for week 4 (the target week here) is a (no rule was given for breaking tied scores, so I just pick one), and Allen's tag for week 4 is also a. In both cases, 'a' gets the highest score.

FYI, this data frame is quite large (it contains millions of rows), so as far as I know, using anything pandas-based in PySpark may cause OOM problems, presumably because it holds everything in memory directly.

data_ls = [('Bob', 'week1', 'a'),
           ('Bob', 'week1', 'b'),
           ('Bob', 'week1', 'c'),
           ('Bob', 'week2', 'a'),
           ('Bob', 'week2', 'b'),
           ('Bob', 'week2', 'd'),
           ('Bob', 'week3', 'c'),
           ('Bob', 'week3', 'd'),
           ('Bob', 'week4', 'a'),
           ('Bob', 'week4', 'd'),
           ('Allen', 'week1', 'a'),
           ('Allen', 'week2', 'c'),
           ('Allen', 'week3', 'a'),
           ('Allen', 'week3', 'b'),
           ('Allen', 'week4', '')]
data_sdf = spark.sparkContext.parallelize(data_ls).toDF(['name', 'ts_week', 'tag'])

CodePudding user response:

data_ls = [
    ('Bob', 'week1', 'a'),
    ('Bob', 'week1', 'b'),
    ('Bob', 'week1', 'c'),
    ('Bob', 'week2', 'a'),
    ('Bob', 'week2', 'b'),
    ('Bob', 'week2', 'd'),
    ('Bob', 'week3', 'c'),
    ('Bob', 'week3', 'd'),
    ('Bob', 'week4', 'a'),
    ('Bob', 'week4', 'd'),
    ('Allen', 'week1', 'a'),
    ('Allen', 'week2', 'c'),
    ('Allen', 'week3', 'a'),
    ('Allen', 'week3', 'b'),
    ('Allen', 'week4', '')
]

from pyspark.sql import functions as F

# week weights kept as integers (1 -> 0.1, 2 -> 0.2, ...) and divided by 10 later
points = [
    ('week1', 1),
    ('week2', 2),
    ('week3', 3),
    ('week4', 1),
]
df_point = spark.createDataFrame(points, ['ts_week', 'points'])

data_sdf = spark.createDataFrame(data_ls, ['name', 'ts_week', 'tag'])

# attach each week's weight to every (name, week, tag) row
data_sdf = data_sdf.join(df_point, ['ts_week'], "left")

# sum the weights per (name, tag) and scale back to the 0.1/0.2/0.3 range
data_sdf.groupBy('name', 'tag') \
    .agg((F.sum('points') / 10).alias('points')) \
    .sort('name', 'tag') \
    .show(20, False)
# +-----+---+------+
# |name |tag|points|
# +-----+---+------+
# |Allen|   |0.1   |
# |Allen|a  |0.4   |
# |Allen|b  |0.3   |
# |Allen|c  |0.2   |
# |Bob  |a  |0.4   |
# |Bob  |b  |0.3   |
# |Bob  |c  |0.4   |
# |Bob  |d  |0.6   |
# +-----+---+------+
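
The question also asks for the single highest-scoring tag per name. One way to get that from the aggregation above is a window ranking; this is a minimal sketch (the scores_sdf name and the arbitrary tie-breaking are assumptions, since the question gives no tie rule):

from pyspark.sql import Window
from pyspark.sql import functions as F

# keep the aggregated (name, tag, points) result instead of only showing it
scores_sdf = (data_sdf
    .groupBy('name', 'tag')
    .agg((F.sum('points') / 10).alias('points')))

# rank tags within each name by descending points and keep the top row;
# ties are broken arbitrarily because no tie rule is given in the question
w = Window.partitionBy('name').orderBy(F.col('points').desc())

top_tag_sdf = (scores_sdf
    .withColumn('rn', F.row_number().over(w))
    .filter(F.col('rn') == 1)
    .drop('rn'))

top_tag_sdf.show()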

CodePudding user response:

An approach that builds a chained when() condition for the week weights and then aggregates.

from pyspark.sql import functions as F

# score starts as the functions module, so the first loop iteration calls
# F.when(); after that it is a Column and further .when() calls chain onto it
score = F
for k, v in {'week1': .1, 'week2': .2, 'week3': .3, 'week4': .1}.items():
    score = score.when(F.col('ts_week') == k, v)

# sum the per-row weights for every (name, tag) pair
data_sdf = (data_sdf
    .groupBy('name', 'tag')
    .agg(F.round(F.sum(score), 1).alias('w_score')))

data_sdf.show()
# +-----+---+-------+
# | name|tag|w_score|
# +-----+---+-------+
# |  Bob|  c|    0.4|
# |  Bob|  d|    0.6|
# |  Bob|  b|    0.3|
# |  Bob|  a|    0.4|
# |Allen|  a|    0.4|
# |Allen|  b|    0.3|
# |Allen|   |    0.1|
# |Allen|  c|    0.2|
# +-----+---+-------+
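
Both results also keep a row for Allen's empty week-4 tag. If blank tags should not be scored at all, one option (a small sketch, not part of either answer above) is to drop those rows before aggregating:

from pyspark.sql import functions as F

# drop rows whose tag is null or blank (e.g. Allen's week-4 entry)
# before computing the weighted scores
data_sdf = data_sdf.filter(F.col('tag').isNotNull() & (F.trim(F.col('tag')) != ''))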