I have a table CUSTOMER_ORDERS with a postal code field (CUST_POSTAL_CD) in it, as below.
GEO  CUST_POSTAL_CD  UNITS
NA   35242           4
NA   97124           6
NA   77044           7
NA   10461           8
NA   60026           3
There is another table warehouse_loc, as below.
WH_ID  WH_ZIP  WH_TYPE
9740   89108   FULL
1562   15432   FULL
5096   80124   DOM
7543   97005   SEMI_INTL
6381   35758   DOM
Now I need to find the closest warehouse (WH_ZIP) to every CUST_POSTAL_CD. It may be in a different state, but I need the closest WH_ZIP for every CUST_POSTAL_CD by distance. For example, for CUST_POSTAL_CD = 97124 the closest WH_ZIP is 97005, so I need to programmatically find the closest WH_ZIP for every CUST_POSTAL_CD by the distance between the two zip codes. In this sample data I have only 5 rows for comparison, but in my actual data I have over 40,000 rows in the CUSTOMER_ORDERS table and over 300 in the warehouse_loc table.
How can I achieve this using PySpark? Please help me. Thank you!
CodePudding user response:
I'm not familiar with PySpark, but since the tables can be loaded into DataFrames, this might help.
import pandas as pd
import random

# random sample data
cust_table = pd.DataFrame({'GEO': [0] * 8, 'CUST_POSTAL_CD': [random.randint(1000, 5000) for x in range(8)], 'UNITS': [random.randint(1, 10) for x in range(8)]})
warehouse_table = pd.DataFrame({'whzip': [random.randint(1000, 5000) for x in range(8)], 'whunits': [random.randint(1, 10) for x in range(8)]})

# for each customer zip, take the warehouse zip with the smallest numeric difference
# (note: this compares zip codes as numbers, not by geographic distance)
cust_table['closest_whzip'] = cust_table['CUST_POSTAL_CD'].apply(
    lambda x: warehouse_table.loc[warehouse_table['whzip'].sub(x).abs().argsort()]['whzip'].values[0])
print(cust_table)
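If you go this route from PySpark, the Spark tables can be pulled into pandas and the result pushed back. Here is a minimal sketch, assuming an active SparkSession named spark, that the two tables are already loaded as Spark DataFrames (hypothetical names customer_orders_df / warehouse_loc_df), and that they are small enough to collect to the driver:
cust_table = customer_orders_df.toPandas()        # collect to the driver
warehouse_table = warehouse_loc_df.toPandas()
# ... apply the pandas logic above to fill cust_table['closest_whzip'] ...
result_df = spark.createDataFrame(cust_table)     # back to a Spark DataFrame
Note that this approach only approximates "closest" by numeric zip code difference, not by geographic distance.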
CodePudding user response:
You can do it with a cross join to generate all possible (CUST_POSTAL_CD, WH_ZIP) pairs, then use a library to calculate the distance between each pair of zip codes, and finally keep only the minimum distance per CUST_POSTAL_CD using a Spark window. Here's the code:
# library used to calculate the distance between 2 zip codes
pip install pgeocode
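As a quick standalone check of the library (outside Spark; distances are returned in kilometers):
import pgeocode
dist = pgeocode.GeoDistance('us')
# distance in km between the two zip codes from the question
print(dist.query_postal_code("97124", "97005"))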
# create spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").config("spark.driver.memory", "4G").getOrCreate()
# prepare the data
data1 = [
    ["NA", 35242, 4],
    ["NA", 97124, 6],
    ["NA", 77044, 7],
    ["NA", 10461, 8],
    ["NA", 60026, 3],
]
data2 = [
    [9740, 89108, "FULL"],
    [1562, 15432, "FULL"],
    [5096, 80124, "DOM"],
    [7543, 97005, "SEMI_INTL"],
    [6381, 35758, "DOM"],
]
customer_orders_df = spark.createDataFrame(data1).toDF("GEO","CUST_POSTAL_CD","UNITS")
warehouse_loc_df = spark.createDataFrame(data2).toDF("WH_ID","WH_ZIP","WH_TYPE")
# Cross join to generate all pairs
joined_df = customer_orders_df.crossJoin(warehouse_loc_df)
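Since warehouse_loc_df only has a few hundred rows, you can also hint Spark to broadcast it so the cross join does not shuffle the large customer table; a small variant of the line above:
from pyspark.sql.functions import broadcast
# broadcast the small warehouse table to every executor
joined_df = customer_orders_df.crossJoin(broadcast(warehouse_loc_df))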
Finally, calculate all the distances and keep only the minimum distance for each CUST_POSTAL_CD:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col, udf
import pgeocode

dist = pgeocode.GeoDistance('us')

# distance in km between two US zip codes (NaN if a zip code is unknown)
@udf('double')
def get_distance(x, y):
    return float(dist.query_postal_code(str(x), str(y)))

# rank warehouses by distance within each CUST_POSTAL_CD and keep the closest
windowSpec = Window.partitionBy("CUST_POSTAL_CD").orderBy("distance")

joined_df.withColumn('distance', get_distance(joined_df.CUST_POSTAL_CD, joined_df.WH_ZIP)) \
    .sort("CUST_POSTAL_CD", "distance") \
    .withColumn("row_number", row_number().over(windowSpec)) \
    .filter(col("row_number") == 1) \
    .drop("distance", "row_number") \
    .show(200)
+---+--------------+-----+-----+------+---------+
|GEO|CUST_POSTAL_CD|UNITS|WH_ID|WH_ZIP|  WH_TYPE|
+---+--------------+-----+-----+------+---------+
| NA|         10461|    8| 1562| 15432|     FULL|
| NA|         35242|    4| 6381| 35758|      DOM|
| NA|         60026|    3| 1562| 15432|     FULL|
| NA|         77044|    7| 6381| 35758|      DOM|
| NA|         97124|    6| 7543| 97005|SEMI_INTL|
+---+--------------+-----+-----+------+---------+
As you can see, for CUST_POSTAL_CD = 97124 it finds the closest WH_ZIP, which is 97005, as you mentioned in the question.
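A note on scale: with roughly 40,000 customer rows and 300 warehouses, the cross join produces about 12 million pairs, and the plain Python UDF calls pgeocode once per row. If that turns out to be slow, a vectorized pandas UDF is one option; a sketch, assuming pyarrow is installed and that query_postal_code accepts array-like input (it should in recent pgeocode versions):
import pandas as pd
import pgeocode
from pyspark.sql.functions import pandas_udf

dist = pgeocode.GeoDistance('us')

@pandas_udf('double')
def get_distance_vec(cust_zip: pd.Series, wh_zip: pd.Series) -> pd.Series:
    # one vectorized pgeocode lookup per batch instead of one call per row;
    # unknown zip codes come back as NaN
    d = dist.query_postal_code(cust_zip.astype(str).tolist(), wh_zip.astype(str).tolist())
    return pd.Series(d, dtype='float64')
The rest of the query stays the same; just use get_distance_vec in place of get_distance.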