I want to join two RDDs, say R(K, V) and S(K, W), where the sets of keys from R and S are identical and the keys are unique. The resulting RDD should look like (K, (V, W)). Both RDDs R and S are formed by using the map function to create key-value pairs. What is the optimal way to carry out this operation? Neither RDD fits in the driver. Is there a way to use partitionBy() to optimize this?
I am using Pyspark.
CodePudding user response:
You can use a join():
data1_ls = [
(1, 'a'),
(2, 'b')
]
data2_ls = [
(1, 'c'),
(2, 'd')
]
# assumes an existing SparkSession named `spark`
data1_rdd = spark.sparkContext.parallelize(data1_ls)
data2_rdd = spark.sparkContext.parallelize(data2_ls)
data1_rdd.join(data2_rdd).collect()
# [(1, ('a', 'c')), (2, ('b', 'd'))]
CodePudding user response:
If R and S have the same set of unique keys, then there are several ways to do this without a join. However, do not immediately discard join(), which may already be optimized to handle cases like this. Instead, you should run some benchmarks to find the best method.
Here are a couple of ideas (untested):
Compute the union of the RDDs and then run a groupByKey(). The resulting RDD should contain pairs of type (K, [V, W]):

rdd1.union(rdd2).groupByKey()  # .mapValues(list) if you collect() immediately after
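As a rough, hedged sketch (untested here), reusing the data1_rdd and data2_rdd pair RDDs from the first answer in place of rdd1 and rdd2:

# union + groupByKey: each key occurs exactly once in each RDD, so every group
# ends up with two values (one from each side); their order within the list is not guaranteed
grouped = data1_rdd.union(data2_rdd).groupByKey().mapValues(list)
grouped.collect()
# e.g. [(1, ['a', 'c']), (2, ['b', 'd'])]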
The same (more or less) could probably be accomplished with rdd1.cogroup(rdd2); I am not sure it is more performant, though.

You could also sort the RDDs by their keys and then zip them. Finally, you map each pair from ((K, V), (K, W)) to (K, (V, W)):

rdd1.sortByKey().zip(rdd2.sortByKey()).map(lambda x: (x[0][0], (x[0][1], x[1][1])))
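A hedged sketch of the sort-and-zip idea, again borrowing data1_rdd and data2_rdd from the first answer. Keep in mind that zip() expects both RDDs to have the same number of partitions and the same number of elements in each partition, which sortByKey() alone does not guarantee, so the sketch forces a single partition purely for illustration:

# sortByKey + zip (untested): one partition on each side so the zip precondition holds
left = data1_rdd.sortByKey(numPartitions=1)
right = data2_rdd.sortByKey(numPartitions=1)
left.zip(right).map(lambda x: (x[0][0], (x[0][1], x[1][1]))).collect()
# e.g. [(1, ('a', 'c')), (2, ('b', 'd'))]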
I do not see the benefit of using partitionBy() here, given that there is no reason to repartition your RDDs. You could also try to use reduceByKey()/aggregateByKey()/... instead of groupByKey(), but I doubt the performance would be much different.
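For what it is worth, a hedged sketch of the reduceByKey() variant (untested), again reusing data1_rdd and data2_rdd: since each key occurs exactly once per RDD, the reduce function fires exactly once per key, although the order of the two values in the resulting pair is not guaranteed.

# reduceByKey: pairs up the two values for each key in a single shuffle
data1_rdd.union(data2_rdd).reduceByKey(lambda a, b: (a, b)).collect()
# e.g. [(1, ('a', 'c')), (2, ('b', 'd'))] -- some pairs may come out as (W, V)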