Pyspark - Map and apply calculation using value from dictionaries


I have two dictionaries. In the first, UserID is the key and the user's location is the value. The first items look like this:

{'U1001': ('22.139997', '-100.978803'),
 'U1002': ('22.150087', '-100.983325')}

And a second dictionary where PlaceID is the key and the place's location is the value. The first items look like this:

{'134999': ('18.915421', '-99.184871'),
 '132825': ('22.1473922', '-100.983092')}

Now I have an RDD containing a UserID, a PlaceID, and the rating the user gave the place:

[('U1077', '135085', 2),
 ('U1077', '135038', 2)]

I would like to calculate the distance between the user and the place using geodesic from geopy.distance, and keep the rating.

I could convert the dictionaries, join their values, and substitute them for UserID and PlaceID in plain Python, but I'm looking for a solution that stays in PySpark.

I came across .mapValues, but that doesn't quite do the trick here.

So, eventually, I would like to get the distance and the rating that was given:

[('2', 693.4067254748844),
 ('2', 806.8757681276663)]
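For reference, the plain-dict lookup described above would pair each rating with both coordinate tuples like this (the rating row here is hypothetical, since the sample rating rows use IDs that don't appear in the sample dicts; geodesic would then be applied to each coordinate pair):

```python
users = {'U1001': ('22.139997', '-100.978803'),
         'U1002': ('22.150087', '-100.983325')}
places = {'134999': ('18.915421', '-99.184871'),
          '132825': ('22.1473922', '-100.983092')}

# Hypothetical rating row whose IDs exist in both sample dicts
ratings = [('U1001', '132825', 2)]

# Pair each rating with the user's and the place's coordinates;
# geodesic(user_loc, place_loc).km would then give the distance
paired = [(r, users[u], places[p]) for (u, p, r) in ratings]
print(paired)
# [(2, ('22.139997', '-100.978803'), ('22.1473922', '-100.983092'))]
```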

CodePudding user response:

You can create RDDs from users_dict and places_dict, then join them with ratings_rdd to get the coordinates of the user and of the rated place. Then, using map, call geodesic to calculate the distance.

Here's an example:

from geopy.distance import geodesic
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

users_dict = {'U1077': ('22.139997', '-100.978803'), 'U1002': ('22.150087', '-100.983325')}
places_dict = {'135085': ('18.915421', '-99.184871'), '135038': ('22.1473922', '-100.983092')}

users_rdd = sc.parallelize(list(users_dict.items()))
places_rdd = sc.parallelize(list(places_dict.items()))
ratings_rdd = sc.parallelize([('U1077', '135085', 2), ('U1077', '135038', 2)])

# RDD(UserId, (PlaceId, Rating))
ratings_rdd = ratings_rdd.map(lambda x: (x[0], (x[1], x[2])))

# RDD(PlaceId, (UserId, UserCoordinates, Rating)) 
joined1 = ratings_rdd.join(users_rdd).map(lambda x: (x[1][0][0], (x[0], x[1][1], x[1][0][1]))) 

# RDD(UserId, PlaceId, Rating, Distance)
result = joined1.join(places_rdd).map(
    lambda x: (x[1][0][0], x[0], x[1][0][2], geodesic(x[1][0][1], x[1][1]).kilometers)
)

print(result.collect())
#[('U1077', '135085', 2, 403.0361166435645), ('U1077', '135038', 2, 0.9307697045815713)]

You can print the intermediate RDDs to understand the logic. In short, we need the RDD to be keyed by UserId to join with users_rdd, then keyed by PlaceId to join with places_rdd.
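As a quick sanity check that needs neither a Spark session nor geopy, the same join logic can be replayed with plain dicts and a haversine approximation (haversine assumes a spherical Earth, so it differs slightly from geodesic's ellipsoidal result):

```python
from math import asin, cos, radians, sin, sqrt

def haversine_km(a, b):
    # Great-circle distance on a sphere of radius 6371 km;
    # a rough stand-in for geopy's ellipsoidal geodesic
    lat1, lon1 = map(radians, map(float, a))
    lat2, lon2 = map(radians, map(float, b))
    h = sin((lat2 - lat1) / 2) ** 2 + cos(lat1) * cos(lat2) * sin((lon2 - lon1) / 2) ** 2
    return 2 * 6371 * asin(sqrt(h))

users_dict = {'U1077': ('22.139997', '-100.978803')}
places_dict = {'135085': ('18.915421', '-99.184871'),
               '135038': ('22.1473922', '-100.983092')}
ratings = [('U1077', '135085', 2), ('U1077', '135038', 2)]

# Same lookups the two joins perform, done locally
result = [(u, p, r, haversine_km(users_dict[u], places_dict[p]))
          for (u, p, r) in ratings]
for row in result:
    print(row)  # distances land within ~1% of the geodesic values above
```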
