How to find the maximum per group in an rdd?


I'm using PySpark and I have an RDD that looks like this:

movie_stats = spark.sparkContext.parallelize([
    ("MovieA", [(1, 100), (2, 20), (3, "50")]),
    ("MovieC", [(1, 100), (2, "250"), (3, 100), (4, "120")]),        
    ("MovieB", [(1, 1000), (2, 250)]),
    ("MovieA", [(4, 50), (5, "10"), (6, 0)]),
    ("MovieB", [(3, 0), (4, "260")]),  
    ("MovieC", [(5, "180")]),
])

The first element in each tuple is the week number and the second is the number of viewers. I want to find the week with the most views for each movie, ignoring the first week. The data is also a bit messy: some of the viewer counts are strings instead of ints, and I don't know how to handle that.

I've tried a few things, but nothing worked. For example:

movie_stats.reduceByKey(max).collect()

returns:

[('MovieA', [(4, 50), (5, '10'), (6, 0)]),
 ('MovieC', [(5, '180')]),
 ('MovieB', [(3, 0), (4, '260')])]

so it returns one entire batch per movie, whichever one Python's element-wise list comparison ranks higher, rather than the best week.

Also this:

movie_stats.groupByKey().reduce(max)

which returns just this:

('MovieC', <pyspark.resultiterable.ResultIterable at 0x558f75eeb0>)

How can I solve this?

CodePudding user response:

You need to convert the strings to ints, then apply a map function that 1) filters out week 1 data and 2) finds the week with the maximum views.

def helper(arr):
    # arr is the grouped value: an iterable of batches, each a list of (week, views)
    max_week = None
    for sub_arr in arr:
        for item in sub_arr:
            if item[0] == 1:
                continue  # skip week 1
            count = int(item[1])  # some counts are strings, so normalize to int
            if max_week is None or max_week[1] < count:
                max_week = [item[0], count]
    return max_week

movie_stats.groupByKey().map(lambda x: (x[0], helper(x[1]))).collect()
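
For the sample data above, this should collect to something like the following (ordering across movies may vary):

[('MovieA', [3, 50]), ('MovieC', [2, 250]), ('MovieB', [4, 260])]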

CodePudding user response:

If you want the most views per movie, ignoring the first week, i.e.

[('MovieA', 50), ('MovieC', 250), ('MovieB', 260)]

then you'll want your own map function rather than a reduce.

movie_stats = spark.sparkContext.parallelize([
    ("MovieA", [(1, 100), (2, 20), (3, "50")]),
    ("MovieC", [(1, 100), (2, "250"), (3, 100), (4, "120")]),        
    ("MovieB", [(1, 1000), (2, 250)]),
    ("MovieA", [(4, 50), (5, "10"), (6, 0)]),
    ("MovieB", [(3, 0), (4, "260")]),  
    ("MovieC", [(5, "180")]),
])

def get_views_after_first_week(v):
  values = iter(v)  # iterable of batches, grouped by key
  result = list()
  for x in values:
    result.extend([int(y[1]) for y in x if y[0] > 1])  # cast to int, drop week 1
  return result

mapped = movie_stats.groupByKey().mapValues(get_views_after_first_week).mapValues(max)
mapped.collect()
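
One caveat: if a movie only has week-1 data, get_views_after_first_week returns an empty list, and max() will then raise a ValueError when the job runs. If that can happen in your data, a minimal guard (the filter step is my addition, not part of the original answer) is:

mapped = (movie_stats
          .groupByKey()
          .mapValues(get_views_after_first_week)
          .filter(lambda kv: len(kv[1]) > 0)  # drop movies with no post-week-1 data
          .mapValues(max))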

To include the week number as well, producing

[('MovieA', (3, 50)), ('MovieC', (2, 250)), ('MovieB', (4, 260))]

def get_max_weekly_views_after_first_week(v):
  values = iter(v)  # iterable of batches, grouped by key
  max_views = float('-inf')
  max_week = None
  for x in values:
    for t in x:
      week, views = t
      views = int(views)  # some counts are strings, so normalize to int
      if week > 1 and views > max_views:
        max_week = week
        max_views = views
  return (max_week, max_views)

mapped = movie_stats.groupByKey().mapValues(get_max_weekly_views_after_first_week)
mapped.collect()
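
For what it's worth, both answers build on groupByKey, which collects each movie's full history before scanning it. Here is a minimal sketch of an alternative that flattens the batches and keeps only the running best week per movie via reduceByKey; the helper name to_week_views is mine, not from either answer:

def to_week_views(pair):
    movie, weeks = pair
    # emit one (movie, (week, views)) record per week, casting string
    # counts to int and skipping week 1
    return [(movie, (week, int(views))) for week, views in weeks if week > 1]

best = (movie_stats
        .flatMap(to_week_views)
        .reduceByKey(lambda a, b: a if a[1] >= b[1] else b))  # keep the higher view count
best.collect()
# e.g. [('MovieA', (3, 50)), ('MovieC', (2, 250)), ('MovieB', (4, 260))]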