This was my first initial code and it is successful in requesting for multiple Ethereum addresses balance.
import requests
import time
import pandas as pd
start = time.time()
df = pd.read_csv('ethereumaddresses.csv')
Wallet_Address=(df.loc[:,'Address'])
results = []
start = time.time()
for address in Wallet_Address:
url = f"https://blockscout.com/eth/mainnet/api?module=account&action=eth_get_balance&address={address}"
response = requests.get(url)
results = response.json()
print(results)
end = time.time()
total_time = end - start
print(f"It took {total_time} to make {len(Wallet_Address)} API calls")
However, I am requesting for 1000 Ethereum Addresses and I wanted to improve my code with the async function. This was my attempt. What am I doing wrong?
import asyncio
import aiohttp
import time
import pandas as pd
start = time.time()
df = pd.read_csv('Ethereum/ethereumaddresses.csv')
Wallet_Address=(df.loc[:,'Address'])
results = []
def get_tasks(session):
tasks = []
for address in Wallet_Address:
url = f"https://blockscout.com/eth/mainnet/api?module=account&action=eth_get_balance&address={address}"
tasks.append(session.get(url,ssl=False))
print(address)
return tasks
session_timeout = aiohttp.ClientTimeout(total=None)
async def get_balances():
async with aiohttp.ClientSession(timeout=session_timeout) as session:
tasks = get_tasks(session)
responses = await asyncio.gather(*tasks)
for response in responses:
results.append(await response.json())
asyncio.run(get_balances())
end = time.time()
total_time = end - start
print(f"It took {total_time} seconds to make {len(Wallet_Address)} API calls")
It gives me an error:
RuntimeError: await wasn't used with future
_OverlappedFuture exception was never retrieved
future: <_OverlappedFuture finished exception=OSError(22, 'The I/O operation has been aborted because of either a thread exit or an application request', None, 995, None)>
Traceback (most recent call last):
File "AppData\Local\Programs\Python\Python310\lib\asyncio\windows_events.py", line 817, in _poll
value = callback(transferred, key, ov)
File "AppData\Local\Programs\Python\Python310\lib\asyncio\windows_events.py", line 604, in finish_connect
ov.getresult()
OSError: [WinError 995] The I/O operation has been aborted because of either a thread exit or an application request
CodePudding user response:
Based on a quick search and a few tests on my end, it seems this error has nothing to do with your code per se. Here is a relevant post (albeit for C#, but about the same Windows error):
The I/O operation has been aborted because of either a thread exit or an application request
Since I could run (the web-related part of) your async code without getting this error, I assume this either has something to do with the OS or the API you are accessing. You mentioned you were trying to do 1000 concurrent requests. Rate limiting is a thing. So maybe your connection was closed by the server. It's impossible to definitively diagnose this with the information we have.
To test this further, I would suggest you try to run just the same script you already have, but with just two or three concurrent requests (i.e. with a very small subset of your wallets addresses). If this works, you at least have a starting point.
If it has something to do with the number of requests being too big, you could consider gathering your coroutines in batches. Here is a simple implementation of a function you can substitute for asyncio.gather
:
from asyncio import gather
from collections.abc import Awaitable
from typing import TypeVar
T = TypeVar("T")
async def gather_in_batches(
*aws: Awaitable[T],
batch_size: int,
return_exceptions: bool = False,
) -> list[T]:
results: list[T] = []
for idx in range(0, len(aws), batch_size):
results.extend(
await gather(
*aws[idx:idx batch_size],
return_exceptions=return_exceptions,
)
)
return results
If you have 1000 coroutines and set a batch size of 50 for example, the gather
function is sequentially awaited 20 times in that loop. You could even introduce an artificial delay between each iteration using asyncio.sleep
, if you wanted. Maybe this will help.
With regards to your script, I took the liberty of refactoring it and cleaning it up a bit. Here is a full version that should work as intended:
import json
import sys
from asyncio import gather, run
from collections.abc import Awaitable, Iterable
from time import time
from typing import Any, TypeVar
import pandas as pd
from aiohttp import ClientSession, ClientTimeout
T = TypeVar("T")
BASE_URL = "https://blockscout.com/eth/mainnet/api?module=account&action=eth_get_balance&address="
DEFAULT_TIMEOUT = ClientTimeout(total=None)
def load_wallet_addresses(csv_path: str) -> pd.Series: # type: ignore[type-arg]
df = pd.read_csv(csv_path)
return df.loc[:, "Address"]
async def gather_in_batches(
*aws: Awaitable[T],
batch_size: int,
return_exceptions: bool = False,
) -> list[T]:
results: list[T] = []
for idx in range(0, len(aws), batch_size):
results.extend(
await gather(
*aws[idx:idx batch_size],
return_exceptions=return_exceptions,
)
)
return results
async def get_balance(session: ClientSession, address: str) -> dict[str, Any]:
async with session.get(BASE_URL address, ssl=False) as response:
return await response.json() # type: ignore[no-any-return]
async def get_balances_in_batches(
addresses: Iterable[str],
batch_size: int = 1,
) -> list[dict[str, Any]]:
async with ClientSession(timeout=DEFAULT_TIMEOUT) as session:
coroutines = (get_balance(session, address) for address in addresses)
return await gather_in_batches(*coroutines, batch_size=batch_size)
async def main() -> None:
csv_path = sys.argv[1]
try:
batch_size = int(sys.argv[2])
except IndexError:
batch_size = 1
addresses = load_wallet_addresses(csv_path)
time_start = time()
results = await get_balances_in_batches(addresses, batch_size)
time_total = time() - time_start
print(json.dumps(results, indent=4))
num_total = len(addresses)
num_batches = num_total // batch_size (num_total % batch_size > 0)
print(
f"It took {time_total:.3} to make {num_total} API calls in "
f"{num_batches} batches of up to {batch_size} concurrent calls each."
)
if __name__ == "__main__":
run(main())
You call it like this:
python path/to/script.py path/to/wallets.csv 32
The last argument is the batch size and it is optional. If you omit it, the batch size defaults to 1
, which means the requests are all done sequentially.
Hope this helps.