I have an RDD like the below, where the first entry in the tuple is an author, and the second entry is the title of the publication.
[('Hector Garcia-Molina', 'Distributed Databases.'),
('Meichun Hsu', 'Distributed Databases.'),
('Won Kim', 'On Resolving Schematic Heterogeneity in Multidatabase Systems.'),
('Won Kim',
'Requirements for a Performance Benchmark for Object-Oriented Database Systems.')]
I would like to concatenate the publication titles together for each author. The example output would be:
[('Hector Garcia-Molina', 'Distributed Databases.'),
('Meichun Hsu', 'Distributed Databases.'),
('Won Kim', 'On Resolving Schematic Heterogeneity in Multidatabase Systems.''Requirements for a Performance Benchmark for Object-Oriented Database Systems.')]
How can I do this, given that this is an RDD? I've seen solutions for how to do this with dataframes, but not with RDDs.
Here is what I tried, but I suspect the issue is how I am using reduceByKey. The PySpark SQL function collect_list works for dataframes, but unfortunately I need to keep this data as an RDD.
title_author.map(lambda r: [(r[0], r[1]) for r[1] in r]).reduceByKey(add)
CodePudding user response:
Try this as an alternative.
def listToString(s):
    # accumulate the elements into a single quoted, comma-separated string
    str1 = ""
    cnt = 0
    # traverse the elements
    for ele in s:
        if cnt == 0:
            str1 = "'" + ele + "'"
        else:
            str1 = str1 + "," + "'" + ele + "'"
        cnt = 1
    # return the concatenated string
    return str1
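As a quick sanity check outside Spark, here is a self-contained copy of that helper (with the string concatenations written out explicitly) run against plain Python lists:

```python
def listToString(s):
    # Build a single quoted, comma-separated string from a list of strings.
    str1 = ""
    cnt = 0
    for ele in s:
        if cnt == 0:
            str1 = "'" + ele + "'"
        else:
            str1 = str1 + "," + "'" + ele + "'"
        cnt = 1
    return str1

print(listToString(["Distributed Databases."]))  # 'Distributed Databases.'
print(listToString(["a", "b"]))                  # 'a','b'
```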
rdd = sc.parallelize(
[('Hector Garcia-Molina', 'Distributed Databases.'),
('Meichun Hsu', 'Distributed Databases.'),
('Won Kim', 'On Resolving Schematic Heterogeneity in Multidatabase Systems.'),
('Won Kim',
'Requirements for a Performance Benchmark for Object-Oriented Database Systems.')] )
rdd2 = rdd.groupByKey().mapValues(sorted)
rdd2.take(5)
rdd3 = rdd2.map(lambda x: (x[0], listToString(x[1])))
rdd3.take(5)
You can also skip the grouping step and reduce the original RDD directly, concatenating the titles per author key:
rdd4 = rdd.reduceByKey(lambda a, b: a + b)
rdd4.take(5)
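If you want to verify the per-key concatenation logic without a running Spark session, the reduceByKey behavior can be sketched in plain Python (the `reduce_by_key` helper here is illustrative, not part of the Spark API):

```python
# Simulate RDD.reduceByKey(lambda a, b: a + b) on (author, title) pairs.
pairs = [
    ('Hector Garcia-Molina', 'Distributed Databases.'),
    ('Meichun Hsu', 'Distributed Databases.'),
    ('Won Kim', 'On Resolving Schematic Heterogeneity in Multidatabase Systems.'),
    ('Won Kim',
     'Requirements for a Performance Benchmark for Object-Oriented Database Systems.'),
]

def reduce_by_key(pairs, func):
    # Fold values into an accumulator per key, mirroring how
    # reduceByKey merges values for each key in Spark.
    acc = {}
    for k, v in pairs:
        acc[k] = func(acc[k], v) if k in acc else v
    return list(acc.items())

result = reduce_by_key(pairs, lambda a, b: a + b)
for author, titles in result:
    print(author, '->', titles)
```

The 'Won Kim' entry comes out with its two titles concatenated into one string, matching the expected output in the question.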
Try and see what is closest.