Dask - map_partition

Time:11-11

I have a Dask DataFrame with two sets of latitude/longitude columns (~32 million records). I am trying to calculate the distance between each pair of coordinates using a function like the one below:

import numpy as np
from geopy import distance

def calc_distance(df, lat_col_name_1, lon_col_name_1, lat_col_name_2, lon_col_name_2):
    if df[lat_col_name_1] != np.nan and df[lon_col_name_1] != np.nan and df[lat_col_name_2] != np.nan and df[lon_col_name_2] != np.nan:
        return distance.distance((df[lat_col_name_1], df[lon_col_name_1]), (df[lat_col_name_2], df[lon_col_name_2])).miles
    else:
        return np.nan

I have tried calling this function using map_partitions (to create a DataFrame of index and distance) as well as calling map_partitions with assign. I would like to use assign so I can avoid joining the DataFrames back together, which seems costly. Neither approach accepts the np.nan checks. I get:

ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().

I have records with null lat/lon so I need to account for that when calculating the distance.

Using map_partitions

distance = big_df.map_partitions(calc_distance, 
                                    lat_col_name_1='latitude_1', 
                                    lon_col_name_1='longitude_1', 
                                    lat_col_name_2='latitude_2', 
                                    lon_col_name_2='longitude_2', 
                                    meta={'distance': np.float64})

Using map_partitions and assign

def calc_distance_miles(lat1, lon1, lat2, lon2):
    if lat1 != np.nan and lon1 != np.nan and lat2 != np.nan and lon2 != np.nan:
        return distance.distance((lat1, lon1), (lat2, lon2)).miles
    else:
        return np.nan
    

big_df = big_df.map_partitions(lambda df: df.assign(
    distance=calc_distance_miles(df['latitude_1'], df['longitude_1'], df['latitude_2'], df['longitude_2'])
), meta={'distance': np.float64}
)

CodePudding user response:

map_partitions isn't like df.apply - the function calc_distance is being called with a partition of the dask.dataframe, which has type pd.DataFrame.

Therefore, df[lat_col_name_1] is a Series and df[lat_col_name_1] != np.nan is a boolean Series. Using a boolean Series as an if condition always raises this error (see e.g. "Truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all()").
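To make the point above concrete, here is a minimal sketch using plain pandas (the sample values are made up for illustration). It also shows a second problem with the original check: NaN never compares equal to anything, so != np.nan is True for every element and would not filter out missing values even on scalars.

```python
import numpy as np
import pandas as pd

# Hypothetical sample column with one missing latitude
s = pd.Series([40.7, np.nan, 51.5])

# NaN never compares equal to anything, so this is True for every row,
# including the row that actually holds NaN
always_true = s != np.nan

# notnull() is the check that really identifies present values
valid = s.notnull()

# Using a whole boolean Series as an `if` condition raises ValueError
try:
    if s != np.nan:
        pass
except ValueError as exc:
    error_message = str(exc)
```

For a single scalar value, pd.notnull(x) or not np.isnan(x) would be the equivalent test.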

There are faster, vectorized ways of computing a distance than going element by element, but the dask.dataframe analogue of what you're trying to do is to use map_partitions and then apply within each partition:

def calc_distance(series, lat_col_name_1, lon_col_name_1, lat_col_name_2, lon_col_name_2):
    if series[
        [lat_col_name_1, lon_col_name_1, lat_col_name_2, lon_col_name_2]
    ].notnull().all():

        return distance.distance(
            (series[lat_col_name_1], series[lon_col_name_1]),
            (series[lat_col_name_2], series[lon_col_name_2]),
        ).miles

    else:
        return np.nan 

def calc_distance_df(df, **kwargs):
    return df.apply(calc_distance, axis=1, **kwargs)

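# The result is a Series, so meta is given as a (name, dtype) tuple.
# Column names are the ones used in the question.
distances = big_df.map_partitions(
    calc_distance_df,
    meta=('distance', 'f8'),
    lat_col_name_1='latitude_1',
    lon_col_name_1='longitude_1',
    lat_col_name_2='latitude_2',
    lon_col_name_2='longitude_2',
)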
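As for the faster array-based route mentioned earlier, one possible sketch is a vectorized haversine formula written with NumPy. Note that this swaps in a different technique: geopy's distance.distance computes a geodesic on an ellipsoid, while haversine assumes a spherical Earth, so results will differ slightly. The function name haversine_miles, the radius constant, and the sample data are assumptions made for illustration. Missing coordinates need no explicit check here, because NaN propagates through the arithmetic.

```python
import numpy as np
import pandas as pd

# Assumption: spherical Earth with the mean radius in miles
EARTH_RADIUS_MILES = 3958.8

def haversine_miles(lat1, lon1, lat2, lon2):
    """Great-circle distance in miles for whole columns at once."""
    lat1, lon1, lat2, lon2 = (np.radians(v) for v in (lat1, lon1, lat2, lon2))
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
    # Guard against rounding pushing sqrt(a) marginally above 1
    return 2 * EARTH_RADIUS_MILES * np.arcsin(np.minimum(1.0, np.sqrt(a)))

# Hypothetical sample standing in for a single partition
sample = pd.DataFrame({
    "latitude_1": [40.7128, np.nan, 0.0],
    "longitude_1": [-74.0060, -0.1278, 0.0],
    "latitude_2": [51.5074, 51.5074, 0.0],
    "longitude_2": [-0.1278, -0.1278, 90.0],
})

# assign keeps the original columns, so no join is needed afterwards
sample = sample.assign(
    distance=haversine_miles(
        sample["latitude_1"], sample["longitude_1"],
        sample["latitude_2"], sample["longitude_2"],
    )
)
```

Because every step operates on whole columns, the same function could be passed to big_df.map_partitions inside a lambda that calls df.assign, which addresses the wish to avoid joining the results back onto the original DataFrame.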