Home > Software engineering >  Finding out the nearest zip code for each row in a table
Finding out the nearest zip code for each row in a table

Time:12-06

I have a table CUSTOMER_ORDERS with a field postal_code in it as below.

GEO     CUST_POSTAL_CD   UNITS
NA      35242             4
NA      97124             6
NA      77044             7
NA      10461             8
NA      60026             3

There is another table warehouse_loc as below

WH_ID     WH_ZIP    WH_TYPE   
9740      89108     FULL
1562      15432     FULL
5096      80124     DOM
7543      97005     SEMI_INTL
6381      35758     DOM

No I need to find the closest warehouse (WH_ZIP) to every CUST_POSTAL_CD. It may be in a different state but, I need to find the closest WH_ZIP for every CUST_POSTAL_CD by distance. For example, to the CUST_POSTAL_CD = 97124, the closest WH_ZIP is 97005 so I need to programmatically find the closest WH_ZIP for every CUST_POSTAL_CD by distance between two zip codes. In this sample data I have only 5 rows for comparision but in my actual data I have over 40000 rows CUSTOMER_ORDERS table and over 300 in warehouse_loc table.

How can I achieve this using Pyspark?? Please help me. Thank you!

CodePudding user response:

Not familiar with Pyspark, but as tables can be loaded into dataframes, this might help.

import pandas as pd
import random

cust_table = pd.DataFrame({'GEO':[0]*8, 'CUST_POSTAL_CD':[random.randint(1000,5000) for x in range(8)], 'UNITS':[random.randint(1,10) for x in range(8)]})
warehouse_table = pd.DataFrame({'whzip':[random.randint(1000,5000) for x in range(8)], 'whunits':[random.randint(1,10) for x in range(8)]})

cust_table['closest_whzip'] = cust_table['CUST_POSTAL_CD'].apply(lambda x: warehouse_table.loc[warehouse_table['whzip'].sub(x).abs().argsort()]['whzip'].values[0])

print(cust_table)

CodePudding user response:

You can do it using a cross join to generate all possible pairs (CUST_POSTAL_CD, WH_ZIP), then use a library to calculate the distance between each 2 zip codes, then for same values of CUST_POSTAL_CD keep the minimum distance using spark window, here's the code:

# library used to calculate the distance between 2 zip codes
pip install pgeocode

# create spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").config("spark.driver.memory", "4G").getOrCreate()

# prepare the data
data1 = [
        ["""NA""", 35242, 4],
        ["""NA""", 97124, 6],
        ["""NA""", 77044, 7],
        ["""NA""", 10461, 8],
        ["""NA""", 60026, 3],
    ]

data2 = [
        [9740, 89108, "FULL"],
        [1562, 15432, "FULL"],
        [5096, 80124, "DOM"],
        [7543, 97005, "SEMI_INTL"],
        [6381, 35758, "DOM"],
    ]

customer_orders_df = spark.createDataFrame(data1).toDF("GEO","CUST_POSTAL_CD","UNITS")
warehouse_loc_df = spark.createDataFrame(data2).toDF("WH_ID","WH_ZIP","WH_TYPE")

# Cross join to generate all pairs
joined_df = customer_orders_df.crossJoin(warehouse_loc_df)

Finally calculate all possible distances and keep only minimum distance for each CUST_POSTAL_CD:

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col, udf
import pgeocode

dist = pgeocode.GeoDistance('us')

@udf('double')
def get_distance(x,y):
    return float(dist.query_postal_code(x,y))

windowSpec = Window.partitionBy("CUST_POSTAL_CD").orderBy("distance")

joined_df.withColumn('distance', get_distance(joined_df.CUST_POSTAL_CD, joined_df.WH_ZIP)).sort("CUST_POSTAL_CD", "distance") \
  .withColumn("row_number",row_number().over(windowSpec)).filter(col("row_number") == "1").drop("distance", "row_number").show(200)

 --- -------------- ----- ----- ------ --------- 
|GEO|CUST_POSTAL_CD|UNITS|WH_ID|WH_ZIP|  WH_TYPE|
 --- -------------- ----- ----- ------ --------- 
| NA|         10461|    8| 1562| 15432|     FULL|
| NA|         35242|    4| 6381| 35758|      DOM|
| NA|         60026|    3| 1562| 15432|     FULL|
| NA|         77044|    7| 6381| 35758|      DOM|
| NA|         97124|    6| 7543| 97005|SEMI_INTL|
 --- -------------- ----- ----- ------ --------- 

As you can see for CUST_POSTAL_CD = 97124 it find's the closest WH_ZIP which is 97005 as you mentioned in the statement.

  • Related