Home > Net >  How can I call an API 2 million times quickly and write the returned value into my dataframe?
How can I call an API 2 million times quickly and write the returned value into my dataframe?

Time:06-21

I have a problem. I have a dataframe that contains two addresses fromLat, fromLong to toLat, toLong. From these two addresses I want to calculate the distance by car and the duration. For this I use the following API from project-osrm . Unfortunately, the way I do it is very slow. I have about 2 million rows in my dataframe and for each one I would like to measure the duration and the distance. I have found the following code snippet (see below). However, I don't know how to integrate my dataframe so that it passes the values.

How can I call an API 2 million times quickly and write the returned value into my dataframe?

  • If I don't get a status code 200 back, I should just write None for distance and duration.

Dataframe

   customerId    fromLat     fromLong      toLat       toLong
0           1  48.103190    11.601773  53.446762    10.061031
1           2  48.541160     2.628249  41.374426     2.037211
2           2  25.907100  -100.338113  23.994722  -104.754147
3           3  41.948767    12.742488  49.316171   -33.968543
4           3       None         None  41.948767    12.742488

Code with MVC

import pandas as pd
d = {
    "customerId": [1, 2, 2, 3, 3],
    "fromLat": ["48.103190","48.541160", "25.907100", "41.948767", None],
     "fromLong": ["11.601773", "2.628249", "-100.338113", "12.742488", None],
     "toLat": ["53.446762", "41.374426", "23.994722", "49.316171", "41.948767"],
     "toLong": ["10.061031", "2.037211", "-104.754147", "-33.968543", "12.742488"],
}

df = pd.DataFrame(data=d)
print(df)

import requests
import json

def f(x):
  url = f"http://router.project-osrm.org/route/v1/driving/{x['fromLat']},{x['fromLong']};{x['toLat']},{x['toLong']}?overview=false"
  r = requests.get(url)

  print(r.status_code, r.reason)

  if (r.status_code == 200):
    data = json.loads(r.text)
    x['distance']  = data['routes'][0]['distance']
    x['duration'] = data['routes'][0]['duration']
  else:
    x['distance'] = None
    x['duration']= None
  return x

df = df.apply(lambda x: f(x), axis=1)

What I want (note: duration and distance are only example values)

   customerId    fromLat     fromLong      toLat       toLong  Distance Duration
0           1  48.103190    11.601773  53.446762    10.061031       500      785
1           2  48.541160     2.628249  41.374426     2.037211      4784      474
2           2  25.907100  -100.338113  23.994722  -104.754147       147      987
3           3  41.948767    12.742488  49.316171   -33.968543      None     None
4           3       None         None  41.948767    12.742488      None     None

Code found at Github

# modified fetch function with semaphore
import random
import asyncio
from aiohttp import ClientSession

async def fetch(url, session):
    async with session.get(url) as response:
        delay = response.headers.get("DELAY")
        date = response.headers.get("DATE")
        print("{}:{} with delay {}".format(date, response.url, delay))
        return await response.read()


async def bound_fetch(sem, url, session):
    # Getter function with semaphore.
    async with sem:
        await fetch(url, session)


async def run(r):
    url = f"http://router.project-osrm.org/route/v1/driving/{fromLat},{fromLong};{toLat},{toLong}?overview=false"
    tasks = []
    # create instance of Semaphore
    sem = asyncio.Semaphore(1000)

    # Create client session that will ensure we dont open new connection
    # per each request.
    async with ClientSession() as session:
        for i in range(r):
            # pass Semaphore and session to every GET request
            task = asyncio.ensure_future(bound_fetch(sem, url.format(i), session))
            tasks.append(task)

        responses = asyncio.gather(*tasks)
        await responses

number = df.shape[0]
loop = asyncio.get_event_loop()

future = asyncio.ensure_future(run(number))
loop.run_until_complete(future)

CodePudding user response:

When I do scraping / massive API calling I usually use the good old joblib threading. Here is a general code snippet that should work:

from joblib import Parallel, delayed
import pandas ad pd
import requests
import json

# the function you want to parallelize
def f(x):

    # request
    res = requests.get(
        (
            f"http://router.project-osrm.org/route/v1/driving/"
            f"{x['fromLat']},{x['fromLong']};{x['toLat']},{x['toLong']}"
            f"?overview=false"
        )
    )

    # parse output into a dict if valid
    if (res.status_code == 200):
        data = json.loads(res.text)
        output = {
            "customerId": x["customerId"],
            "distance"  : data['routes'][0]['distance'],
            "duration"  : data['routes'][0]['duration']
        }

    # output dict if request failed
    else:
        output = {
            "customerId": x["customerId"],
            "distance"  : float("nan"),
            "duration"  : float("nan")
        }
    
    return output

# run f(x) in parallel, where x is a row of your df
outputs = Parallel(n_jobs=16, backend="threading")(
    delayed(f)(x) for x in df.iterrows()
)

# create pandas df from the list of output dicts
outputs_df = pd.DataFrame(outputs)

Pay attention to the n_jobs arg of Parallel. It is responsible for the number of concurrent executions of f(x). If you are hitting some kind of API limit due to number of requests, you can add time.sleep(some_number_of_seconds) from time module anywhere in the f(x) code to add delay on each request.

Hope this was useful :)

CodePudding user response:

OSRM also offers a C library:

OSRM can be used as a library (libosrm) via C instead of using it through the HTTP interface and osrm-routed. This allows for fine-tuning OSRM and has much less overhead. Here is a quick introduction into how to use libosrm in the upcoming v5 release.

This is probably the way to go if you want to query many routes.

  • Related