Aggregate rows of Spark RDD to String after groupby


I have an RDD like the below, where the first entry in the tuple is an author, and the second entry is the title of the publication.

[('Hector Garcia-Molina', 'Distributed Databases.'),
 ('Meichun Hsu', 'Distributed Databases.'),
 ('Won Kim', 'On Resolving Schematic Heterogeneity in Multidatabase Systems.'),
 ('Won Kim',
  'Requirements for a Performance Benchmark for Object-Oriented Database Systems.')]

I would like to concatenate the publication titles together for each author. The example output would be:

[('Hector Garcia-Molina', 'Distributed Databases.'),
 ('Meichun Hsu', 'Distributed Databases.'),
 ('Won Kim', 'On Resolving Schematic Heterogeneity in Multidatabase Systems.''Requirements for a Performance Benchmark for Object-Oriented Database Systems.')]

How can I do this, given that this is an RDD? I've seen solutions for how to do this with dataframes, but not with RDDs.

Here is what I tried, but I suspect the issue is how I am using reduceByKey. The PySpark SQL function collect_list works for DataFrames, but unfortunately I need to keep this data as an RDD.

title_author.map(lambda r: [(r[0],r[1]) for r[1] in r]).reduceByKey(add)
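For what it's worth, the map step should not be needed at all, since title_author is already an RDD of (author, title) key/value pairs. A minimal sketch of the reduceByKey idea (assuming title_author is the RDD shown above; the name concatenated is just for illustration) would be:

from operator import add

# title_author is already keyed by author, so reduceByKey can
# concatenate the title strings for each author directly
concatenated = title_author.reduceByKey(add)
concatenated.take(5)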

CodePudding user response:

Try this as an alternative:

def listToString(s):

    # initialize an empty string
    str1 = ""
    cnt = 0

    # traverse the list of titles
    for ele in s:
        if cnt == 0:
            str1 += "\'" + ele + "\'"
        else:
            str1 += "," + "\'" + ele + "\'"
        cnt += 1

    # return the concatenated string
    return str1
  
rdd = sc.parallelize(
[('Hector Garcia-Molina', 'Distributed Databases.'),
 ('Meichun Hsu', 'Distributed Databases.'),
 ('Won Kim', 'On Resolving Schematic Heterogeneity in Multidatabase Systems.'),
 ('Won Kim',
  'Requirements for a Performance Benchmark for Object-Oriented Database Systems.')])

# group the titles per author and sort them for deterministic output
rdd2 = rdd.groupByKey().mapValues(sorted)
rdd2.take(5)

# build the quoted, comma-separated string per author
rdd3 = rdd2.map(lambda x: (x[0], listToString(x[1])))
rdd3.take(5)
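An equivalent and arguably tidier variant skips the helper function and joins the grouped titles directly with mapValues. This is a sketch, assuming the same quoted, comma-separated result that listToString produces is what you want; rdd3b is just an illustrative name:

# group per author, then build the quoted, comma-separated string in one step
rdd3b = (rdd.groupByKey()
            .mapValues(lambda titles: ",".join("'" + t + "'" for t in sorted(titles))))
rdd3b.take(5)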

You can also skip the grouping step and concatenate the titles per author directly on the original RDD:

rdd4 = rdd.reduceByKey(lambda a, b: a + b)
rdd4.take(5)
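If you would rather avoid groupByKey (which materializes every title for an author before joining), aggregateByKey can build the concatenated string in a single pass. A sketch, not part of the original answer; rdd5 is just an illustrative name:

# aggregateByKey builds the per-author string incrementally:
#  - zero value: empty string
#  - seqOp: append a title to the partial string within a partition
#  - combOp: merge partial strings from different partitions
rdd5 = rdd.aggregateByKey(
    "",
    lambda acc, title: acc + title if acc else title,
    lambda a, b: a + b)
rdd5.take(5)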

Try them and see which comes closest to what you need.
