My original data frame looks like the one below; you can generate it with the code at the end.
+-----+-------+---+
| name|ts_week|tag|
+-----+-------+---+
|  Bob|  week1|  a|
|  Bob|  week1|  b|
|  Bob|  week1|  c|
|  Bob|  week2|  a|
|  Bob|  week2|  b|
|  Bob|  week2|  d|
|  Bob|  week3|  c|
|  Bob|  week3|  d|
|  Bob|  week4|  a|
|  Bob|  week4|  d|
|Allen|  week1|  a|
|Allen|  week2|  c|
|Allen|  week3|  a|
|Allen|  week3|  b|
|Allen|  week4|   |
+-----+-------+---+
All tags in week 1 get 0.1 points (each time we only consider the tags inside a one-month window, so whatever happens in weeks 5, 6, 7, 8 has nothing to do with this question), tags in week 2 get 0.2 points, tags in week 3 get 0.3 points, and tags in week 4 get 0.1 points.
And finally, we get the points for each name:
Bob:
a= 0.7
b= 0.3
c= 0.4
d= 0.7
Allen:
a= 0.4
b= 0.3
c= 0.2
Thus, Bob's tag for week 4 (the target week here) is a (I don't have a rule for breaking tied tags, so I just pick one), and Allen's tag for week 4 is a. In both cases, 'a' gets the highest score.
FYI, this data frame is quite large (millions of rows), so as far as I know, anything that pulls it into pandas may cause OOM problems, because it keeps everything in memory at once.
data_ls = [('Bob', 'week1', 'a'),
           ('Bob', 'week1', 'b'),
           ('Bob', 'week1', 'c'),
           ('Bob', 'week2', 'a'),
           ('Bob', 'week2', 'b'),
           ('Bob', 'week2', 'd'),
           ('Bob', 'week3', 'c'),
           ('Bob', 'week3', 'd'),
           ('Bob', 'week4', 'a'),
           ('Bob', 'week4', 'd'),
           ('Allen', 'week1', 'a'),
           ('Allen', 'week2', 'c'),
           ('Allen', 'week3', 'a'),
           ('Allen', 'week3', 'b'),
           ('Allen', 'week4', '')]

data_sdf = spark.sparkContext.parallelize(data_ls).toDF(['name', 'ts_week', 'tag'])
CodePudding user response:
from pyspark.sql import functions as F

data_ls = [
    ('Bob', 'week1', 'a'),
    ('Bob', 'week1', 'b'),
    ('Bob', 'week1', 'c'),
    ('Bob', 'week2', 'a'),
    ('Bob', 'week2', 'b'),
    ('Bob', 'week2', 'd'),
    ('Bob', 'week3', 'c'),
    ('Bob', 'week3', 'd'),
    ('Bob', 'week4', 'a'),
    ('Bob', 'week4', 'd'),
    ('Allen', 'week1', 'a'),
    ('Allen', 'week2', 'c'),
    ('Allen', 'week3', 'a'),
    ('Allen', 'week3', 'b'),
    ('Allen', 'week4', '')
]

# lookup table: weight of each week, scaled by 10 to keep the values integral
points = [
    ('week1', 1),
    ('week2', 2),
    ('week3', 3),
    ('week4', 1),
]

df_point = spark.createDataFrame(points, ['ts_week', 'points'])
data_sdf = spark.createDataFrame(data_ls, ['name', 'ts_week', 'tag'])

# attach the weight of its week to every (name, ts_week, tag) row
data_sdf = data_sdf.join(df_point, ['ts_week'], 'left')

# sum the weights per (name, tag) and scale back down by 10
data_sdf.groupBy('name', 'tag').agg((F.sum('points') / 10).alias('points')).sort('name', 'tag').show(20, False)
# +-----+---+------+
# |name |tag|points|
# +-----+---+------+
# |Allen|   |0.1   |
# |Allen|a  |0.4   |
# |Allen|b  |0.3   |
# |Allen|c  |0.2   |
# |Bob  |a  |0.4   |
# |Bob  |b  |0.3   |
# |Bob  |c  |0.4   |
# |Bob  |d  |0.6   |
# +-----+---+------+
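If you also need the single week-4 tag per name that the question asks for, here is a minimal sketch on top of the joined data_sdf above; the tie-break order (simply by tag) is my own assumption, since the question leaves it open:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# rank tags per name by score (descending); ordering by tag on ties is an
# arbitrary assumption, the question does not define a tie-break rule
w = Window.partitionBy('name').orderBy(F.col('points').desc(), F.col('tag'))

scored_sdf = data_sdf.groupBy('name', 'tag').agg((F.sum('points') / 10).alias('points'))

top_tag_sdf = (scored_sdf
               .withColumn('rn', F.row_number().over(w))
               .filter(F.col('rn') == 1)
               .drop('rn'))
top_tag_sdf.show()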
CodePudding user response:
An approach that builds a chained when condition and then aggregates.
from pyspark.sql import functions as F

# build a chained when() expression mapping each week to its weight; the first
# loop iteration calls F.when(), the later ones chain Column.when()
score = F
for k, v in {'week1': .1, 'week2': .2, 'week3': .3, 'week4': .1}.items():
    score = score.when(F.col('ts_week') == k, v)

# sum the weights per (name, tag); round to avoid floating-point noise
data_sdf = (data_sdf
            .groupBy('name', 'tag')
            .agg(F.round(F.sum(score), 1).alias('w_score')))
data_sdf.show()
# +-----+---+-------+
# | name|tag|w_score|
# +-----+---+-------+
# |  Bob|  c|    0.4|
# |  Bob|  d|    0.6|
# |  Bob|  b|    0.3|
# |  Bob|  a|    0.4|
# |Allen|  a|    0.4|
# |Allen|  b|    0.3|
# |Allen|   |    0.1|
# |Allen|  c|    0.2|
# +-----+---+-------+
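As a side note, the same week-to-weight mapping can also be expressed as a literal map column instead of a chained when. This is just a sketch and assumes data_sdf is still the original (name, ts_week, tag) frame from the question, not the aggregated result above:

from itertools import chain
from pyspark.sql import functions as F

week_points = {'week1': .1, 'week2': .2, 'week3': .3, 'week4': .1}

# literal map column: map('week1', 0.1, 'week2', 0.2, ...)
points_map = F.create_map(*[F.lit(x) for x in chain(*week_points.items())])

# look up each row's week in the map instead of chaining when() calls
(data_sdf
 .groupBy('name', 'tag')
 .agg(F.round(F.sum(points_map[F.col('ts_week')]), 1).alias('w_score'))
 .show())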