import json
import asyncio
import httpx
from web3 import Web3
# Quote endpoint of the 1inch v5.0 API; the "56" path segment selects the chain
# (presumably the BSC chain id, going by the variable name -- confirm).
url_1inch_bsc = 'https://api.1inch.io/v5.0/56/quote/'
# Web3 client backed by an HTTP provider at bscrpc.com; this file uses it for
# wei -> ether unit conversion through fromWei.
web3_bsc = Web3(Web3.HTTPProvider('https://bscrpc.com'))
class pars:
    """Fetch and print 1inch quotes for the token pairs in ``pairs_bsc.txt``."""

    @staticmethod
    async def parsing_bsc_1inch(self=None):
        """Print a 1inch quote for every pair listed in ``pairs_bsc.txt``.

        Each non-blank line of the file is split on ``&`` into a label, a
        source-token address and a destination-token address. One quote
        request is sent per line and the returned amounts are printed after
        converting them from wei to ether units.

        Args:
            self: Unused placeholder. The method is a ``@staticmethod``, so
                nothing is bound to this parameter automatically; it defaults
                to ``None`` so the method can be called with no arguments,
                while calls that pass one positional value keep working.

        Raises:
            OSError: If ``pairs_bsc.txt`` cannot be opened.
        """
        # ``with`` closes the file even if an unexpected error escapes the loop.
        with open('pairs_bsc.txt', 'r') as bsc_pairs:
            # One client for the whole run instead of a new one per line.
            async with httpx.AsyncClient() as client:
                for raw_line in bsc_pairs:
                    fields = raw_line.rstrip().split('&')
                    if fields == ['']:
                        # Blank line (e.g. trailing newline at end of file).
                        continue
                    if len(fields) < 3:
                        print(f'Skipping malformed pair line: {raw_line.rstrip()!r}')
                        continue
                    label, from_token, to_token = fields[:3]
                    params = {
                        'fromTokenAddress': from_token,
                        'toTokenAddress': to_token,
                        # 10**18 wei, i.e. one unit at the "ether" scale
                        # used by the fromWei conversions below.
                        'amount': '1000000000000000000',
                    }
                    # Reset per line so the error message never reads an
                    # unbound name or a stale response from an earlier line.
                    response = None
                    try:
                        response = await client.get(url=url_1inch_bsc, params=params)
                        json_response = json.loads(response.text)
                        print(f'{label} {web3_bsc.fromWei(int(json_response["fromTokenAmount"]), "ether")} -> {web3_bsc.fromWei(int(json_response["toTokenAmount"]), "ether")}')
                    except Exception as error:
                        # ``Exception`` (not a bare ``except``) lets task
                        # cancellation and KeyboardInterrupt propagate.
                        print(f'Error with request to bsc 1inch, {response} ({error!r})')
if __name__ == "__main__":
    # Calling an async function only creates a coroutine object; asyncio.run()
    # starts an event loop and drives that coroutine to completion.
    # parsing_bsc_1inch is a static method whose first parameter is an unused
    # placeholder, so a value (None) is passed for it explicitly.
    asyncio.run(pars().parsing_bsc_1inch(None))
I don't understand how to call an asynchronous function that is defined inside a class.
I've tried using `@staticmethod` and adding `self` to the function, but nothing works. Previously I called the function with `asyncio.run(parsing_bsc_1inch())`, but now I can't figure out how to do the same thing when it is a method of the class.
CodePudding user response:
Create an instance of the class first, then run the coroutine returned by its async method with `asyncio.run()`.
import json
import asyncio
import httpx
from web3 import Web3
# Quote endpoint of the 1inch v5.0 API; the "56" path segment selects the chain
# (presumably the BSC chain id, going by the variable name -- confirm).
url_1inch_bsc = 'https://api.1inch.io/v5.0/56/quote/'
# Web3 client backed by an HTTP provider at bscrpc.com; this file uses it for
# wei -> ether unit conversion through fromWei.
web3_bsc = Web3(Web3.HTTPProvider('https://bscrpc.com'))
class pars:
    """Fetch and print 1inch quotes for the token pairs in ``pairs_bsc.txt``."""

    @staticmethod
    async def parsing_bsc_1inch(self=None):
        """Print a 1inch quote for every pair listed in ``pairs_bsc.txt``.

        Each non-blank line of the file is split on ``&`` into a label, a
        source-token address and a destination-token address. One quote
        request is sent per line and the returned amounts are printed after
        converting them from wei to ether units.

        Args:
            self: Unused placeholder. The method is a ``@staticmethod``, so
                nothing is bound to this parameter automatically; it defaults
                to ``None`` so the method can be called with no arguments,
                while calls that pass one positional value keep working.

        Raises:
            OSError: If ``pairs_bsc.txt`` cannot be opened.
        """
        # ``with`` closes the file even if an unexpected error escapes the loop.
        with open('pairs_bsc.txt', 'r') as bsc_pairs:
            # One client for the whole run instead of a new one per line.
            async with httpx.AsyncClient() as client:
                for raw_line in bsc_pairs:
                    fields = raw_line.rstrip().split('&')
                    if fields == ['']:
                        # Blank line (e.g. trailing newline at end of file).
                        continue
                    if len(fields) < 3:
                        print(f'Skipping malformed pair line: {raw_line.rstrip()!r}')
                        continue
                    label, from_token, to_token = fields[:3]
                    params = {
                        'fromTokenAddress': from_token,
                        'toTokenAddress': to_token,
                        # 10**18 wei, i.e. one unit at the "ether" scale
                        # used by the fromWei conversions below.
                        'amount': '1000000000000000000',
                    }
                    # Reset per line so the error message never reads an
                    # unbound name or a stale response from an earlier line.
                    response = None
                    try:
                        response = await client.get(url=url_1inch_bsc, params=params)
                        json_response = json.loads(response.text)
                        print(f'{label} {web3_bsc.fromWei(int(json_response["fromTokenAmount"]), "ether")} -> {web3_bsc.fromWei(int(json_response["toTokenAmount"]), "ether")}')
                    except Exception as error:
                        # ``Exception`` (not a bare ``except``) lets task
                        # cancellation and KeyboardInterrupt propagate.
                        print(f'Error with request to bsc 1inch, {response} ({error!r})')
if __name__ == "__main__":
    p = pars()
    # parsing_bsc_1inch is a static method whose first parameter is an unused
    # placeholder, so nothing is bound to it automatically even when the
    # method is reached through an instance; None is passed for it explicitly.
    # asyncio.run() then starts an event loop and runs the coroutine to the end.
    asyncio.run(p.parsing_bsc_1inch(None))