Finding out the nearest zip code for each row in a table


I have a table CUSTOMER_ORDERS with a field CUST_POSTAL_CD in it, as below.

GEO     CUST_POSTAL_CD    UNITS
NA      35242             4
NA      97124             6
NA      77044             7
NA      10461             8
NA      60026             3

There is another table warehouse_loc as below

WH_ID     WH_ZIP    WH_TYPE   
9740      89108     FULL
1562      15432     FULL
5096      80124     DOM
7543      97005     SEMI_INTL
6381      35758     DOM

Now I need to find the closest warehouse (WH_ZIP) to every CUST_POSTAL_CD by distance, even if it is in a different state. For example, for CUST_POSTAL_CD = 97124 the closest WH_ZIP is 97005, so I need to programmatically find the closest WH_ZIP for every CUST_POSTAL_CD based on the distance between the two ZIP codes. This sample has only 5 rows for comparison, but my actual data has over 40,000 rows in the CUSTOMER_ORDERS table and over 300 rows in the warehouse_loc table.

How can I achieve this using PySpark? Thank you!

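I'm not familiar with PySpark, but since tables can be loaded into DataFrames, this pandas approach might help. Note that it picks the warehouse whose ZIP code is numerically closest, which only roughly tracks geographic distance.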

import pandas as pd
import random

cust_table = pd.DataFrame({'GEO':[0]*8, 'CUST_POSTAL_CD':[random.randint(1000,5000) for x in range(8)], 'UNITS':[random.randint(1,10) for x in range(8)]})
warehouse_table = pd.DataFrame({'whzip':[random.randint(1000,5000) for x in range(8)], 'whunits':[random.randint(1,10) for x in range(8)]})

# for each customer zip, take the warehouse zip with the smallest absolute numeric difference
cust_table['closest_whzip'] = cust_table['CUST_POSTAL_CD'].apply(
    lambda x: warehouse_table.loc[(warehouse_table['whzip'] - x).abs().idxmin(), 'whzip'])
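With 40,000 customer rows and about 300 warehouses, the per-row `apply` can be replaced by a single NumPy broadcast over all pairs. This is a minimal sketch using the sample ZIPs from the question and the same column names as above (`whzip` is assumed to hold the warehouse ZIPs). Like the answer above, it measures closeness as the absolute numeric difference between ZIP codes, which is only a proxy for geographic distance: for example, 77044 maps to 80124 here, while the pgeocode-based output in the other answer maps it to 35758.

```python
import numpy as np
import pandas as pd

# Sample data from the question (same column names as the pandas answer)
cust_table = pd.DataFrame({'GEO': ['NA'] * 5,
                           'CUST_POSTAL_CD': [35242, 97124, 77044, 10461, 60026],
                           'UNITS': [4, 6, 7, 8, 3]})
warehouse_table = pd.DataFrame({'whzip': [89108, 15432, 80124, 97005, 35758]})

cust_zips = cust_table['CUST_POSTAL_CD'].to_numpy()
wh_zips = warehouse_table['whzip'].to_numpy()

# |customer zip - warehouse zip| for every pair: shape (n_customers, n_warehouses)
diff = np.abs(cust_zips[:, None] - wh_zips[None, :])

# index of the smallest difference in each row -> numerically nearest warehouse zip
cust_table['closest_whzip'] = wh_zips[diff.argmin(axis=1)]
print(cust_table)
```

The pair matrix for the real data is 40,000 × 300 integers, which fits comfortably in memory.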


You can do it with a cross join that generates every (CUST_POSTAL_CD, WH_ZIP) pair, then use a library to calculate the distance between the two ZIP codes in each pair, and finally, for each CUST_POSTAL_CD, keep only the row with the minimum distance using a Spark window function. Here's the code:
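The cross join / keep-the-minimum logic can be seen on plain Python lists before moving to Spark. This is a minimal sketch; `nearest_by` is a hypothetical helper, and the lambda passed as `distance` is only a placeholder (numeric difference) that would be swapped for a real geographic distance such as pgeocode's.

```python
from itertools import product

def nearest_by(customers, warehouses, distance):
    """For each customer zip, return (warehouse_zip, distance) with the smallest distance."""
    best = {}
    # product() yields every (customer, warehouse) pair, like crossJoin
    for cust, wh in product(customers, warehouses):
        d = distance(cust, wh)
        # keep this pair only if it beats the current best for this customer
        if cust not in best or d < best[cust][1]:
            best[cust] = (wh, d)
    return best

# Toy distance for illustration only: numeric difference, NOT geographic distance
result = nearest_by([97124, 10461], [89108, 15432, 97005], lambda a, b: abs(a - b))
print(result)
```

Ties keep the first warehouse seen, much like `row_number()` picks one arbitrary row among equal distances.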

# library used to calculate the distance between two ZIP codes (run in a shell, not in Python)
pip install pgeocode

# create spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").config("spark.driver.memory", "4G").getOrCreate()

# prepare the data
data1 = [
        ["NA", 35242, 4],
        ["NA", 97124, 6],
        ["NA", 77044, 7],
        ["NA", 10461, 8],
        ["NA", 60026, 3],
]

data2 = [
        [9740, 89108, "FULL"],
        [1562, 15432, "FULL"],
        [5096, 80124, "DOM"],
        [7543, 97005, "SEMI_INTL"],
        [6381, 35758, "DOM"],
]

customer_orders_df = spark.createDataFrame(data1).toDF("GEO","CUST_POSTAL_CD","UNITS")
warehouse_loc_df = spark.createDataFrame(data2).toDF("WH_ID","WH_ZIP","WH_TYPE")

# Cross join to generate all pairs
joined_df = customer_orders_df.crossJoin(warehouse_loc_df)

Finally, calculate the distance for every pair and keep only the nearest warehouse for each CUST_POSTAL_CD:

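from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col, udf
from pyspark.sql.types import DoubleType
import pgeocode

# distance calculator for US postal codes (downloads the dataset on first use)
dist = pgeocode.GeoDistance('us')

# a plain Python function can't take Spark columns, so wrap it as a UDF;
# pgeocode expects string postal codes, and zfill(5) restores leading zeros lost in integer ZIPs
@udf(returnType=DoubleType())
def get_distance(x, y):
    return float(dist.query_postal_code(str(x).zfill(5), str(y).zfill(5)))

# rank warehouses by distance within each CUST_POSTAL_CD
# (if several orders share a postal code, partition by a unique order key instead)
windowSpec = Window.partitionBy("CUST_POSTAL_CD").orderBy("distance")

joined_df.withColumn("distance", get_distance(col("CUST_POSTAL_CD"), col("WH_ZIP"))) \
  .withColumn("row_number", row_number().over(windowSpec)) \
  .filter(col("row_number") == 1) \
  .drop("distance", "row_number") \
  .show(200)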

+---+--------------+-----+-----+------+---------+
|GEO|CUST_POSTAL_CD|UNITS|WH_ID|WH_ZIP|  WH_TYPE|
+---+--------------+-----+-----+------+---------+
| NA|         10461|    8| 1562| 15432|     FULL|
| NA|         35242|    4| 6381| 35758|      DOM|
| NA|         60026|    3| 1562| 15432|     FULL|
| NA|         77044|    7| 6381| 35758|      DOM|
| NA|         97124|    6| 7543| 97005|SEMI_INTL|
+---+--------------+-----+-----+------+---------+

As you can see, for CUST_POSTAL_CD = 97124 it finds the closest WH_ZIP, 97005, as mentioned in the question.
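pgeocode's `GeoDistance` returns a great-circle distance in kilometres; as far as I can tell from its documentation, it is computed with the Haversine formula. If latitude/longitude for each ZIP were already available as columns (hypothetical here, not part of the question's tables), the distance could be computed directly. A minimal standalone sketch, assuming a mean Earth radius of 6371 km:

```python
import math

EARTH_RADIUS_KM = 6371.0  # mean Earth radius (assumed value)

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two (lat, lon) points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))

# One degree of longitude along the equator: 6371 * pi / 180, about 111.19 km
print(round(haversine_km(0.0, 0.0, 0.0, 1.0), 2))
```

In Spark, the same formula can be written with built-in column functions from `pyspark.sql.functions` (`radians`, `sin`, `cos`, `asin`, `sqrt`), which avoids the per-row overhead of a Python UDF on the 12 million (40,000 × 300) pairs.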

