I am new to async functions and I'm trying to make multiple calls to an external API. concurrent.futures was not quite enough to retrieve the responses, so I tried asyncio and httpx, but the process throws an error that is unfamiliar to me and difficult to debug.
It seems that the coroutine has an empty value or is never called.
This is my client:
async def get_product_by_id_async(self, product_id: str) -> Response:
    if product_id is None:
        return None
    response_await = await self.async_client.get(
        url=f"{self.base_url}/api/catalog/stock{product_id}",
        headers=self.headers,
    )
    if response_await.status_code == 200:
        response = json.loads(response_await.text)
        return response
And this is my coordinating function:
async def async_get_products(data_parser):
    path = data_parser.options["path"]
    sku_list = client.run()
    products_list = []
    tasks = [await client.get_product_by_id(sku) for sku in sku_list]
    breakpoint()
    completed = await asyncio.gather(*tasks)
    for product in completed:
        products_list = build_product_df(products_list, product)
    products_df = pd.DataFrame(products_list)
    products_df.to_csv(path, index=False)
    return products_df

def products_processor_concurrent(data_parser):
    return async_get_products(data_parser)
After applying the suggested solution, there is a new error in the code. This is the traceback:
Traceback (most recent call last):
  File "manage.py", line 32, in <module>
    module.run()
  File "/opt/project/providers/api/data_collector.py", line 90, in run
    integrator.run()
  File "/opt/project/common/integrator.py", line 448, in run
    self.start_processing()
  File "/opt/project/providers/api/data_collector.py", line 81, in start_processing
    products.run()
  File "/opt/project/common/data_parser.py", line 94, in run
    self.build_dataframe()
  File "/opt/project/common/data_parser.py", line 54, in build_dataframe
    df = func(self)
  File "/opt/project/providers/data_collector.py", line 68, in products_processor_concurrent
    return run(async_get_products(data_parser))
  File "/usr/local/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/opt/project/providers/api/data_collector.py", line 57, in async_get_products
    tasks = [await client.get_product_by_id(sku) for sku in sku_list]
  File "/opt/project/providers/api/data_collector.py", line 57, in <listcomp>
    tasks = [await client.get_product_by_id(sku) for sku in sku_list]
TypeError: object dict can't be used in 'await' expression
CodePudding user response:
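When you want to run an asynchronous function from a synchronous function, you have to use the asyncio library to run it: calling the coroutine function only creates a coroutine object, and asyncio.run is what actually executes it on an event loop. Your last function should look like this: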
def products_processor_concurrent(data_parser):
    from asyncio import run
    return run(async_get_products(data_parser))
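
As for the TypeError in the traceback: it suggests that client.get_product_by_id (the name used in the list comprehension) returns a plain dict, i.e. it is probably not the coroutine get_product_by_id_async shown in the client. That is only a guess from the names in the question. Separately, awaiting each call inside the list comprehension would run the requests one after another and hand already finished results to asyncio.gather. Below is a minimal sketch of the usual pattern, assuming a stand-in get_product_by_id_async that sleeps instead of calling httpx; the SKU values and the returned dict shape are made up for illustration.

```python
import asyncio


async def get_product_by_id_async(product_id):
    # Hypothetical stand-in for the real httpx request: it only sleeps
    # briefly and returns a made-up payload.
    await asyncio.sleep(0.01)
    return {"id": product_id}


async def async_get_products(sku_list):
    # Create the coroutine objects WITHOUT awaiting them here ...
    tasks = [get_product_by_id_async(sku) for sku in sku_list]
    # ... and let gather await them concurrently. Results come back
    # in the same order as the inputs.
    return await asyncio.gather(*tasks)


def products_processor_concurrent(sku_list):
    # Synchronous entry point: asyncio.run starts the event loop
    # and returns the coroutine's result.
    return asyncio.run(async_get_products(sku_list))


products = products_processor_concurrent(["a1", "b2", "c3"])
print(products)
```

In the original code this would likely mean changing the comprehension to something like tasks = [client.get_product_by_id_async(sku) for sku in sku_list], assuming that is the method the client actually exposes.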