Fail to subtract two RDD with list column in PySpark

Time:07-13

I have two RDDs of the following type:

RDD[(int, List[(string, int)])]

I want to compute the difference (subtract) of the two RDDs. The code looks like this:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('xxxxx.com').getOrCreate()

rdd1 = spark.sparkContext.parallelize([(1, [("foo", 101), ("bar", 111)]), (2, [("foobar", 22), ("bar", 222)]), (3, [("foo", 333)])])
rdd2 = spark.sparkContext.parallelize([(1, [("foo", 101), ("bar", 111)]), (2, [("foobar", 22), ("bar", 222)]), (3, [("foo", 333)])])

rdd = rdd1.subtract(rdd2)
rdd.toDF().show()

However, I get the following error:

    d[k] = comb(d[k], v) if k in d else creator(v)
TypeError: unhashable type: 'list'

But if I convert the RDDs to DataFrames first and then do the subtract, I get the right answer. I don't know how to fix the issue when working with the RDDs directly.

rdd1 = spark.sparkContext.parallelize([(1, [("foo", 101), ("bar", 111)]), (2, [("foobar", 22), ("bar", 222)]), (3, [("foo", 333)])])
rdd2 = spark.sparkContext.parallelize([(1, [("foo", 101), ("bar", 111)]), (2, [("foobar", 22), ("bar", 222)]), (3, [("foo", 333)])])

rdd = rdd1.toDF().subtract(rdd2.toDF())
rdd.show()

CodePudding user response:

First of all, the reason why this does not work in Python is simple. subtract finds the elements of rdd1 that are not in rdd2. To do that, Spark puts all records with the same hash on the same partition and then checks, for each record of rdd1, whether there is an equal record with the same hash from rdd2. For that to work, the records must be hashable. In Python, tuples are hashable but lists are not, hence the error you obtain. There are several workarounds. The easiest one would probably be to work in Scala, since lists are hashable in Java/Scala.

val rdd1 = spark.sparkContext.parallelize(Seq((1, Seq(("foo", 101), ("bar", 111))), (2, Seq(("foobar", 22), ("bar", 222))), (3, Seq(("foo", 333)))))
val rdd2 = spark.sparkContext.parallelize(Seq((1, Seq(("foo", 101), ("bar", 111))), (2, Seq(("foobar", 22), ("bar", 222))), (3, Seq(("foo", 333)))))
// and you can check that this works
rdd1.subtract(rdd2).collect()
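
Back in plain Python, the hashability difference behind the error is easy to reproduce (this is just an illustration of the explanation above, not part of the fix):

# A record whose value is a tuple of tuples is hashable, so Spark can bucket it:
hash((1, (("foo", 101), ("bar", 111))))   # works

# A record whose value is a list is not, which is exactly what the traceback reports:
hash((1, [("foo", 101), ("bar", 111)]))   # TypeError: unhashable type: 'list'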

In Python, one way around this would be to define your own list class. It would need to be hashable and to provide an __eq__ method so that Spark can tell when two objects are equal. Such a custom class could be defined as follows:

class my_list:
    def __init__(self, lst):
        self.list = lst

    def __hash__(self):
        # Combine the hashes of the (string, int) tuples in the list
        my_hash = 0
        for t in self.list:
            my_hash += hash(t[0])
            my_hash += t[1]
        return my_hash

    def __eq__(self, other_list):
        return self.list == other_list.list

Then, you can check that this works:

rdd1.mapValues(lambda x : my_list(x))\
    .subtract(rdd2.mapValues(lambda x: my_list(x)))\
    .collect()
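
With the sample data above, where rdd1 and rdd2 contain the same records, this should return an empty list ([]).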

NB: if you work in a shell, do not define the class inside the shell, or pickle won't be able to serialize it. Define it in a separate file such as my_list.py, start the shell with pyspark --py-files my_list.py, and in the shell do from my_list import my_list.
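
A minimal session could then look like this (assuming the class above is saved as my_list.py in the current directory):

$ pyspark --py-files my_list.py
>>> from my_list import my_list
>>> # my_list can now be used inside mapValues exactly as shown above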
