Best way to join 2 pair RDDs with exactly the same keys and where all keys are unique


I want to join two pair RDDs, R(K, V) and S(K, W), where R and S have exactly the same set of keys and every key is unique. The resulting RDD should look like (K, (V, W)). Both R and S are formed by using the map function to create key-value pairs. What is the most efficient way to carry out this operation? Neither RDD can fit in the driver. Is there a way to use partitionBy() to optimize this?

I am using Pyspark.
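
For reference, a minimal, hypothetical sketch of my setup (the raw records and the parsing are illustrative only):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# illustrative raw records; the real R and S are too large for the driver
raw_r = spark.sparkContext.parallelize(['1,a', '2,b'])
raw_s = spark.sparkContext.parallelize(['1,c', '2,d'])

# map() turns each record into a (key, value) pair
R = raw_r.map(lambda line: (int(line.split(',')[0]), line.split(',')[1]))
S = raw_s.map(lambda line: (int(line.split(',')[0]), line.split(',')[1]))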

CodePudding user response:

You can use a join().

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data1_ls = [
    (1, 'a'),
    (2, 'b')
]

data2_ls = [
    (1, 'c'),
    (2, 'd')
]

data1_rdd = spark.sparkContext.parallelize(data1_ls)
data2_rdd = spark.sparkContext.parallelize(data2_ls)

# join() pairs values by key; with identical, unique key sets this yields (K, (V, W))
data1_rdd.join(data2_rdd).collect()
# [(1, ('a', 'c')), (2, ('b', 'd'))]
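
Since the question asks about partitionBy(): in the Scala/Java API, pre-partitioning both sides with the same partitioner lets join() match keys partition-by-partition instead of shuffling both RDDs; how much PySpark benefits from this is exactly the kind of thing to benchmark. A sketch (untested; the partition count is an arbitrary example):

num_parts = 8  # arbitrary example value; tune to your data size

# the same hash partitioner on both sides co-locates equal keys
r_part = data1_rdd.partitionBy(num_parts).cache()
s_part = data2_rdd.partitionBy(num_parts).cache()

r_part.join(s_part).collect()
# [(1, ('a', 'c')), (2, ('b', 'd'))]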

CodePudding user response:

If R and S contain exactly the same set of unique keys, as stated in the question, then there are several ways to do this without a join. However, do not immediately discard join(), which may already be optimized to handle cases like this. Instead, run some benchmarks to find out which method works best.

Here are a couple of ideas (untested):

  • Compute the union of the two RDDs and then run a groupByKey(). The resulting RDD contains pairs (K, vs), where vs is an iterable holding the two values V and W.

    rdd1.union(rdd2).groupByKey() #.mapValues(list) if you collect() immediately after
    

    The same (more or less) could probably be accomplished with rdd1.cogroup(rdd2), though I am not sure it is more performant; see the sketch after this list.

  • You could sort both RDDs by key and then zip them. Finally, map each pair from ((K, V), (K, W)) to (K, (V, W)). Note that zip() requires the two RDDs to have the same number of partitions and the same number of elements in each partition, so check that this still holds after sorting.

    rdd1.sortByKey().zip(rdd2.sortByKey()).map(lambda x: (x[0][0], (x[0][1], x[1][1])))
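
As mentioned in the first bullet, the cogroup() variant might look like the following sketch (untested, like the others). cogroup() keeps the values from the two sides in separate iterables, so with unique keys each iterable holds exactly one element and can be unpacked directly:

    # cogroup() yields (K, (iterable_of_Vs, iterable_of_Ws)); with unique
    # keys, each iterable contains exactly one element
    rdd1.cogroup(rdd2) \
        .mapValues(lambda vw: (next(iter(vw[0])), next(iter(vw[1])))) \
        .collect()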
    

I do not see the benefit of using partitionBy() here, given that there is no reason to repartition your RDDs. You could also try reduceByKey()/aggregateByKey()/... instead of groupByKey(), but I doubt the performance would differ much; a sketch of the reduceByKey() variant follows.
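
For that reduceByKey() variant, a sketch (untested): tag each value with its side before the union, since union() makes no guarantee about which side's value the reduce function sees first, and then merge the two tagged halves:

    def merge(a, b):
        # each key sees exactly one (V, None) and one (None, W);
        # keep the non-None slot from each side
        return (a[0] if a[0] is not None else b[0],
                a[1] if a[1] is not None else b[1])

    rdd1.mapValues(lambda v: (v, None)) \
        .union(rdd2.mapValues(lambda w: (None, w))) \
        .reduceByKey(merge)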
