ReduceByKey for two columns and count rows RDD

My data consists of four columns: the chart type, the name of a song, the song's position in the chart, and the day the song held that position. How can I find the total number of days each song spent at #1 in each chart type? I want my result to look like: chart_type, song, days in #1

First I filter on the chart position and keep only the #1 rows. What should I do next? ReduceByKey on the song, then reduce on the chart type, and then count the records to get the total days at #1 for each song in each chart type?

('top200', '501', '1', '2021-03-26T00:00:00.000+02:00')
('top200', '501', '1', '2021-03-27T00:00:00.000+02:00')
('top200', '501', '1', '2021-03-28T00:00:00.000+02:00')
('viral50', 'Gowtu', '1', '2017-03-17T00:00:00.000+02:00')
('viral50', 'Gowtu', '1', '2017-03-18T00:00:00.000+02:00')
('viral50', 'Gowtu', '1', '2017-03-19T00:00:00.000+02:00')
('top200', 'Lonely (with benny blanco)', '1', '2020-11-09T00:00:00.000+02:00')
('top200', 'Lonely (with benny blanco)', '1', '2020-11-10T00:00:00.000+02:00')
('top200', 'Lonely (with benny blanco)', '1', '2020-11-11T00:00:00.000+02:00')

Thank you

CodePudding user response:

If you want to do this using an RDD, and the count should be grouped by the first two elements (chart type and song), you can key each record by that pair and reduce:

from operator import add

from pyspark.sql import SparkSession

# Reuse the active Spark session (or create one) to obtain a SparkContext.
spark = SparkSession.builder.getOrCreate()

data_ls = [
    ('top200', '501', '1', '2021-03-26T00:00:00.000+02:00'),
    ('top200', '501', '1', '2021-03-27T00:00:00.000+02:00'),
    ('top200', '501', '1', '2021-03-28T00:00:00.000+02:00'),
    ('viral50', 'Gowtu', '1', '2017-03-17T00:00:00.000+02:00'),
    ('viral50', 'Gowtu', '1', '2017-03-18T00:00:00.000+02:00'),
    ('viral50', 'Gowtu', '1', '2017-03-19T00:00:00.000+02:00'),
    ('top200', 'Lonely (with benny blanco)', '1', '2020-11-09T00:00:00.000+02:00'),
    ('top200', 'Lonely (with benny blanco)', '1', '2020-11-10T00:00:00.000+02:00'),
    ('top200', 'Lonely (with benny blanco)', '1', '2020-11-11T00:00:00.000+02:00')
]

data_rdd = spark.sparkContext.parallelize(data_ls)

# Key each record by (chart_type, song), emit a 1 per day at #1,
# then sum the 1s per key.
data_rdd. \
    map(lambda gk: ((gk[0], gk[1]), 1)). \
    reduceByKey(add). \
    collect()

# [(('top200', '501'), 3),
#  (('top200', 'Lonely (with benny blanco)'), 3),
#  (('viral50', 'Gowtu'), 3)]
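
The sample data here is already limited to position '1'; on the full dataset you can put the filter the question describes in front of the map, and then unpack the composite key to get the requested chart_type, song, days in #1 layout. A minimal sketch under that assumption:

# Assumes the full RDD also contains rows at other chart positions.
data_rdd. \
    filter(lambda rec: rec[2] == '1'). \
    map(lambda rec: ((rec[0], rec[1]), 1)). \
    reduceByKey(add). \
    map(lambda kv: (kv[0][0], kv[0][1], kv[1])). \
    collect()

# [('top200', '501', 3),
#  ('top200', 'Lonely (with benny blanco)', 3),
#  ('viral50', 'Gowtu', 3)]
# (order of collect() output is not guaranteed)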
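If a DataFrame is acceptable instead of a raw RDD, the same count is a short aggregation. A sketch assuming hypothetical column names chart, song, position and day:

from pyspark.sql import functions as F

# Hypothetical column names for the four fields.
df = spark.createDataFrame(data_ls, schema=['chart', 'song', 'position', 'day'])

# Keep only the #1 rows, then count days per (chart, song).
df.filter(F.col('position') == '1'). \
    groupBy('chart', 'song'). \
    count(). \
    show()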