I have a problem. I have a dataframe in which each row contains two coordinate pairs: a start point (fromLat, fromLong) and a destination (toLat, toLong). From these two points I want to calculate the driving distance and the duration. For this I use the route API from project-osrm. Unfortunately, the way I do it is very slow: my dataframe has about 2 million rows, and for each one I would like to look up the duration and the distance. I have found a code snippet on GitHub (see below), but I don't know how to integrate my dataframe so that it passes the per-row values.
How can I call an API 2 million times quickly and write the returned values into my dataframe?
- If I don't get a status code 200 back, I should just write None for distance and duration.
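For reference, this is what a single call and its response fields look like (note that OSRM expects coordinates in {longitude},{latitude} order, and returns distance in meters and duration in seconds):
import requests

# one-off request for the first row of the example data below;
# OSRM wants {lon},{lat} pairs
url = ("http://router.project-osrm.org/route/v1/driving/"
       "11.601773,48.103190;10.061031,53.446762?overview=false")
data = requests.get(url).json()
print(data["routes"][0]["distance"], data["routes"][0]["duration"])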
Dataframe
customerId fromLat fromLong toLat toLong
0 1 48.103190 11.601773 53.446762 10.061031
1 2 48.541160 2.628249 41.374426 2.037211
2 2 25.907100 -100.338113 23.994722 -104.754147
3 3 41.948767 12.742488 49.316171 -33.968543
4 3 None None 41.948767 12.742488
Code (minimal reproducible example)
import pandas as pd
d = {
    "customerId": [1, 2, 2, 3, 3],
    "fromLat": ["48.103190", "48.541160", "25.907100", "41.948767", None],
    "fromLong": ["11.601773", "2.628249", "-100.338113", "12.742488", None],
    "toLat": ["53.446762", "41.374426", "23.994722", "49.316171", "41.948767"],
    "toLong": ["10.061031", "2.037211", "-104.754147", "-33.968543", "12.742488"],
}
df = pd.DataFrame(data=d)
print(df)
import requests
import json

def f(x):
    # note: OSRM expects the coordinates as {longitude},{latitude}
    url = f"http://router.project-osrm.org/route/v1/driving/{x['fromLong']},{x['fromLat']};{x['toLong']},{x['toLat']}?overview=false"
    r = requests.get(url)
    print(r.status_code, r.reason)
    if r.status_code == 200:
        data = json.loads(r.text)
        x['distance'] = data['routes'][0]['distance']
        x['duration'] = data['routes'][0]['duration']
    else:
        x['distance'] = None
        x['duration'] = None
    return x

df = df.apply(lambda x: f(x), axis=1)
What I want (note: duration and distance are only example values)
   customerId     fromLat     fromLong       toLat      toLong  distance  duration
0 1 48.103190 11.601773 53.446762 10.061031 500 785
1 2 48.541160 2.628249 41.374426 2.037211 4784 474
2 2 25.907100 -100.338113 23.994722 -104.754147 147 987
3 3 41.948767 12.742488 49.316171 -33.968543 None None
4 3 None None 41.948767 12.742488 None None
Code found on GitHub
# modified fetch function with semaphore
import asyncio
from aiohttp import ClientSession

async def fetch(url, session):
    async with session.get(url) as response:
        delay = response.headers.get("DELAY")
        date = response.headers.get("DATE")
        print("{}:{} with delay {}".format(date, response.url, delay))
        return await response.read()

async def bound_fetch(sem, url, session):
    # Getter function with semaphore.
    async with sem:
        await fetch(url, session)

async def run(r):
    # NOTE: fromLat, fromLong, toLat, toLong are not defined here --
    # this is exactly the part where the per-row dataframe values
    # would have to be substituted in.
    url = f"http://router.project-osrm.org/route/v1/driving/{fromLat},{fromLong};{toLat},{toLong}?overview=false"
    tasks = []
    # create instance of Semaphore
    sem = asyncio.Semaphore(1000)
    # Create client session that will ensure we don't open a new
    # connection per request.
    async with ClientSession() as session:
        for i in range(r):
            # pass Semaphore and session to every GET request;
            # url.format(i) is a leftover from the original snippet,
            # where the URL still contained a {} placeholder
            task = asyncio.ensure_future(bound_fetch(sem, url.format(i), session))
            tasks.append(task)
        responses = asyncio.gather(*tasks)
        await responses

number = df.shape[0]
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(run(number))
loop.run_until_complete(future)
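A rough, untested sketch of how the per-row values from my dataframe might be substituted into this pattern (using aiohttp and the column names from above; I am not sure this is the right approach):
import asyncio
import aiohttp

async def fetch_route(sem, session, row):
    # OSRM expects {longitude},{latitude} pairs
    url = (
        "http://router.project-osrm.org/route/v1/driving/"
        f"{row['fromLong']},{row['fromLat']};{row['toLong']},{row['toLat']}"
        "?overview=false"
    )
    async with sem:
        try:
            async with session.get(url) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    route = data["routes"][0]
                    return route["distance"], route["duration"]
        except aiohttp.ClientError:
            pass
    return None, None

async def run_all(df):
    sem = asyncio.Semaphore(100)  # cap concurrent requests
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_route(sem, session, row) for _, row in df.iterrows()]
        results = await asyncio.gather(*tasks)
    df["distance"], df["duration"] = zip(*results)

asyncio.run(run_all(df))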
CodePudding user response:
When I do scraping / massive API calling I usually use good old joblib with the threading backend. Here is a general code snippet that should work:
from joblib import Parallel, delayed
import pandas as pd
import requests
import json

# the function you want to parallelize
def f(x):
    # request (note: OSRM expects coordinates as {longitude},{latitude})
    res = requests.get(
        (
            f"http://router.project-osrm.org/route/v1/driving/"
            f"{x['fromLong']},{x['fromLat']};{x['toLong']},{x['toLat']}"
            f"?overview=false"
        )
    )
    # parse output into a dict if valid
    if res.status_code == 200:
        data = json.loads(res.text)
        output = {
            "customerId": x["customerId"],
            "distance": data['routes'][0]['distance'],
            "duration": data['routes'][0]['duration'],
        }
    # output dict if request failed
    else:
        output = {
            "customerId": x["customerId"],
            "distance": float("nan"),
            "duration": float("nan"),
        }
    return output

# run f(x) in parallel, where x is a row of your df
# (iterrows yields (index, row) pairs, so unpack the row)
outputs = Parallel(n_jobs=16, backend="threading")(
    delayed(f)(row) for _, row in df.iterrows()
)

# create pandas df from the list of output dicts
outputs_df = pd.DataFrame(outputs)
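To attach the results back to the original dataframe: Parallel preserves the input order, so (assuming df has a default RangeIndex, as in the example data above) the output columns can simply be concatenated:
# rows of outputs_df are in the same order as rows of df
df = pd.concat([df, outputs_df[["distance", "duration"]]], axis=1)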
Pay attention to the n_jobs arg of Parallel. It controls the number of concurrent executions of f(x). If you are hitting some kind of API rate limit due to the number of requests, you can call time.sleep(some_number_of_seconds) from the time module anywhere in the f(x) code to add a delay to each request, as sketched below.
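For example (the 0.05-second delay is only an illustrative value, not a documented limit of the OSRM demo server):
import time

def f_throttled(x):
    time.sleep(0.05)  # crude throttle: at most ~20 requests/s per thread
    return f(x)       # reuse f from the snippet above

outputs = Parallel(n_jobs=16, backend="threading")(
    delayed(f_throttled)(row) for _, row in df.iterrows()
)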
Hope this was useful :)
CodePudding user response:
OSRM also offers a C++ library:
OSRM can be used as a library (libosrm) via C++ instead of using it through the HTTP interface and osrm-routed. This allows for fine-tuning OSRM and has much less overhead. Here is a quick introduction into how to use libosrm in the upcoming v5 release.
This is probably the way to go if you want to query many routes.