How to make batch requests in web3.py?


  1. This is my attempt at an async version, based on this How to use AsyncHTTPProvider in web3py? article. However, when I run this code it still executes like a synchronous function.

  2. web3.js has support for batch requests (https://dapp-world.com/smartbook/web3-batch-request-Eku8), but web3.py does not.

  3. I am using the Alchemy Ethereum API, which allows about 19 API calls per second.

  4. I have about 1000 Ethereum addresses.

  5. How do I modify the code so that I can batch 19 addresses per second?

from web3 import Web3
from web3.eth import AsyncEth
import time 
import pandas as pd
import aiohttp
import asyncio

alchemy_url = "https://eth-mainnet.g.alchemy.com/v2/zCTn-wyjipF5DvGFVNEx_XqCKZakaB57"
w3 = Web3(Web3.AsyncHTTPProvider(alchemy_url), modules={'eth': (AsyncEth,)}, middlewares=[])

start = time.time()

df = pd.read_csv('Ethereum/ethereumaddresses.csv') 
Wallet_Address=(df.loc[:,'Address'])
#Balance_storage = []

session_timeout = aiohttp.ClientTimeout(total=None)


async def get_balances():
    for address in Wallet_Address:
        # Each call is awaited before the next one starts, so the requests
        # run one at a time even though the provider is async.
        balance = await w3.eth.get_balance(address)
        print(address, balance)
            
asyncio.run(get_balances())
end = time.time()
total_time = end - start
print(f"It took {total_time} seconds to make {len(Wallet_Address)} API calls")

CodePudding user response:

I don't think my idea is the best, but you can use it as a temporary solution.

For this, you can use ThreadPoolExecutor.

I ran a quick benchmark and found the following:

Doing the simple math (1000 wallets / 19 calls per second), the whole process has to take at least roughly 53 seconds. Try a 100 ms delay per call first, and if you still hit the rate limit, increase the delay (see the rough check after this paragraph).
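
As a rough sanity check of those numbers (a back-of-the-envelope sketch; the 10 workers and 100 ms delay are the values used in the code further down):

addresses = 1000
rate_limit = 19          # calls per second allowed by the API
workers = 10
delay_per_call = 0.1     # seconds of time.sleep after each call (request latency adds more)

min_total_time = addresses / rate_limit      # ~52.6 s: the fastest the limit allows
calls_per_second = workers / delay_per_call  # up to 100 calls/s with a 100 ms delay
safe_delay = workers / rate_limit            # ~0.53 s per call keeps 10 workers under the limit

print(min_total_time, calls_per_second, safe_delay)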

One problem with using time.sleep is that if you are running a GUI or something similar, it can freeze during the process while it waits for the workers. (You could use multiprocessing to work around this.)

The second problem is that the threads will probably change the order of the addresses compared to the input CSV. (You can attach an _id, i.e. the original row index, to each address and reorder the results after the workers finish, as sketched right below.)
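
For example, a hypothetical sketch of that idea (not part of the answer code below): have each worker record the original row index together with the address and balance, then sort by that index before saving.

import pandas as pd

# Each worker appends (original_row_index, address, balance); arrival order is arbitrary.
results = [
    (2, "0xC...", 5),
    (0, "0xA...", 7),
    (1, "0xB...", 3),
]

# Restore the input-CSV order before writing the output.
results.sort(key=lambda row: row[0])
pd.DataFrame(results, columns=["_id", "Address", "Balance"]).to_csv(
    "./Ethereum/sortedBalances.csv", index=False)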

Code: works well on BSC (just change the RPC URL). It fetches every balance and stores it in self.data (a defaultdict), then saves everything to a new CSV file called "newBalances.csv" (you can change this).

from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from web3 import Web3
import pandas as pd
import time

class multiGetBalanceExample():

    def __init__(self):
        self.initialtime = datetime.now() #initial time

        #=== Setup Web3 ===#
        self.bsc = "https://bsc-dataseed.binance.org/" #rpc (change this)
        self.web3 = Web3(Web3.HTTPProvider(self.bsc)) #web3 connect 

        #=== Loading Csv file ===#
        self.df = pd.read_csv(r"./Ethereum/ethereumaddresses.csv") 
        self.wallet_address=(self.df.loc[:,'Address'])

        #=== Setup Temporary Address/Balance Save Defaultdict ===#
        self.data = defaultdict(list)
        
        #=== Start ===#
        self.start_workers(self.data)

        #=== Finish ===#
        self.saveCsv() #saving in new csv file
        self.finaltime = datetime.now() #end time
        print(f"\nFinished! Process takes: {self.finaltime - self.initialtime}")



    def start_workers(self, data, workers=10):
        #=== Split the address list into one chunk per worker ===#
        total = len(self.wallet_address)
        chunk = -(-total // workers) #ceiling division
        with ThreadPoolExecutor(max_workers=workers) as executor:
            for start in range(0, total, chunk):
                executor.submit(self.getBalances, _data=data, _from=start, _to=min(start + chunk, total))

        return data



    def getBalances(self, _data, _from, _to):
        for i in range (_from, _to):
            # == Getting Balances from each wallet == #
            get_balance = self.web3.eth.get_balance(self.wallet_address[i])

            # == Appending in self.data == #
            _data["Address"].append(self.wallet_address[i])
            _data["Balance"].append(get_balance)

            # == Print and time.sleep(100ms) == #
            print(f"Found: {self.wallet_address[i], get_balance}\n") #printing process.
            time.sleep(0.1) #adjust this to your rate limit (in my test, 100 ms took about 40 seconds to finish.)

        return _data



    def saveCsv(self):
        #== Creating new CSV File ==#
        headers = ["Address","Balance"]
        new_df = pd.DataFrame(columns=headers)

        new_df["Address"] = self.data["Address"]
        new_df["Balance"] = self.data["Balance"]

        new_df.to_csv(r"./Ethereum/newBalances.csv", index=False) #save   



multiGetBalanceExample()
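
Alternatively, staying with the asker's original AsyncHTTPProvider setup, here is a minimal sketch of batching with asyncio.gather: it fires 19 balance requests at a time and waits so that each batch takes at least one second, to stay near the 19-calls-per-second limit. The BATCH_SIZE of 19, the one-second pacing, and the placeholder API key are assumptions based on the numbers in the question; I have not benchmarked this against Alchemy.

import asyncio
import time

import pandas as pd
from web3 import Web3
from web3.eth import AsyncEth

alchemy_url = "https://eth-mainnet.g.alchemy.com/v2/<your-key>"  # placeholder key
w3 = Web3(Web3.AsyncHTTPProvider(alchemy_url), modules={'eth': (AsyncEth,)}, middlewares=[])

df = pd.read_csv('Ethereum/ethereumaddresses.csv')
wallet_addresses = df.loc[:, 'Address']

BATCH_SIZE = 19  # assumed from the ~19 calls/second limit

async def get_balances():
    results = {}
    for start in range(0, len(wallet_addresses), BATCH_SIZE):
        batch = list(wallet_addresses[start:start + BATCH_SIZE])
        batch_start = time.monotonic()
        # Fire the whole batch concurrently instead of awaiting one call at a time.
        balances = await asyncio.gather(*(w3.eth.get_balance(a) for a in batch))
        results.update(dict(zip(batch, balances)))
        # Pause so that each batch of 19 calls spans at least one second.
        elapsed = time.monotonic() - batch_start
        if elapsed < 1:
            await asyncio.sleep(1 - elapsed)
    return results

balances = asyncio.run(get_balances())
print(f"Fetched {len(balances)} balances")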