How do I remediate async errors like this?

This was my initial code, and it successfully requests the balances of multiple Ethereum addresses.

import requests
import time
import pandas as pd

df = pd.read_csv('ethereumaddresses.csv')
Wallet_Address = df.loc[:, 'Address']
results = []

start = time.time()
for address in Wallet_Address:
    url = f"https://blockscout.com/eth/mainnet/api?module=account&action=eth_get_balance&address={address}"
    response = requests.get(url)
    # Collect each response instead of overwriting the previous one
    results.append(response.json())

    print(results[-1])

end = time.time()

total_time = end - start

print(f"It took {total_time} seconds to make {len(Wallet_Address)} API calls")

However, I am requesting balances for 1000 Ethereum addresses, so I wanted to speed the code up with async. This was my attempt. What am I doing wrong?

import asyncio
import aiohttp
import time
import pandas as pd


start = time.time()
df = pd.read_csv('Ethereum/ethereumaddresses.csv')
Wallet_Address = df.loc[:, 'Address']
results = []

def get_tasks(session):
    tasks = []
    for address in Wallet_Address:
        url = f"https://blockscout.com/eth/mainnet/api?module=account&action=eth_get_balance&address={address}"
        tasks.append(session.get(url, ssl=False))
        print(address)
    return tasks

session_timeout = aiohttp.ClientTimeout(total=None)

async def get_balances():
    async with aiohttp.ClientSession(timeout=session_timeout) as session:
        tasks = get_tasks(session)
        responses = await asyncio.gather(*tasks)
        for response in responses:
            results.append(await response.json())

asyncio.run(get_balances())

end = time.time()
total_time = end - start
print(f"It took {total_time} seconds to make {len(Wallet_Address)} API calls")

It gives me an error:

 RuntimeError: await wasn't used with future
_OverlappedFuture exception was never retrieved
future: <_OverlappedFuture finished exception=OSError(22, 'The I/O operation has been aborted because of either a thread exit or an application request', None, 995, None)>
Traceback (most recent call last):
  File "AppData\Local\Programs\Python\Python310\lib\asyncio\windows_events.py", line 817, in _poll
    value = callback(transferred, key, ov)
  File "AppData\Local\Programs\Python\Python310\lib\asyncio\windows_events.py", line 604, in finish_connect
    ov.getresult()
OSError: [WinError 995] The I/O operation has been aborted because of either a thread exit or an application request

CodePudding user response:

Based on a quick search and a few tests on my end, it seems this error has nothing to do with your code per se. Here is a relevant post (albeit for C#, but about the same Windows error):

The I/O operation has been aborted because of either a thread exit or an application request

Since I could run (the web-related part of) your async code without getting this error, I assume it has something to do with either the OS or the API you are accessing. You mentioned you were trying to make 1000 concurrent requests. Rate limiting is a thing, so maybe your connection was closed by the server. It's impossible to definitively diagnose this with the information we have.

To test this further, I would suggest running the same script you already have, but with just two or three concurrent requests (i.e. with a very small subset of your wallet addresses), as sketched below. If this works, you at least have a starting point.
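
For a quick smoke test, assuming the same CSV layout as your script, you could slice the Series before building the tasks (the head(3) call is just an example value):

import pandas as pd

df = pd.read_csv('ethereumaddresses.csv')
# Take only the first three addresses for a smoke test
Wallet_Address = df.loc[:, 'Address'].head(3)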

If it has something to do with the number of requests being too big, you could consider gathering your coroutines in batches. Here is a simple implementation of a function you can substitute for asyncio.gather:

from asyncio import gather
from collections.abc import Awaitable
from typing import TypeVar

T = TypeVar("T")

async def gather_in_batches(
    *aws: Awaitable[T],
    batch_size: int,
    return_exceptions: bool = False,
) -> list[T]:
    results: list[T] = []
    for idx in range(0, len(aws), batch_size):
        results.extend(
            await gather(
                *aws[idx:idx + batch_size],
                return_exceptions=return_exceptions,
            )
        )
    return results
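
Dropped into your original coroutine, the call site might look something like this (a batch size of 50 is just an example value):

async def get_balances():
    async with aiohttp.ClientSession(timeout=session_timeout) as session:
        tasks = get_tasks(session)
        # Await the requests 50 at a time instead of all at once
        responses = await gather_in_batches(*tasks, batch_size=50)
        for response in responses:
            results.append(await response.json())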

If you have 1000 coroutines and set a batch size of 50 for example, the gather function is sequentially awaited 20 times in that loop. You could even introduce an artificial delay between each iteration using asyncio.sleep, if you wanted. Maybe this will help.
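
As a minimal sketch, here is the same function with an optional pause between batches; the delay parameter is my own addition, not something you have to use:

from asyncio import gather, sleep
from collections.abc import Awaitable
from typing import TypeVar

T = TypeVar("T")

async def gather_in_batches_with_delay(
    *aws: Awaitable[T],
    batch_size: int,
    delay: float = 0.0,
    return_exceptions: bool = False,
) -> list[T]:
    results: list[T] = []
    for idx in range(0, len(aws), batch_size):
        results.extend(
            await gather(
                *aws[idx:idx + batch_size],
                return_exceptions=return_exceptions,
            )
        )
        # Optionally pause before the next batch to go easier on the API
        if delay and idx + batch_size < len(aws):
            await sleep(delay)
    return results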

Regarding your script, I took the liberty of refactoring it and cleaning it up a bit. Here is a full version that should work as intended:

import json
import sys
from asyncio import gather, run
from collections.abc import Awaitable, Iterable
from time import time
from typing import Any, TypeVar

import pandas as pd
from aiohttp import ClientSession, ClientTimeout


T = TypeVar("T")

BASE_URL = "https://blockscout.com/eth/mainnet/api?module=account&action=eth_get_balance&address="
DEFAULT_TIMEOUT = ClientTimeout(total=None)


def load_wallet_addresses(csv_path: str) -> pd.Series:  # type: ignore[type-arg]
    df = pd.read_csv(csv_path)
    return df.loc[:, "Address"]


async def gather_in_batches(
    *aws: Awaitable[T],
    batch_size: int,
    return_exceptions: bool = False,
) -> list[T]:
    results: list[T] = []
    for idx in range(0, len(aws), batch_size):
        results.extend(
            await gather(
                *aws[idx:idx + batch_size],
                return_exceptions=return_exceptions,
            )
        )
    return results


async def get_balance(session: ClientSession, address: str) -> dict[str, Any]:
    async with session.get(BASE_URL + address, ssl=False) as response:
        return await response.json()  # type: ignore[no-any-return]


async def get_balances_in_batches(
    addresses: Iterable[str],
    batch_size: int = 1,
) -> list[dict[str, Any]]:
    async with ClientSession(timeout=DEFAULT_TIMEOUT) as session:
        coroutines = (get_balance(session, address) for address in addresses)
        return await gather_in_batches(*coroutines, batch_size=batch_size)


async def main() -> None:
    csv_path = sys.argv[1]
    try:
        batch_size = int(sys.argv[2])
    except IndexError:
        batch_size = 1
    addresses = load_wallet_addresses(csv_path)
    time_start = time()
    results = await get_balances_in_batches(addresses, batch_size)
    time_total = time() - time_start
    print(json.dumps(results, indent=4))
    num_total = len(addresses)
    num_batches = num_total // batch_size + (num_total % batch_size > 0)
    print(
        f"It took {time_total:.3} to make {num_total} API calls in "
        f"{num_batches} batches of up to {batch_size} concurrent calls each."
    )


if __name__ == "__main__":
    run(main())

You call it like this:

python path/to/script.py path/to/wallets.csv 32

The last argument is the batch size and it is optional. If you omit it, the batch size defaults to 1, which means the requests are all done sequentially.
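
For example, running it without the batch size argument makes all the requests one after the other:

python path/to/script.py path/to/wallets.csv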

Hope this helps.
