Unable to process a large amount of data using a for loop


I am downloading 2 years' worth of OHLC data for 10k symbols and writing it to a database. When I try to pull the entire list, the script crashes (but it doesn't if I only download about 20% of the symbols):

import config 
from alpaca_trade_api.rest import REST, TimeFrame
import sqlite3
import pandas as pd
import datetime
from dateutil.relativedelta import relativedelta

start_date = (datetime.datetime.now() - relativedelta(years=2)).date()
start_date = pd.Timestamp(start_date, tz='America/New_York').isoformat()
end_date = pd.Timestamp(datetime.datetime.now(), tz='America/New_York').isoformat()
conn = sqlite3.connect('allStockData.db') 
api = REST(config.api_key_id, config.api_secret, base_url=config.base_url)
origin_symbols = pd.read_sql_query("SELECT  symbol, name from stock", conn)
df = origin_symbols
df_dict = df.to_dict('records')
startTime = datetime.datetime.now()
api = REST(config.api_key_id, config.api_secret, base_url=config.base_url)
temp_data = []

for key in df_dict:
    symbol = key['symbol']
    print(f"downloading ${symbol}")
    # stock_id =  key['id']

    barsets = api.get_bars_iter(symbol, TimeFrame.Day, start_date, end_date)
    barsets = list(barsets)

    for index, bar in enumerate(barsets):
        bars = pd.DataFrame({'date': bar.t.date(), 'symbol': symbol, 'open': bar.o,
                             'high': bar.h, 'low': bar.l, 'close': bar.c,
                             'volume': bar.v, 'vwap': bar.vw}, index=[0])
        temp_data.append(bars)

print("loop complete")
data = pd.concat(temp_data)

# write df back to sql, replacing the previous table
data.to_sql('daily_ohlc_init', if_exists='replace', con=conn, index=True)

endTime = datetime.datetime.now()

print(f'time elapsed to pull data was {endTime - startTime}')

To make it work, I add this line after building df_dict to limit the number of symbols downloaded:

df_dict = df_dict[0:2000]

This lets me write to the database, but I need the entire dictionary (about 10k symbols). How do I write all of it to the database without the script crashing?

CodePudding user response:

Since you mentioned that you are able to make it work for 2000 records of df_dict at a time, a simple approach is to process df_dict in chunks of that size and write each chunk to the database before moving on:

api = REST(config.api_key_id, config.api_secret, base_url=config.base_url)

num_records = len(df_dict)
chunk_size = 2000
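# ceiling division: add one extra pass when the last chunk is smaller than chunk_size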
num_passes = num_records // chunk_size + int(num_records % chunk_size != 0)

for i in range(num_passes):
    start = i * chunk_size
    end = min((i + 1) * chunk_size, num_records)
    df_chunk = df_dict[start: end]

    temp_data = []
    for key in df_chunk:
        symbol = key['symbol']
        print(f"downloading ${symbol}")
        barsets = api.get_bars_iter(symbol, TimeFrame.Day, start_date, end_date)
        barsets = list(barsets)

        for index, bar in enumerate(barsets):
            bars = [bar.t.date(), symbol, bar.o, bar.h, bar.l, bar.c, bar.v, bar.vw]
            temp_data.append(bars)
    
    # more efficient to build one DataFrame per chunk than one DataFrame per bar
    columns = ['date', 'symbol', 'open', 'high', 'low', 'close', 'volume', 'vwap']
    data = pd.DataFrame(temp_data, columns=columns)

    # replace the previous table when writing the first chunk, then append on the remaining passes through df_dict
    data.to_sql('daily_ohlc_init', if_exists='replace' if i == 0 else 'append', con=conn, index=True)

    print(f"Internal loop finished processing records {start} to {end} out of {num_records}.")

endTime = datetime.datetime.now()
print(f'time elapsed to pull data was {endTime - startTime}')
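
If memory is still a concern, you can go one step further and write each symbol's bars to the database as soon as they are downloaded, so only one symbol's rows are ever held in memory. This is just a sketch that reuses api, conn, df_dict, start_date and end_date from the setup above; the write_symbol helper and COLUMNS list are not part of any library:

COLUMNS = ['date', 'symbol', 'open', 'high', 'low', 'close', 'volume', 'vwap']

def write_symbol(api, conn, symbol, start_date, end_date, first_write):
    # Collect the daily bars for a single symbol.
    rows = [[bar.t.date(), symbol, bar.o, bar.h, bar.l, bar.c, bar.v, bar.vw]
            for bar in api.get_bars_iter(symbol, TimeFrame.Day, start_date, end_date)]
    if not rows:
        return first_write  # nothing fetched, table state unchanged

    df = pd.DataFrame(rows, columns=COLUMNS)
    # Replace the table on the very first write, append afterwards.
    df.to_sql('daily_ohlc_init',
              if_exists='replace' if first_write else 'append',
              con=conn, index=True)
    return False

first_write = True
for key in df_dict:
    first_write = write_symbol(api, conn, key['symbol'],
                               start_date, end_date, first_write)

The trade-off is one to_sql call per symbol instead of one per 2000-symbol chunk, which is slower overall but keeps peak memory roughly constant.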