Parallelize 20K requests, filter & concat results into 1 dataframe


I need to make around 20K API calls. Each one returns a CSV file, on which I have to perform some operations, and finally I concatenate all the results into a single dataframe.

I've completed this task sequentially, but the issue is that each API call takes around 1 second, so the whole run takes around 6 hours. I would therefore like to parallelize the task, as I can make up to 100 simultaneous API calls and up to 1,000 calls per minute.

I've tried several things, but I'm struggling. I've managed to parallelize the task and complete 200 API calls in about 8 seconds, but I can't concat all the results into a single dataframe. I would appreciate any help.

Thanks! :)

This is what I have:

from concurrent.futures import ThreadPoolExecutor
import io
import time

import pandas as pd
import requests

MAX_THREADS = 15
tickers = tickers[:200]

df_gappers = pd.DataFrame([])

start_time = time.time()

def scrape(ticker):
    url = f"https://eodhistoricaldata.com/api/eod/{ticker}?from={start_date}&to={end_date}&period=d&api_token={key}"
    r = requests.get(url)
    if r.status_code == 200:
        df = pd.read_csv(io.StringIO(r.content.decode('utf-8')))
        df['ticker'] = ticker
        # overnight gap in % vs. the previous close
        df['gap'] = round(((df.Open - df.Close.shift(1)) / df.Close.shift(1)) * 100, 2)
        df = df[df.gap > 30]
        if not df.empty:
            return df

with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
    results = executor.map(scrape, tickers)

#df = pd.concat(results)

print("--- %s seconds ---" % (time.time() - start_time))

CodePudding user response:

I have a few issues recreating the data:

start_time = time.time()  # invalid start time

df = df[df.gap > 30] # returns an empty df since all values for gap are below 30

Suggestion: You can set &fmt=json to have an easier time creating your df

df = pd.DataFrame(r.json())

If I change those things, your code works as expected. Please provide a list of tickers and a reproducible example. (I have an API key)
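
For reference, here is a minimal sketch of the scrape function with those changes applied, plus the concatenation step. It assumes the same start_date, end_date, key and tickers variables as in the question, and that the endpoint accepts the &fmt=json parameter suggested above:

from concurrent.futures import ThreadPoolExecutor

import pandas as pd
import requests

def scrape(ticker):
    # same endpoint as in the question, but requesting JSON instead of CSV
    url = (
        f"https://eodhistoricaldata.com/api/eod/{ticker}"
        f"?from={start_date}&to={end_date}&period=d&api_token={key}&fmt=json"
    )
    r = requests.get(url)
    if r.status_code != 200:
        return None

    df = pd.DataFrame(r.json())
    df['ticker'] = ticker
    # column names are assumed to match the CSV output (Open/Close);
    # adjust if the JSON payload uses different casing
    df['gap'] = round(((df.Open - df.Close.shift(1)) / df.Close.shift(1)) * 100, 2)
    return df[df.gap > 30]

with ThreadPoolExecutor(max_workers=15) as executor:
    results = executor.map(scrape, tickers)

# keep only the non-None, non-empty results before concatenating
frames = [df for df in results if df is not None and not df.empty]
df_gappers = pd.concat(frames, ignore_index=True)

Note that if every request fails or every ticker is filtered out, frames will be empty and pd.concat will raise a ValueError, which is a useful signal that no rows survived the gap > 30 filter.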

CodePudding user response:

Alternatively, you could use dask.dataframe, which gives an API much like that of pandas and handles parallelization over multiple threads, processes, or physical servers:

import dask
import dask.distributed
import dask.dataframe
import pandas as pd

# start a dask client
client = dask.distributed.Client()

tickers = tickers[:200]

# map your job over the tickers
futures = client.map(scrape, tickers)

# wait for the jobs to finish, then filter out the Null return values
dask.distributed.wait(futures)
non_null_futures = [f for f in futures if f.type != type(None)]

# convert the futures to a single dask.dataframe
ddf = dask.dataframe.from_delayed(non_null_futures)

# if desired, you can now convert to pandas
df = ddf.compute()

# alternatively, you could write straight to disk, giving
# a partitioned binary file, e.g.
ddf.to_parquet("myfile.parquet")
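
As a usage note (a sketch, assuming pyarrow or fastparquet is installed as the parquet engine), the partitioned file written above can be reloaded later without re-running the scrape:

import dask.dataframe
import pandas as pd

# lazily reload the partitioned result written by ddf.to_parquet(...)
ddf = dask.dataframe.read_parquet("myfile.parquet")

# or read it straight into a single in-memory pandas dataframe
df = pd.read_parquet("myfile.parquet")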