I am very new to Python and I guess I am doing something to cause a memory problem? I am downloading 2 years worth of OHLC for 10k symbols and writing it to a sql database. Problem is, when I try to pull the entire list it crashes (but doesn't crash if I download 20% of the list). Here's my script:
import config
from alpaca_trade_api.rest import REST, TimeFrame
import sqlite3
import pandas as pd
import datetime
from dateutil.relativedelta import relativedelta
start_date = (datetime.datetime.now() - relativedelta(years=2)).date()
start_date = pd.Timestamp(start_date, tz='America/New_York').isoformat()
end_date = pd.Timestamp(datetime.datetime.now(), tz='America/New_York').isoformat()
conn = sqlite3.connect('allStockData.db')
api = REST(config.api_key_id, config.api_secret, base_url=config.base_url)
origin_symbols = pd.read_sql_query("SELECT symbol, name from stock", conn)
df = origin_symbols
df_dict = df.to_dict('records')
startTime = datetime.datetime.now()
api = REST(config.api_key_id, config.api_secret, base_url=config.base_url)
temp_data = []
for key in df_dict:
symbol = key['symbol']
print(f"downloading ${symbol}")
# stock_id = key['id']
barsets = api.get_bars_iter(symbol, TimeFrame.Day, start_date, end_date)
barsets = list(barsets)
for index, bar in enumerate(barsets):
bars = pd.DataFrame({'date': bar.t.date(), 'symbol': symbol, 'open': bar.o, 'high': bar.h, 'low': bar.l, 'close': bar.c, 'volume': bar.v, 'vwap': bar.vw}, index=[0])
temp_data.append(bars)
print("loop complete")
data = pd.concat(temp_data)
# write df back to sql, replacing the previous table
data.to_sql('daily_ohlc_init', if_exists='replace', con=conn, index=True)
endTime = datetime.datetime.now()
print(f'time elapsed to pull data was {endTime - startTime}')
To make it work, I can add this line after df_dict
to limit the amount of symbols downloaded:
df_dict = df_dict[0:2000]
This will allow me to write to the database, but I need the entire dictionary (which is about 10k symbols).
Sorry for the noob question but I am lost here.
How do I write to the db without it crashing?
CodePudding user response:
Since you mentioned that you are able to make it work for 2000 records of df_dict
at a time, a possible simple approach could be:
api = REST(config.api_key_id, config.api_secret, base_url=config.base_url)
num_records = len(df_dict)
chunk_size = 2000
num_passes = min(num_records // chunk_size, 1)
for i in range(num_passes):
start = i * chunk_size
end = max((i 1) * chunk_size, num_records)
df_chunk = df_dict[start: end]
temp_data = []
for key in df_chunk:
symbol = key['symbol']
print(f"downloading ${symbol}")
barsets = api.get_bars_iter(symbol, TimeFrame.Day, start_date, end_date)
barsets = list(barsets)
for index, bar in enumerate(barsets):
bars = [bar.t.date(), symbol, bar.o, bar.h, bar.l, bar.c, bar.v, bar.vw]
temp_data.append(bars)
# should be a bit more efficient to create a dataframe just once
columns = ['date', 'symbol', 'open', 'high', 'low', 'close', 'volume', 'vwap']
data = pd.DataFrame(temp_data, columns=columns)
# should delete previous table when writing first chunk, then start appending from next passes through df_dict
data.to_sql('daily_ohlc_init', if_exists='replace' if i == 0 else 'append', con=conn, index=True)
print(f"Internal loop finished processing records {start} to {end} out of {num_records}.")
endTime = datetime.datetime.now()
print(f'time elapsed to pull data was {endTime - startTime}')