I have a Dask DataFrame with pairs of latitudes and longitudes (~32m records). I am trying to calculate the distance between each lat/lon pair using a function like the one below:
import numpy as np
from geopy import distance

def calc_distance(df, lat_col_name_1, lon_col_name_1, lat_col_name_2, lon_col_name_2):
    if df[lat_col_name_1] != np.nan and df[lon_col_name_1] != np.nan \
            and df[lat_col_name_2] != np.nan and df[lon_col_name_2] != np.nan:
        return distance.distance(
            (df[lat_col_name_1], df[lon_col_name_1]),
            (df[lat_col_name_2], df[lon_col_name_2]),
        ).miles
    else:
        return np.nan
I have tried calling this function using map_partitions (to create a DataFrame of index and distance), as well as calling map_partitions with assign. I would like to use assign so I can avoid joining the DataFrames back together (that seems costly). It does not like the np.nan checks; I get a
ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().
I have records with null lat/lon values, so I need to account for that when calculating the distance.
Using map_partitions
distance = big_df.map_partitions(
    calc_distance,
    lat_col_name_1='latitude_1',
    lon_col_name_1='longitude_1',
    lat_col_name_2='latitude_2',
    lon_col_name_2='longitude_2',
    meta={'distance': np.float64},
)
Using map_partitions and assign
def calc_distance_miles(lat1, lon1, lat2, lon2):
    if lat1 != np.nan and lon1 != np.nan and lat2 != np.nan and lon2 != np.nan:
        return distance.distance((lat1, lon1), (lat2, lon2)).miles
    else:
        return np.nan

big_df = big_df.map_partitions(
    lambda df: df.assign(
        distance=calc_distance_miles(
            df['latitude_1'], df['longitude_1'],
            df['latitude_2'], df['longitude_2'],
        )
    ),
    meta={'distance': np.float64},
)
CodePudding user response:
map_partitions isn't like df.apply: the function calc_distance is called with a whole partition of the dask.dataframe, which has type pd.DataFrame. Therefore, df[lat_col_name_1] is a Series, and df[lat_col_name_1] != np.nan is a boolean Series, which will always raise this error when used in an if statement (see e.g. Truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all()). Separately, comparing against np.nan with != is never a valid null check, even for scalars: NaN compares unequal to everything, including itself, so the condition is always True. Use notnull()/isnull() instead.
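To see both problems in isolation, here is a minimal pandas sketch (separate from the question's code, just to illustrate):

import numpy as np
import pandas as pd

s = pd.Series([1.0, np.nan])

s != np.nan    # elementwise comparison -> [True, True]: NaN != NaN is True
# if s != np.nan: ...   # raises the ValueError above: bool(Series) is ambiguous
s.notnull()    # -> [True, False]: the reliable null check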
There are faster array ways of computing a distance than element-wise (see the vectorized sketch at the end of this answer), but the dask.dataframe analogue to what you're trying to do is to use map_partitions and then apply row-wise:
def calc_distance(series, lat_col_name_1, lon_col_name_1, lat_col_name_2, lon_col_name_2):
    # `series` is a single row of the partition (from apply with axis=1)
    if series[
        [lat_col_name_1, lon_col_name_1, lat_col_name_2, lon_col_name_2]
    ].notnull().all():
        return distance.distance(
            (series[lat_col_name_1], series[lon_col_name_1]),
            (series[lat_col_name_2], series[lon_col_name_2]),
        ).miles
    else:
        return np.nan

def calc_distance_df(df, **kwargs):
    # each partition is a pd.DataFrame, so apply the row-wise function to it
    return df.apply(calc_distance, axis=1, **kwargs)
distances = big_df.map_partitions(
    calc_distance_df,
    # a (name, dtype) tuple tells dask the output is a float Series
    meta=('distance', np.float64),
    lat_col_name_1=lat_col_name_1,
    lon_col_name_1=lon_col_name_1,
    lat_col_name_2=lat_col_name_2,
    lon_col_name_2=lon_col_name_2,
)
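Since you want to avoid a join: the result keeps big_df's index and divisions, so you can attach it as a column directly (a small usage sketch, reusing the distances Series from above):

big_df = big_df.assign(distance=distances)

As for the faster array approach mentioned above: a vectorized haversine computes great-circle distances for a whole partition at once. Note this is a spherical approximation; geopy's default geodesic model will differ slightly (roughly up to 0.5%). A minimal sketch, assuming the question's column names; haversine_miles is a hypothetical helper name:

import numpy as np

EARTH_RADIUS_MILES = 3958.8  # mean Earth radius in miles

def haversine_miles(df):
    # Convert degrees to radians. NaNs propagate through every step,
    # so null coordinates come out as NaN distances with no explicit check.
    lat1, lon1, lat2, lon2 = (
        np.radians(df[col])
        for col in ['latitude_1', 'longitude_1', 'latitude_2', 'longitude_2']
    )
    a = (
        np.sin((lat2 - lat1) / 2) ** 2
        + np.cos(lat1) * np.cos(lat2) * np.sin((lon2 - lon1) / 2) ** 2
    )
    return df.assign(distance=2 * EARTH_RADIUS_MILES * np.arcsin(np.sqrt(a)))

big_df = big_df.map_partitions(haversine_miles)

This avoids the per-row Python call entirely, which matters at ~32m records.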