Home > other >  How to call async function in class
How to call async function in class

Time:12-24

import json
import asyncio
import httpx
from web3 import Web3

url_1inch_bsc = 'https://api.1inch.io/v5.0/56/quote/'
web3_bsc = Web3(Web3.HTTPProvider('https://bscrpc.com'))

class pars:
    @staticmethod
    async def parsing_bsc_1inch(self):
        bsc_pairs = open('pairs_bsc.txt', 'r')
        for line in bsc_pairs:
            line = line.rstrip().split('&')
            params={'fromTokenAddress':line[1], 'toTokenAddress':line[2], 'amount':'1000000000000000000'}
            async with httpx.AsyncClient() as client:
                try: 
                    response = await client.get(url=url_1inch_bsc, params=params)
                    json_response = json.loads(response.text)
                    print(f'{line[0]} {web3_bsc.fromWei(int(json_response["fromTokenAmount"]), "ether")} -> {web3_bsc.fromWei(int(json_response["toTokenAmount"]), "ether")}')
                except:
                    print(f'Error with request to bsc 1inch, {response}') 
        bsc_pairs.close()


if __name__ == "__main__":
    pars().parsing_bsc_1inch()

I don't understand how I can call an asynchronous function from a class.

I've tried using @staticmethod and self in the function, but nothing works. Before, I called the function with asyncio.run(parsing_bsc_1inch()), but now I can't even figure out how to do it from the class.

CodePudding user response:

Create an instance of the class first and then call the async function on that instance.

import json
import asyncio
import httpx
from web3 import Web3

url_1inch_bsc = 'https://api.1inch.io/v5.0/56/quote/'
web3_bsc = Web3(Web3.HTTPProvider('https://bscrpc.com'))

class pars:
    @staticmethod
    async def parsing_bsc_1inch(self):
        bsc_pairs = open('pairs_bsc.txt', 'r')
        for line in bsc_pairs:
            line = line.rstrip().split('&')
            params={'fromTokenAddress':line[1], 'toTokenAddress':line[2], 'amount':'1000000000000000000'}
            async with httpx.AsyncClient() as client:
                try: 
                    response = await client.get(url=url_1inch_bsc, params=params)
                    json_response = json.loads(response.text)
                    print(f'{line[0]} {web3_bsc.fromWei(int(json_response["fromTokenAmount"]), "ether")} -> {web3_bsc.fromWei(int(json_response["toTokenAmount"]), "ether")}')
                except:
                    print(f'Error with request to bsc 1inch, {response}') 
        bsc_pairs.close()


if __name__ == "__main__":
    p = pars()
    asyncio.run(p.parsing_bsc_1inch())
  • Related