Home > database >  how to run for loop as threads so it speeds up the for loop
how to run for loop as threads so it speeds up the for loop

Time:11-24

I have a for loop that gets the historical data for each key (the historical_data function). I have around 200 cryptocurrencies, so fetching the historical data one by one takes a long time. I'm wondering whether I could fetch the historical data for each symbol in a separate thread.

The code runs through each symbol and each interval. Then I give the columns names and drop the columns I don't need. After each iteration the data is appended to the list. This takes a really long time when the requests have to run one by one, which is why I'd like to run them in threads, but I don't know exactly how.

Thanks in advance!

class crypto:
    """Holds the symbol/interval lists and fetches recent klines for each pair.

    Class attributes:
        symbols: built when the class is defined, by appending 'BTC' to every
            line of the 'allsymbols' file (presumably one base asset per
            line - confirm against the file).
        intervals: kline intervals requested for every symbol.
    """

    # The file is only read, so plain read mode is sufficient.
    with open('allsymbols', 'r') as f:
        symbols = [line.strip('\n') + 'BTC' for line in f]

    intervals = ['1m', '5m']

    @staticmethod
    def historical_data():
        """Fetch klines for every symbol/interval pair and return them as dicts.

        For each combination of ``crypto.symbols`` and ``crypto.intervals`` the
        klines are requested from the module-level ``client``, trimmed to the
        slice ``[-22:-1]``, reduced to the time/high/low/close columns, tagged
        with their interval and symbol, and converted with
        ``DataFrame.to_dict()``.

        Returns:
            list: one ``{column: {row_index: value}}`` dict per
            symbol/interval pair, in iteration order.
        """
        kline_columns = ['time', '1', 'high', 'low', 'close', '5', '6', '7', '8', '9', '10', '11']
        unused_columns = ['1', '5', '6', '7', '8', '9', '10', '11']
        price_columns = ['high', 'low', 'close']

        historical_list = []
        for symbol in crypto.symbols:
            for interval in crypto.intervals:
                klines = client.get_historical_klines(symbol, interval, '11/19/2021', limit=1000)[-22:-1]
                historical_df = pd.DataFrame(klines)
                historical_df.columns = kline_columns
                historical_df = historical_df.drop(columns=unused_columns)
                historical_df['interval'] = interval
                historical_df['symbol'] = symbol
                # Convert column by column; a row-wise apply yields the same
                # values but calls pd.to_numeric once per row.
                historical_df[price_columns] = historical_df[price_columns].apply(pd.to_numeric)
                # 'time' holds epoch milliseconds, so parse it directly
                # instead of dividing by 1000 and parsing float seconds.
                historical_df['time'] = pd.to_datetime(historical_df['time'], unit='ms')
                historical_list.append(historical_df.to_dict())
        return historical_list

Note: I convert each DataFrame into a dictionary and later convert the values of each dictionary back into lists. Could this slow the process down a lot? Maybe there's an easier way to do this, but I don't know how. This is not the main focus, though.

    @staticmethod
    def refactor_list(historical_list):
        """Turn every per-column ``{index: value}`` mapping into a list of its values.

        Each dict in ``historical_list`` is updated in place, and the returned
        list contains those same dict objects in their original order.
        """
        columns = ('high', 'low', 'close', 'interval', 'symbol', 'time')
        refactored = []
        for entry in historical_list:
            for column in columns:
                entry[column] = list(entry[column].values())
            refactored.append(entry)
        return refactored

Symbols list:

['1INCHBTC', 'AAVEBTC', 'ACMBTC', 'ADABTC', 'ADXBTC', 'AERGOBTC', 'AGIXBTC', 'AGLDBTC', 'AIONBTC', 'AKROBTC', 'ALGOBTC', 'ALICEBTC', 'ALPACABTC', 'ALPHABTC', 'AMBBTC', 'ANKRBTC', 'ANTBTC', 'APPCBTC', 'ARDRBTC', 'ARPABTC', 'ARBTC', 'ARKBTC', 'ASRBTC', 'ASTBTC', 'ATABTC', 'ATMBTC', 'ATOMBTC', 'AUCTIONBTC', 'AUDIOBTC', 'AUTOBTC', 'AVABTC', 'AVAXBTC', 'AXSBTC', 'BADGERBTC', 'BAKEBTC', 'BALBTC', 'BANDBTC', 'BARBTC', 'BATBTC', 'BCDBTC', 'BEAMBTC', 'BELBTC', 'BETABTC', 'BLZBTC', 'BNBBTC', 'BNTBTC', 'BNXBTC', 'BONDBTC', 'BRDBTC', 'BTCSTBTC', 'BTGBTC', 'BTSBTC', 'BZRXBTC', 'C98BTC', 'CAKEBTC', 'CELOBTC', 'CELRBTC', 'CFXBTC', 'CHESSBTC', 'CHRBTC', 'CHZBTC', 'CITYBTC', 'CKBBTC', 'CLVBTC', 'CNDBTC', 'COMPBTC', 'COSBTC', 'COTIBTC', 'CRVBTC', 'CTKBTC', 'CTSIBTC', 'CTXCBTC', 'CVCBTC', 'DARBTC', 'DASHBTC', 'DATABTC', 'DCRBTC', 'DEGOBTC', 'DGBBTC', 'DIABTC', 'DNTBTC', 'DOCKBTC', 'DODOBTC', 'DOGEBTC', 'DOTBTC', 'DREPBTC', 'DUSKBTC', 'DYDXBTC', 'EGLDBTC', 'ELFBTC', 'ENJBTC', 'ENSBTC', 'EOSBTC', 'EPSBTC', 'ETCBTC', 'ETHBTC', 'EVXBTC', 'EZBTC', 'FARMBTC', 'FETBTC', 'FIDABTC', 'FILBTC', 'FIOBTC', 'FIROBTC', 'FISBTC', 'FLMBTC', 'FLOWBTC', 'FORBTC', 'FORTHBTC', 'FRONTBTC', 'FTMBTC', 'FTTBTC', 'FUNBTC', 'FXSBTC', 'GALABTC', 'GASBTC', 'GLMBTC', 'GNOBTC', 'GOBTC', 'GRSBTC', 'GRTBTC', 'GTCBTC', 'GTOBTC', 'GXSBTC', 'HARDBTC', 'HBARBTC', 'HIVEBTC', 'HNTBTC', 'ICPBTC', 'ICXBTC', 'IDEXBTC', 'ILVBTC', 'INJBTC', 'IOSTBTC', 'IOTABTC', 'IOTXBTC', 'IRISBTC', 'JASMYBTC', 'JSTBTC', 'JUVBTC', 'KAVABTC', 'KEEPBTC', 'KLAYBTC', 'KMDBTC', 'KNCBTC', 'KSMBTC', 'LAZIOBTC', 'LINABTC', 'LINKBTC', 'LITBTC', 'LOOMBTC', 'LPTBTC', 'LRCBTC', 'LSKBTC', 'LTCBTC', 'LTOBTC', 'LUNABTC', 'MANABTC', 'MATICBTC', 'MBOXBTC', 'MDABTC', 'MDTBTC', 'MDXBTC', 'MINABTC', 'MIRBTC', 'MITHBTC', 'MKRBTC', 'MLNBTC', 'MOVRBTC', 'MTHBTC', 'MTLBTC', 'NANOBTC', 'NASBTC', 'NAVBTC', 'NEARBTC', 'NEBLBTC', 'NEOBTC', 'NKNBTC', 'NMRBTC', 'NUBTC', 'NULSBTC', 'NXSBTC', 'OAXBTC', 'OCEANBTC', 'OGBTC', 'OGNBTC', 'OMBTC', 'OMGBTC', 'ONEBTC', 'ONGBTC', 
'ONTBTC', 'ORNBTC', 'OXTBTC', 'PAXGBTC', 'PERLBTC', 'PERPBTC', 'PHABTC', 'PHBBTC', 'PIVXBTC', 'PNTBTC', 'POLSBTC', 'POLYBTC', 'PONDBTC', 'PORTOBTC', 'POWRBTC', 'PROMBTC', 'PSGBTC', 'QIBTC', 'QKCBTC', 'QLCBTC', 'QNTBTC', 'QSPBTC', 'QTUMBTC', 'QUICKBTC', 'RADBTC', 'RAMPBTC', 'RAREBTC', 'RDNBTC', 'REEFBTC', 'RENBTC', 'RENBTCBTC', 'REPBTC', 'REQBTC', 'RGTBTC', 'RIFBTC', 'RLCBTC', 'ROSEBTC', 'RSRBTC', 'RUNEBTC', 'RVNBTC', 'SANDBTC', 'SCBTC', 'SCRTBTC', 'SFPBTC', 'SKLBTC', 'SNMBTC', 'SNTBTC', 'SNXBTC', 'SOLBTC', 'SRMBTC', 'SSVBTC', 'STEEMBTC', 'STMXBTC', 'STORJBTC', 'STPTBTC', 'STRAXBTC', 'STXBTC', 'SUPERBTC', 'SUSHIBTC', 'SXPBTC', 'SYSBTC', 'TCTBTC', 'TFUELBTC', 'THETABTC', 'TKOBTC', 'TLMBTC', 'TOMOBTC', 'TORNBTC', 'TRBBTC', 'TRIBEBTC', 'TRUBTC', 'TRXBTC', 'TVKBTC', 'TWTBTC', 'UMABTC', 'UNFIBTC', 'UNIBTC', 'UTKBTC', 'VETBTC', 'VGXBTC', 'VIBBTC', 'VIDTBTC', 'VITEBTC', 'WABIBTC', 'WANBTC', 'WAVESBTC', 'WAXPBTC', 'WBTCBTC', 'WINGBTC', 'WNXMBTC', 'WRXBTC', 'WTCBTC', 'XEMBTC', 'XLMBTC', 'XMRBTC', 'XRPBTC', 'XTZBTC', 'XVGBTC', 'XVSBTC', 'YFIBTC', 'YFIIBTC', 'YGGBTC', 'YOYOBTC', 'ZECBTC', 'ZENBTC', 'ZILBTC', 'ZRXBTC']

CodePudding user response:

This is how you can send multiple requests to binance API server using python-binance

import asyncio
from binance import AsyncClient

# Module-level accumulator: each worker coroutine appends the klines response
# it receives, and the script prints the whole list at the end.
RESULTS = []  # let's store all results here


class GetAllBinanceData:
    """Fetch klines for many symbols concurrently through one shared client.

    One producer coroutine puts symbols on a bounded queue and
    ``workers_num`` worker coroutines take them off again; each worker stops
    when it receives a ``None`` sentinel.
    """

    def __init__(self, workers_num: int = 10):
        """Store the number of worker coroutines and create the task queue."""
        self.workers_num: int = workers_num
        # Bounded queue: the producer waits whenever 10 items are pending.
        self.task_q: asyncio.Queue = asyncio.Queue(maxsize=10)

    async def get_symbols_from_somewhere(self):
        """Get symbols and distribute them among workers."""
        # imagine the symbols are from some file
        symbols = ["BNBBTC", "ETHBTC", "NEOBTC"] * 100
        for symbol in symbols:
            await self.task_q.put(symbol)

        # One sentinel per worker so that every worker leaves its loop.
        for _ in range(self.workers_num):
            await self.task_q.put(None)

    async def get_historical_klines(self, client: AsyncClient):
        """Take symbols off the queue, fetch their klines, print and store them.

        Runs until a ``None`` sentinel is received.
        """
        while True:
            symbol = await self.task_q.get()
            if symbol is None:
                break
            klines = await client.get_historical_klines(
                symbol=symbol,
                interval=AsyncClient.KLINE_INTERVAL_1MINUTE,
                start_str="2021-11-23 10:00:00",
                end_str="2021-11-23 10:01:00"
            )
            print(klines)  # just print
            RESULTS.append(klines)  # send somewhere else

    async def amain(self) -> None:
        """Main async wrapper function.

        Creates the client, runs the producer together with all workers and
        closes the client connection afterwards, even if one of them fails.
        """
        client = await AsyncClient.create()
        tasks = [asyncio.create_task(self.get_symbols_from_somewhere())]
        tasks.extend(
            asyncio.create_task(self.get_historical_klines(client))
            for _ in range(self.workers_num)
        )
        try:
            await asyncio.gather(*tasks)
        finally:
            # If one task failed, stop the others before closing the client
            # they share; cancel() does nothing for tasks already finished.
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
            await client.close_connection()


if __name__ == "__main__":
    async def _run() -> None:
        """Build the fetcher inside the running loop and run it to completion."""
        # Constructing the instance here (rather than before the loop starts)
        # ensures its asyncio.Queue belongs to the loop that asyncio.run()
        # creates, also on Python versions where a queue binds to a loop at
        # construction time.
        await GetAllBinanceData().amain()

    # asyncio.run() creates the loop, runs the coroutine and closes the loop;
    # calling asyncio.get_event_loop() with no running loop is deprecated.
    asyncio.run(_run())
    print("*" * 100)
    print(RESULTS)

Another very straightforward but less efficient way. Note that Binance may get angry when you create multiple connections and can start ignoring you.

from binance import Client
from concurrent.futures import ThreadPoolExecutor

# Module-level accumulator: every call to get_historical_klines appends the
# klines response it receives.
RESULTS = []  # let's store all results here
# Symbols handed to the thread pool: the three pairs repeated 100 times.
SYMBOLS = ["BNBBTC", "ETHBTC", "NEOBTC"] * 100


def get_historical_klines(symbol):
    """Fetch the 1-minute klines of ``symbol``, print them and store them.

    A new ``Client`` is created for every call and its connection is closed
    afterwards, whether or not the request succeeds. The response is
    appended to the module-level ``RESULTS`` list.

    Args:
        symbol: trading pair passed to ``Client.get_historical_klines``.
    """
    # Create the client before entering the try block: if the constructor
    # raises there is nothing to close, and a ``finally`` clause touching the
    # unbound ``client`` name would hide the real error.
    client = Client()
    try:
        klines = client.get_historical_klines(
            symbol=symbol,
            interval=Client.KLINE_INTERVAL_1MINUTE,
            start_str="2021-11-23 10:00:00",
            end_str="2021-11-23 10:01:00"
        )
        print(klines)  # just print
        RESULTS.append(klines)  # send somewhere else
    finally:
        client.close_connection()


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=10) as pool:
        # submit() keeps a handle on every task so failures can be reported.
        # The lazy iterator returned by pool.map() was never consumed, which
        # silently discarded any exception raised inside a worker.
        submitted = [
            (symbol, pool.submit(get_historical_klines, symbol))
            for symbol in SYMBOLS
        ]

    # Leaving the with-block waits for all tasks, so every future is done.
    for symbol, future in submitted:
        error = future.exception()
        if error is not None:
            print(f"{symbol} failed: {error!r}")

    print("*" * 100)
    print(len(RESULTS))
  • Related