In Spark (Scala) there is zipPartitions to merge multiple RDDs into one. However, there is no such method available on the PySpark RDD. If I use zip multiple times I create one new intermediate rdd per rdd that I combine, which is not what I want.
How can one zip multiple RDDs into one in pyspark?
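For example, this is a minimal sketch of what I mean (the rdd contents here are just made up); chaining zip only nests the pairs:

rdd_a = sc.parallelize([1, 2, 3], 2)
rdd_b = sc.parallelize([4, 5, 6], 2)
rdd_c = sc.parallelize([7, 8, 9], 2)
rdd_a.zip(rdd_b).zip(rdd_c).collect()
# [((1, 4), 7), ((2, 5), 8), ((3, 6), 9)]  <- one extra nesting level per zip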
CodePudding user response:
Good question. The introduction of zipPartitions in PySpark was proposed in 2016, but as you can read in the comments of the issue, they never managed to find a good compromise between performance and solution complexity. The issue is now closed, and I do not think it will be reopened in the near future. This is the solution that was proposed by Joseph E. Gonzalez.
The quickest way to get that API is to write it yourself (performance will of course not be that good). A very naive zipPartitions implementation is:
def zipPartitions(rdd1, rdd2, func):
    rdd1_numPartitions = rdd1.getNumPartitions()
    rdd2_numPartitions = rdd2.getNumPartitions()
    assert rdd1_numPartitions == rdd2_numPartitions, "rdd1 and rdd2 must have the same number of partitions"
    # key each partition by its index: (partition_index, [partition elements])
    paired_rdd1 = rdd1.mapPartitionsWithIndex(lambda index, it: ((index, list(it)),))
    paired_rdd2 = rdd2.mapPartitionsWithIndex(lambda index, it: ((index, list(it)),))
    # join on the partition index, then apply func to the two partition lists
    zipped_rdds = paired_rdd1.join(paired_rdd2, numPartitions=rdd1_numPartitions)\
        .flatMap(lambda x: func(x[1][0], x[1][1]))
    return zipped_rdds
You can test it with:
import itertools

rdd1 = sc.parallelize(range(30), 3)
rdd2 = sc.parallelize(range(50), 3)
zipPartitions(rdd1, rdd2, lambda it1, it2: itertools.zip_longest(it1, it2))\
    .glom().collect()
The arguments are easy to understand: in order, they are the first rdd, the second rdd, and a function accepting two partition iterators, one for each rdd.
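Any function with that shape works. For example, a hypothetical element-wise sum over the paired partitions (the plain zip stops at the shorter of the two partition lists):

zipPartitions(rdd1, rdd2, lambda it1, it2: (a + b for a, b in zip(it1, it2)))\
    .collect()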
With the assert rdd1_numPartitions == rdd2_numPartitions I make sure both rdds have the same number of partitions, which is a precondition for the Scala version too.
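If the precondition does not hold, the check fails immediately on the driver, before any job is launched, since getNumPartitions only reads metadata. A quick (hypothetical) check:

rdd_a = sc.parallelize(range(10), 2)
rdd_b = sc.parallelize(range(10), 3)
try:
    zipPartitions(rdd_a, rdd_b, lambda it1, it2: zip(it1, it2))
except AssertionError as e:
    print(e)  # rdd1 and rdd2 must have the same number of partitions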
Then I use mapPartitionsWithIndex on both rdds to transform, for example, an rdd with two partitions from:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
to:
[(0, [0, 1, 2, 3, 4]), (1, [5, 6, 7, 8, 9])]
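You can reproduce this transformation directly (a list-backed rdd with two partitions, sc being an already-created SparkContext):

sc.parallelize([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 2)\
    .mapPartitionsWithIndex(lambda index, it: ((index, list(it)),))\
    .collect()
# [(0, [0, 1, 2, 3, 4]), (1, [5, 6, 7, 8, 9])]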
Note: the conversion from it to list(it) is unfortunately necessary because in most Python implementations you cannot pickle generators, and the it parameter is a generator. There is one exception, which PySpark handles with a very clever optimization, where the conversion is not needed: an rdd created from a range(). In that case each partition "iterator" is itself a range object, which can be pickled. In fact, considering the previous example, range(10) becomes
[(0, range(0, 5)), (1, range(5, 10))]
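To see the difference, here is a small sketch comparing the two cases on a range-backed rdd (the second call only works because each partition is a picklable range object, as noted above):

rdd = sc.parallelize(range(10), 2)
# with the conversion: always safe, partitions are materialized as lists
rdd.mapPartitionsWithIndex(lambda index, it: ((index, list(it)),)).collect()
# [(0, [0, 1, 2, 3, 4]), (1, [5, 6, 7, 8, 9])]
# without the conversion: works only thanks to the range() optimization
rdd.mapPartitionsWithIndex(lambda index, it: ((index, it),)).collect()
# [(0, range(0, 5)), (1, range(5, 10))]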
Next I can join the two new rdds on the partition index. numPartitions can be set safely because we have previously asserted that both rdds have the same number of partitions, so the partitions are in a 1-to-1 relationship. Finally, I can apply the passed function and flatten the list of partition results.
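Since the question mentions more than two RDDs: the same idea generalizes by replacing join with groupWith (cogroup over several rdds). This is only a hypothetical sketch (zip_partitions_many is not part of the PySpark API), with the same caveats on performance:

def zip_partitions_many(func, *rdds):
    # all rdds must have the same number of partitions
    num_partitions = rdds[0].getNumPartitions()
    assert all(r.getNumPartitions() == num_partitions for r in rdds), \
        "all rdds must have the same number of partitions"
    # key every partition by its index, as before
    keyed = [r.mapPartitionsWithIndex(lambda index, it: ((index, list(it)),))
             for r in rdds]
    # groupWith gathers, for each partition index, one ResultIterable per rdd,
    # each containing exactly one list (that rdd's partition)
    grouped = keyed[0].groupWith(*keyed[1:])
    return grouped.flatMap(lambda kv: func(*[next(iter(v)) for v in kv[1]]))

For example, reusing rdd1 and rdd2 from the test above:

rdd3 = sc.parallelize(range(40), 3)
zip_partitions_many(lambda it1, it2, it3: itertools.zip_longest(it1, it2, it3),
                    rdd1, rdd2, rdd3).glom().collect()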