I need to make around 20K API calls; each one returns a CSV file on which I have to perform some operations, and finally I concatenate all the results into a single dataframe.
I've completed this task sequentially, but the issue is that each API call takes around 1 second, so the whole run takes around 6 hours. I would therefore like to parallelize the task, as I can make up to 100 simultaneous API calls and up to 1,000 calls per minute.
I've tried several things, but I'm struggling... I have managed to parallelize the tasks and complete 200 API calls in about 8 seconds, but I can't concat all the results into a single dataframe... Would appreciate any help.
Thanks! :)
This is what I have:
import io
import time
from concurrent.futures import ThreadPoolExecutor

import pandas as pd
import requests

MAX_THREADS = 15
tickers = tickers[:200]
df_gappers = pd.DataFrame([])

start_time = time.time()

def scrape(ticker):
    url = f"https://eodhistoricaldata.com/api/eod/{ticker}?from={start_date}&to={end_date}&period=d&api_token={key}"
    r = requests.get(url)
    if r.status_code == 200:
        df = pd.read_csv(io.StringIO(r.content.decode('utf-8')))
        df['ticker'] = ticker
        df['gap'] = round(((df.Open - df.Close.shift(1)) / df.Close.shift(1)) * 100, 2)
        df = df[df.gap > 30]
        if not df.empty:
            #print(df)
            return df
        else:
            pass

with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
    results = executor.map(scrape, tickers)
    #df = pd.concat(results)

print("--- %s seconds ---" % (time.time() - start_time))
CodePudding user response:
I have a few issues recreating the data:
start_time = time.time() # invalid start time
df = df[df.gap > 30] # returns an empty df since all values for gap are below 30
Suggestion: you can set &fmt=json in the request URL to have an easier time creating your df:

df = pd.DataFrame(r.json())
If I change those things, your code works as expected. Please provide a list of tickers and a reproducible example. (I have an API key)
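For reference, here is a minimal sketch of how the scrape function and the final concatenation might look with that change (untested, and it assumes the same tickers, start_date, end_date and key variables from the question):

from concurrent.futures import ThreadPoolExecutor

import pandas as pd
import requests

def scrape(ticker):
    # &fmt=json makes the API return JSON, which feeds straight into a DataFrame
    url = (
        f"https://eodhistoricaldata.com/api/eod/{ticker}"
        f"?from={start_date}&to={end_date}&period=d&api_token={key}&fmt=json"
    )
    r = requests.get(url)
    if r.status_code == 200:
        df = pd.DataFrame(r.json())
        df['ticker'] = ticker
        df['gap'] = round(((df.Open - df.Close.shift(1)) / df.Close.shift(1)) * 100, 2)
        df = df[df.gap > 30]
        if not df.empty:
            return df
    return None

with ThreadPoolExecutor(max_workers=15) as executor:
    results = executor.map(scrape, tickers)
    # drop the None returns before concatenating
    df_gappers = pd.concat([df for df in results if df is not None], ignore_index=True)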
CodePudding user response:
Alternatively, you could use dask.dataframe, which gives an API much like that of pandas and handles parallelization over multiple threads, processes, or physical servers:
import dask
import dask.distributed
import dask.dataframe
import pandas as pd
# start a dask client
client = dask.distributed.Client()
tickers = tickers[:200]
# map your job over the tickers
futures = client.map(scrape, tickers)
# wait for the jobs to finish, then filter out the Null return values
dask.distributed.wait(futures)
non_null_futures = [f for f in futures if f.type != type(None)]
# convert the futures to a single dask.dataframe
ddf = dask.dataframe.from_delayed(non_null_futures)
# if desired, you can now convert to pandas
df = ddf.compute()
# alternatively, you could write straight to disk, giving
# a partitioned binary file, e.g.
ddf.to_parquet("myfile.parquet")
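As a follow-up sketch (the numbers here are just illustrative assumptions, not part of the original answer), you could size the client explicitly to stay within the rate limits mentioned in the question, and the parquet output can later be read back lazily:

# e.g. 4 worker processes x 25 threads = at most 100 simultaneous calls
client = dask.distributed.Client(n_workers=4, threads_per_worker=25)

# the partitioned parquet file can be read back lazily later
ddf = dask.dataframe.read_parquet("myfile.parquet")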