RDD_Input = [(('377', '80'), ('1', '4')), (('377', '510'), ('1', '5')), (('377', '79'), ('1', '4')), (('377', '791'), ('1', '1')), (('377', '511'), ('1', '4')), (('377', '433'), ('1', '3')), (('377', '687'), ('1', '1')), (('377', '456'), ('1', '1')), (('377', '399'), ('1', '4')), (('377', '96'), ('1', '5')), (('377', '780'), ('1', '1')), (('377', '683'), ('1', '1')), (('377', '403'), ('1', '5')), (('377', '999'), ('1', '4')), (('377', '502'), ('1', '4')), (('377', '435'), ('1', '5')), (('377', '550'), ('1', '5')), (('377', '948'), ('1', '1')), (('377', '393'), ('1', '4')), (('377', '648'), ('1', '4'))]
The input RDD consists of key-value pairs of the form ((movie1, movie2), (rating1, rating2)).
How do I transform the RDD into ((movie1, movie2), (rating1, rating2), (rating3, rating4), (rating5, rating6), ...)?
Expected result example: (('377', '399'), ('1', '4'), ('1', '4'))
Here ('377', '399') is the key, and every rating tuple that shares that key is appended after it.
The requirement is to use only the RDD API.
CodePudding user response:
It seems you want to first use groupByKey, putting the values for each key into a list, and then just map to unpack that list after the key.
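from pyspark import SparkContext

sc = SparkContext.getOrCreate()  # reuses the existing context in a shell or notebook
data = [(('a', 'b'), ('1', '4')), (('a', 'b'), ('3', '5')), (('c', 'd'), ('2', '2'))]
rdd = sc.parallelize(data)
# Collect all values that share a key into a list: (key, [value1, value2, ...])
rdd = rdd.groupByKey().mapValues(list)
# Unpack the list so each value follows the key: (key, value1, value2, ...)
rdd = rdd.map(lambda x: (x[0], *x[1]))
print(rdd.collect())
# [(('c', 'd'), ('2', '2')), (('a', 'b'), ('1', '4'), ('3', '5'))]  (key order may vary)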
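If you want to check the expected shape without starting Spark, the groupByKey and map steps above can be mimicked in plain Python. This is only a rough sketch of the same logic, not a replacement for the RDD code; the helper name group_values is made up for illustration and is not part of PySpark.

```python
from collections import defaultdict

def group_values(pairs):
    """Plain-Python stand-in (hypothetical helper) for
    rdd.groupByKey().mapValues(list).map(lambda x: (x[0], *x[1]))."""
    grouped = defaultdict(list)
    for key, value in pairs:
        # Same idea as groupByKey: gather every value under its key
        grouped[key].append(value)
    # Same idea as the final map: the key first, then each value unpacked after it
    return [(key, *values) for key, values in grouped.items()]

data = [(('a', 'b'), ('1', '4')), (('a', 'b'), ('3', '5')), (('c', 'd'), ('2', '2'))]
result = group_values(data)
print(result)
# [(('a', 'b'), ('1', '4'), ('3', '5')), (('c', 'd'), ('2', '2'))]
```

Unlike the Spark version, this keeps keys in first-seen order, so it is only useful for comparing the structure of each record, not the order of the collected output.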