This is my attempt at async code, based on the article "How to use AsyncHTTPProvider in web3py?". However, when I run this code, it executes like a synchronous function.
web3.js supports batch requests (https://dapp-world.com/smartbook/web3-batch-request-Eku8). However, web3.py does not have an equivalent.
I am using the Alchemy Ethereum API, which supports about 19 API calls per second.
I have about 1000 Ethereum addresses.
How do I modify the code so that I can batch 19 addresses per second?
from web3 import Web3
from web3.eth import AsyncEth
import time
import pandas as pd
import aiohttp
import asyncio
alchemy_url = "https://eth-mainnet.g.alchemy.com/v2/zCTn-wyjipF5DvGFVNEx_XqCKZakaB57"
w3 = Web3(Web3.AsyncHTTPProvider(alchemy_url), modules={'eth': (AsyncEth,)}, middlewares=[])
start = time.time()
df = pd.read_csv('Ethereum/ethereumaddresses.csv')
Wallet_Address=(df.loc[:,'Address'])
#Balance_storage = []
session_timeout = aiohttp.ClientTimeout(total=None)
async def get_balances():
    for address in Wallet_Address:
        balance = await w3.eth.get_balance(address)
        print(address, balance)
asyncio.run(get_balances())
end = time.time()
total_time = end - start
print(f"It took {total_time} seconds to make {len(Wallet_Address)} API calls")
CodePudding user response:
My idea probably isn't the best, but you can use it as a temporary solution.
For this, you need ThreadPoolExecutor.
I ran a benchmark and got these results:
Test 1: Without ThreadPoolExecutor, using the BSC public RPC and a plain for loop, the process takes more than 3 minutes to finish.
Test 2: With ThreadPoolExecutor, the BSC public RPC, and a 100 ms delay using time.sleep(0.1), it finishes in less than 40 seconds.
Test 3: With ThreadPoolExecutor, Quicknode, and a 100 ms delay, it finishes in 35 seconds.
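Simple math (1000 wallets / 19 calls per second) shows that your process needs at least about 53 seconds. Try 100 ms delays first and increase the delay if you hit the rate limit. Note that my 100 ms runs made 1000 calls in under 40 seconds (25 or more calls per second), so you will probably need a longer delay.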
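The arithmetic can be written out as a small sketch. The variable names are illustrative, and the figure of 10 workers is an assumption taken from the default thread count in the code further down; adjust it to your own setup.

```python
wallets = 1000          # number of addresses in the question
max_calls_per_sec = 19  # rate limit quoted in the question
workers = 10            # assumption: number of threads making calls

# Lower bound on total runtime if the limit is respected.
min_total_seconds = wallets / max_calls_per_sec

# Each worker makes one call per cycle (request time + sleep), so the
# combined rate is workers / cycle_seconds. To stay under the limit,
# each cycle must last at least this long:
min_cycle_seconds = workers / max_calls_per_sec

print(f"at least {min_total_seconds:.1f} s in total")
print(f"at least {min_cycle_seconds:.2f} s per call in each worker")
```

Since the cycle includes the time the request itself takes, the sleep needed in practice may be shorter than the second figure.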
One problem with time.sleep is that it blocks, so if you are running a GUI or anything else that cannot pause, it will freeze during the process. (I think you can use multiprocessing to work around this.)
The second problem is that doing this will probably change the order of the addresses in the CSV. (You can assign an _id or something similar to each address and use it to restore the order with a for loop after the process ends.)
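One way to keep the original order without an extra id, sketched here with a stand-in lookup instead of a real RPC call, is ThreadPoolExecutor.map, which returns results in the order of its inputs regardless of which thread finishes first. fake_get_balance and the sample addresses are hypothetical and only there to make the sketch runnable.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def fake_get_balance(address):
    # Hypothetical stand-in for web3.eth.get_balance; the variable sleep
    # mimics network latency so that calls finish out of order.
    time.sleep(0.01 * (len(address) % 3))
    return len(address)

addresses = ["0xaaa", "0xbb", "0xcccc", "0xd"]  # hypothetical sample data

with ThreadPoolExecutor(max_workers=4) as executor:
    # map() yields results in input order, not completion order.
    balances = list(executor.map(fake_get_balance, addresses))

# Each address stays paired with its own balance, in CSV order.
rows = list(zip(addresses, balances))
print(rows)
```

This is a different structure from the range-based workers below, so any rate-limiting delay would have to live inside the function passed to map.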
Code: works well on BSC (just change the RPC). This code fetches all balances and stores them in self.data (a defaultdict). After that, it saves them to a new CSV file called "newBalances.csv" (you can change this).
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from web3 import Web3
import pandas as pd
import threading
import time

class multiGetBalanceExample():
    def __init__(self):
        self.initialtime = datetime.now()  # initial time
        #=== Setup Web3 ===#
        self.bsc = "https://bsc-dataseed.binance.org/"  # rpc (change this)
        self.web3 = Web3(Web3.HTTPProvider(self.bsc))  # web3 connect
        #=== Loading Csv file ===#
        self.df = pd.read_csv(r"./Ethereum/ethereumaddresses.csv")
        self.wallet_address = self.df.loc[:, 'Address']
        #=== Setup Temporary Address/Balance Save Defaultdict ===#
        self.data = defaultdict(list)
        # Lock so that each address/balance pair is appended together,
        # even when several threads write at the same time.
        self.lock = threading.Lock()
        #=== Start ===#
        self.start_workers(self.data)
        #=== Finish ===#
        self.saveCsv()  # saving in new csv file
        self.finaltime = datetime.now()  # end time
        print(f"\nFinished! Process takes: {self.finaltime - self.initialtime}")

    def start_workers(self, data, workers=10):
        # Each worker handles one slice of rows; the ranges assume 1000 addresses.
        # The with-block waits for all workers to finish before returning.
        with ThreadPoolExecutor(max_workers=workers) as executor:
            executor.submit(self.getBalances, _data=data, _from=0, _to=101)
            executor.submit(self.getBalances, _data=data, _from=101, _to=201)
            executor.submit(self.getBalances, _data=data, _from=201, _to=301)
            executor.submit(self.getBalances, _data=data, _from=301, _to=401)
            executor.submit(self.getBalances, _data=data, _from=401, _to=501)
            executor.submit(self.getBalances, _data=data, _from=501, _to=601)
            executor.submit(self.getBalances, _data=data, _from=601, _to=701)
            executor.submit(self.getBalances, _data=data, _from=701, _to=801)
            executor.submit(self.getBalances, _data=data, _from=801, _to=901)
            executor.submit(self.getBalances, _data=data, _from=901, _to=1000)
        return data

    def getBalances(self, _data, _from, _to):
        for i in range(_from, _to):
            # == Getting Balances from each wallet == #
            get_balance = self.web3.eth.get_balance(self.wallet_address[i])
            # == Appending in self.data == #
            with self.lock:
                _data["Address"].append(self.wallet_address[i])
                _data["Balance"].append(get_balance)
            # == Print and time.sleep(100ms) == #
            print(f"Found: {self.wallet_address[i], get_balance}\n")  # printing process.
            time.sleep(0.1)  # adjust this to your rate limit (in my test, 100 ms takes 40 seconds to finish)
        return _data

    def saveCsv(self):
        #== Creating new CSV File ==#
        headers = ["Address", "Balance"]
        new_df = pd.DataFrame(columns=headers)
        new_df["Address"] = self.data["Address"]
        new_df["Balance"] = self.data["Balance"]
        new_df.to_csv(r"./Ethereum/newBalances.csv", index=False)  # save

multiGetBalanceExample()
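For the asyncio route from the question: a loop that awaits each call in turn waits for one response before sending the next request, which is why it behaves like synchronous code. Below is a minimal sketch of one alternative, assuming that spacing out the request starts is enough to satisfy the provider's limit. It creates one task per address, pauses about 1/19 s between starts, and gathers the results in input order. fetch_all_rate_limited, its parameters, and demo_fetch are hypothetical names; fetch stands for any coroutine function, such as w3.eth.get_balance in the question's AsyncEth setup.

```python
import asyncio

async def fetch_all_rate_limited(items, fetch, max_per_second=19):
    """Start roughly max_per_second calls per second; return results in input order."""
    interval = 1.0 / max_per_second
    tasks = []
    for item in items:
        # create_task schedules the call right away, so it runs while we pace the next one.
        tasks.append(asyncio.create_task(fetch(item)))
        # asyncio.sleep yields to the event loop instead of blocking it.
        await asyncio.sleep(interval)
    # gather returns results in the order the tasks were passed in.
    return await asyncio.gather(*tasks)

async def demo_fetch(value):
    # Hypothetical stand-in for an RPC call such as w3.eth.get_balance.
    await asyncio.sleep(0.01)
    return value * 2

demo_results = asyncio.run(
    fetch_all_rate_limited([1, 2, 3], demo_fetch, max_per_second=200)
)
print(demo_results)
```

With the question's setup, the call would presumably look like asyncio.run(fetch_all_rate_limited(Wallet_Address, w3.eth.get_balance)). This has not been tested against Alchemy, so the limit may need some headroom.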