I have a for loop in which it gets the historical data for each key (historical_data function). Now I have around 200 cryptocurrencies, so it will take a long time to get the historical data one by one. Now I'm wondering if I could get the historical data for each symbol in a seperate thread.
So in the code it runs through each symbol and each interval. Then I give the columns a name and drop some of the unusable columns. After each loop the data gets appended to the list. This takes a really long time if they have to run one by one, that's why I'd like to try and run this in threads but I don't know exactly how.
Thanks in advance!
class crypto:
symbols = []
with open('allsymbols', 'r ') as f:
for line in f:
symbol = line.strip('\n') 'BTC'
symbols.append(symbol)
intervals = ['1m','5m']
@staticmethod
def historical_data():
historical_list = []
for i in range(len(crypto.symbols)):
for j in range(len(crypto.intervals)):
historical_data = client.get_historical_klines(crypto.symbols[i], crypto.intervals[j], '11/19/2021', limit=1000)[-22:-1]
historical_df = pd.DataFrame(historical_data)
historical_df.columns = ['time', '1', 'high', 'low', 'close', '5', '6', '7', '8', '9', '10', '11']
historical_df.drop(columns=['1', '5', '6', '7', '8', '9', '10', '11'], axis=1, inplace=True)
historical_df['interval'] = crypto.intervals[j]
historical_df['symbol'] = crypto.symbols[i]
historical_df[['high', 'low', 'close']] = historical_df[['high', 'low', 'close']].apply(pd.to_numeric, axis=1)
historical_df['time'] = pd.to_datetime(historical_df['time'] / 1000, unit='s')
historical_list.append(historical_df.to_dict())
return historical_list
Note: I convert the lists into dictionaries and later on convert the values of the dictionary to a list again. This may slow down the process a lot? Maybe there's an easier way to do this but I don't know how. This is not the main focus tho.
@staticmethod
def refactor_list(historical_list):
historical_list_refactored = []
for i in range(len(historical_list)):
single_key_data = historical_list[i]
single_key_data['high'] = list(single_key_data['high'].values())
single_key_data['low'] = list(single_key_data['low'].values())
single_key_data['close'] = list(single_key_data['close'].values())
single_key_data['interval'] = list(single_key_data['interval'].values())
single_key_data['symbol'] = list(single_key_data['symbol'].values())
single_key_data['time'] = list(single_key_data['time'].values())
historical_list_refactored.append(single_key_data)
return historical_list_refactored
Symbols list:
['1INCHBTC', 'AAVEBTC', 'ACMBTC', 'ADABTC', 'ADXBTC', 'AERGOBTC', 'AGIXBTC', 'AGLDBTC', 'AIONBTC', 'AKROBTC', 'ALGOBTC', 'ALICEBTC', 'ALPACABTC', 'ALPHABTC', 'AMBBTC', 'ANKRBTC', 'ANTBTC', 'APPCBTC', 'ARDRBTC', 'ARPABTC', 'ARBTC', 'ARKBTC', 'ASRBTC', 'ASTBTC', 'ATABTC', 'ATMBTC', 'ATOMBTC', 'AUCTIONBTC', 'AUDIOBTC', 'AUTOBTC', 'AVABTC', 'AVAXBTC', 'AXSBTC', 'BADGERBTC', 'BAKEBTC', 'BALBTC', 'BANDBTC', 'BARBTC', 'BATBTC', 'BCDBTC', 'BEAMBTC', 'BELBTC', 'BETABTC', 'BLZBTC', 'BNBBTC', 'BNTBTC', 'BNXBTC', 'BONDBTC', 'BRDBTC', 'BTCSTBTC', 'BTGBTC', 'BTSBTC', 'BZRXBTC', 'C98BTC', 'CAKEBTC', 'CELOBTC', 'CELRBTC', 'CFXBTC', 'CHESSBTC', 'CHRBTC', 'CHZBTC', 'CITYBTC', 'CKBBTC', 'CLVBTC', 'CNDBTC', 'COMPBTC', 'COSBTC', 'COTIBTC', 'CRVBTC', 'CTKBTC', 'CTSIBTC', 'CTXCBTC', 'CVCBTC', 'DARBTC', 'DASHBTC', 'DATABTC', 'DCRBTC', 'DEGOBTC', 'DGBBTC', 'DIABTC', 'DNTBTC', 'DOCKBTC', 'DODOBTC', 'DOGEBTC', 'DOTBTC', 'DREPBTC', 'DUSKBTC', 'DYDXBTC', 'EGLDBTC', 'ELFBTC', 'ENJBTC', 'ENSBTC', 'EOSBTC', 'EPSBTC', 'ETCBTC', 'ETHBTC', 'EVXBTC', 'EZBTC', 'FARMBTC', 'FETBTC', 'FIDABTC', 'FILBTC', 'FIOBTC', 'FIROBTC', 'FISBTC', 'FLMBTC', 'FLOWBTC', 'FORBTC', 'FORTHBTC', 'FRONTBTC', 'FTMBTC', 'FTTBTC', 'FUNBTC', 'FXSBTC', 'GALABTC', 'GASBTC', 'GLMBTC', 'GNOBTC', 'GOBTC', 'GRSBTC', 'GRTBTC', 'GTCBTC', 'GTOBTC', 'GXSBTC', 'HARDBTC', 'HBARBTC', 'HIVEBTC', 'HNTBTC', 'ICPBTC', 'ICXBTC', 'IDEXBTC', 'ILVBTC', 'INJBTC', 'IOSTBTC', 'IOTABTC', 'IOTXBTC', 'IRISBTC', 'JASMYBTC', 'JSTBTC', 'JUVBTC', 'KAVABTC', 'KEEPBTC', 'KLAYBTC', 'KMDBTC', 'KNCBTC', 'KSMBTC', 'LAZIOBTC', 'LINABTC', 'LINKBTC', 'LITBTC', 'LOOMBTC', 'LPTBTC', 'LRCBTC', 'LSKBTC', 'LTCBTC', 'LTOBTC', 'LUNABTC', 'MANABTC', 'MATICBTC', 'MBOXBTC', 'MDABTC', 'MDTBTC', 'MDXBTC', 'MINABTC', 'MIRBTC', 'MITHBTC', 'MKRBTC', 'MLNBTC', 'MOVRBTC', 'MTHBTC', 'MTLBTC', 'NANOBTC', 'NASBTC', 'NAVBTC', 'NEARBTC', 'NEBLBTC', 'NEOBTC', 'NKNBTC', 'NMRBTC', 'NUBTC', 'NULSBTC', 'NXSBTC', 'OAXBTC', 'OCEANBTC', 'OGBTC', 'OGNBTC', 'OMBTC', 'OMGBTC', 'ONEBTC', 'ONGBTC', 'ONTBTC', 'ORNBTC', 'OXTBTC', 'PAXGBTC', 'PERLBTC', 'PERPBTC', 'PHABTC', 'PHBBTC', 'PIVXBTC', 'PNTBTC', 'POLSBTC', 'POLYBTC', 'PONDBTC', 'PORTOBTC', 'POWRBTC', 'PROMBTC', 'PSGBTC', 'QIBTC', 'QKCBTC', 'QLCBTC', 'QNTBTC', 'QSPBTC', 'QTUMBTC', 'QUICKBTC', 'RADBTC', 'RAMPBTC', 'RAREBTC', 'RDNBTC', 'REEFBTC', 'RENBTC', 'RENBTCBTC', 'REPBTC', 'REQBTC', 'RGTBTC', 'RIFBTC', 'RLCBTC', 'ROSEBTC', 'RSRBTC', 'RUNEBTC', 'RVNBTC', 'SANDBTC', 'SCBTC', 'SCRTBTC', 'SFPBTC', 'SKLBTC', 'SNMBTC', 'SNTBTC', 'SNXBTC', 'SOLBTC', 'SRMBTC', 'SSVBTC', 'STEEMBTC', 'STMXBTC', 'STORJBTC', 'STPTBTC', 'STRAXBTC', 'STXBTC', 'SUPERBTC', 'SUSHIBTC', 'SXPBTC', 'SYSBTC', 'TCTBTC', 'TFUELBTC', 'THETABTC', 'TKOBTC', 'TLMBTC', 'TOMOBTC', 'TORNBTC', 'TRBBTC', 'TRIBEBTC', 'TRUBTC', 'TRXBTC', 'TVKBTC', 'TWTBTC', 'UMABTC', 'UNFIBTC', 'UNIBTC', 'UTKBTC', 'VETBTC', 'VGXBTC', 'VIBBTC', 'VIDTBTC', 'VITEBTC', 'WABIBTC', 'WANBTC', 'WAVESBTC', 'WAXPBTC', 'WBTCBTC', 'WINGBTC', 'WNXMBTC', 'WRXBTC', 'WTCBTC', 'XEMBTC', 'XLMBTC', 'XMRBTC', 'XRPBTC', 'XTZBTC', 'XVGBTC', 'XVSBTC', 'YFIBTC', 'YFIIBTC', 'YGGBTC', 'YOYOBTC', 'ZECBTC', 'ZENBTC', 'ZILBTC', 'ZRXBTC']
CodePudding user response:
This is how you can send multiple requests to binance API server using python-binance
import asyncio
from binance import AsyncClient
RESULTS = [] # let's store all results here
class GetAllBinanceData:
def __init__(self, workers_num: int = 10):
self.workers_num: int = workers_num
self.task_q: asyncio.Queue = asyncio.Queue(maxsize=10)
async def get_symbols_from_somewhere(self):
"""Get symbols and distribute them among workers"""
# imagine the symbols are from some file
symbols = ["BNBBTC", "ETHBTC", "NEOBTC"] * 100
for i in symbols:
await self.task_q.put(i)
for i in range(self.workers_num):
await self.task_q.put(None)
async def get_historical_klines(self, client: AsyncClient):
"""Get data and print it"""
while True:
symbol = await self.task_q.get()
if symbol is None:
break
klines = await client.get_historical_klines(
symbol=symbol,
interval=AsyncClient.KLINE_INTERVAL_1MINUTE,
start_str="2021-11-23 10:00:00",
end_str="2021-11-23 10:01:00"
)
print(klines) # just print
RESULTS.append(klines) # send somewhere else
async def amain(self) -> None:
"""Main async wrapper fucntion"""
client = await AsyncClient.create()
await asyncio.gather(
self.get_symbols_from_somewhere(),
*[self.get_historical_klines(client) for _ in range(self.workers_num)]
)
await client.close_connection()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(GetAllBinanceData().amain())
print("*" * 100)
print(RESULTS)
Another very straightforward and less efficient way. Not that Binance became angry when you create multiple connections and can start ignoring you.
from binance import Client
from concurrent.futures import ThreadPoolExecutor
RESULTS = [] # let's store all results here
SYMBOLS = ["BNBBTC", "ETHBTC", "NEOBTC"] * 100
def get_historical_klines(symbol):
try:
client = Client()
klines = client.get_historical_klines(
symbol=symbol,
interval=Client.KLINE_INTERVAL_1MINUTE,
start_str="2021-11-23 10:00:00",
end_str="2021-11-23 10:01:00"
)
print(klines) # just print
RESULTS.append(klines) # send somewhere else
finally:
client.close_connection()
if __name__ == "__main__":
with ThreadPoolExecutor(max_workers=10) as pool:
pool.map(get_historical_klines, SYMBOLS)
print("*" * 100)
print(len(RESULTS))