I have two dictionaries. One where UserID
is the key and their location is the value.
The first items look like this:
{'U1001': ('22.139997', '-100.978803'),
'U1002': ('22.150087', '-100.983325')}
And another dictionary where PlaceID
is the key and the location is the value.
The first items look like this:
{'134999': ('18.915421', '-99.184871'),
'132825': ('22.1473922', '-100.983092')}
Now I got an RDD where UserID
, PlaceID
, and a rating from the User on the Place is given:
[('U1077', '135085', 2),
('U1077', '135038', 2)]
I would like to calculate the distance between the User and Place and keep the rating using geodesic
from geopy.distance
I could (convert and) join the (values of the) dictionaries and replace them with UserID
and PlaceID
but I'm looking for a solution that uses the pyspark language.
I came around .mapValues
but that doesn't quite do the trick for me.
So, eventually, I would like to get the distance and the rating that was given:
[('2', 693.4067254748844),
('2', 806.8757681276663)]
CodePudding user response:
You can create RDDs from the users_dict
and places_dict
then join with ratings_rdd
to get the coordinates of the user and the rated place. Then using map, call geodesic
to calculate the distance.
Here's an example:
from geopy.distance import geodesic
users_dict = {'U1077': ('22.139997', '-100.978803'), 'U1002': ('22.150087', '-100.983325')}
places_dict = {'135085': ('18.915421', '-99.184871'), '135038': ('22.1473922', '-100.983092')}
users_rdd = sc.parallelize(list(users_dict.items()))
places_rdd = sc.parallelize(list(places_dict.items()))
ratings_rdd = sc.parallelize([('U1077', '135085', 2), ('U1077', '135038', 2)])
# RDD(UserId, (PlaceId, Rating))
ratings_rdd = ratings_rdd.map(lambda x: (x[0], list(x[1:])))
# RDD(PlaceId, (UserId, UserCoordinates, Rating))
joined1 = ratings_rdd.join(users_rdd).map(lambda x: (x[1][0][0], (x[0], x[1][1], x[1][0][1])))
# RDD(UserId, PlaceId, Rating, Distance)
result = joined1.join(places_rdd).map(
lambda x: (x[1][0][0], x[0], x[1][0][2], geodesic(x[1][0][1], x[1][1]).kilometers)
)
print(result.collect())
#[('U1077', '135085', 2, 403.0361166435645), ('U1077', '135038', 2, 0.9307697045815713)]
You can print the intermediate RDDs to understand the logic. In short, wee need the rdds to be keyed by UserId
to join with users_rdd
then to be keyed by PlaceID
to join with places_rdd