I have two RDDs of the following type:
RDD[(int, List[(string, int)])]
and I want to compute the set difference of the two RDDs. The code looks like this:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('xxxxx.com').getOrCreate()
rdd1 = spark.sparkContext.parallelize([(1, [("foo", 101), ("bar", 111)]), (2, [("foobar", 22), ("bar", 222)]), (3, [("foo", 333)])])
rdd2 = spark.sparkContext.parallelize([(1, [("foo", 101), ("bar", 111)]), (2, [("foobar", 22), ("bar", 222)]), (3, [("foo", 333)])])
rdd = rdd1.subtract(rdd2)
rdd.toDF().show()
However, I get the following error:
d[k] = comb(d[k], v) if k in d else creator(v)
TypeError: unhashable type: 'list'
But if I convert the RDDs to DataFrames first and then do the subtract, I get the right answer. I don't know how to fix the issue when using the RDDs directly:
rdd1 = spark.sparkContext.parallelize([(1, [("foo", 101), ("bar", 111)]), (2, [("foobar", 22), ("bar", 222)]), (3, [("foo", 333)])])
rdd2 = spark.sparkContext.parallelize([(1, [("foo", 101), ("bar", 111)]), (2, [("foobar", 22), ("bar", 222)]), (3, [("foo", 333)])])
rdd = rdd1.toDF().subtract(rdd2.toDF())
rdd.show()
CodePudding user response:
First of all, the reason why this does not work in Python is simple: subtract is about finding the elements of rdd1 that are not in rdd2. To do that, Spark puts all the records with the same hash on the same partition and then checks, for each record of rdd1, whether there is an equal record with the same hash in rdd2. For this to work, the records must be hashable. In Python, tuples are hashable but lists are not, hence the error you obtain.
There are several workarounds. The easiest one would probably be to work in Scala, since Lists are hashable in Java/Scala:
val rdd1 = spark.sparkContext.parallelize(Seq((1, Seq(("foo", 101), ("bar", 111))), (2, Seq(("foobar", 22), ("bar", 222))), (3, Seq(("foo", 333)))))
val rdd2 = spark.sparkContext.parallelize(Seq((1, Seq(("foo", 101), ("bar", 111))), (2, Seq(("foobar", 22), ("bar", 222))), (3, Seq(("foo", 333)))))
// and you can check that this works
rdd1.subtract(rdd2).collect()
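Back in Python, you can reproduce the root cause in a plain interpreter, without Spark at all; this is a minimal demonstration:
hash(("foo", 101))    # tuples are hashable: returns an int
hash([("foo", 101)])  # lists are not: TypeError: unhashable type: 'list'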
In Python, one way around this is to define your own list class. It needs to be hashable and to provide an __eq__ method so that Spark can tell when two objects are equal. Such a custom class could be defined as follows:
class my_list:
    def __init__(self, list):
        self.list = list

    def __hash__(self):
        # Combine the hashes of the (string, int) tuples in the list.
        my_hash = 0
        for t in self.list:
            my_hash += hash(t[0])
            my_hash += t[1]
        return my_hash

    def __eq__(self, other_list):
        return self.list == other_list.list
Then, you can check that this works:
rdd1.mapValues(lambda x: my_list(x)) \
    .subtract(rdd2.mapValues(lambda x: my_list(x))) \
    .collect()
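Note that with the sample data above, rdd1 and rdd2 contain exactly the same records, so the subtract returns an empty list in both the Scala and the Python versions.
Another workaround, not covered above but worth sketching, is to convert the inner lists to tuples before the subtract and back afterwards, since tuples of tuples are hashable:
# Sketch of an alternative: make the values hashable by turning the lists
# into tuples, subtract, then restore the original list type.
rdd = rdd1.mapValues(tuple) \
    .subtract(rdd2.mapValues(tuple)) \
    .mapValues(list)
rdd.collect()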
NB: if you work in a shell, do not define the class within the shell, or pickle won't be able to serialize it. Define it in a separate file like my_list.py, ship it with pyspark --py-files my_list.py, and in the shell run from my_list import my_list.
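To make that concrete, a minimal session could look like this (assuming my_list.py is in the current directory and rdd1/rdd2 are defined as above):
$ pyspark --py-files my_list.py
>>> from my_list import my_list
>>> rdd1.mapValues(my_list).subtract(rdd2.mapValues(my_list)).collect()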