Count successful and unsuccessful POST requests for asynchronous POST calls

Time:11-19

I need help implementing logic that counts the number of successful asynchronous POST calls (status_code == 200) as well as the failed ones (status_code != 200).

I am new to coroutines. I would appreciate it if someone could suggest a better way of making an asynchronous POST call that can be retried, polled for status, and that can emit metrics for successful POST requests as well.

Following is my code:

asyncio.get_event_loop().run_in_executor(
            None,
            self.publish_actual,
            event_name,
            custom_payload,
            event_message_params,
        )

which calls publish_actual:

def publish_actual(
        self,
        event_name: str,
        custom_payload={},
        event_message_params=[],
    ):
        """Submits a POST request using the requests library.

        :param event_name: name of the event
        :type event_name: str
        :param key: key for a particular application
        :param custom_payload: custom_payload, defaults to {}
        :type custom_payload: dict, optional
        :param event_message_params: event_message_params, defaults to []
        :type event_message_params: list, optional
        """

        json_data = {}

        path = f"/some/path"

        self.request(path, "POST", json=json_data)

which calls the following request function:

def request(self, api_path, method="GET", **kwargs):
        try:
            self._validate_configuration()
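            # Start from the default header, then let caller-supplied headers override it.
            headers = {"Content-Type": "application/json"}
            api_endpoint = self.service_uri.to_url(api_path)

            logger.debug(api_endpoint)

            # Pop "headers" so it is not passed a second time via **kwargs below.
            caller_headers = kwargs.pop("headers", None)
            if caller_headers:
                headers.update(caller_headers)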

            begin = datetime.now()

            def build_success_metrics(response, *args, **kwargs):
                tags = {
                    "name": "success_metrics",
                    "domain": api_endpoint,
                    "status_code": 200,
                }
                build_metrics(tags)

            def check_for_errors(response, *args, **kwargs):
                response.raise_for_status()

            response = self.session.request(
                method=method,
                url=api_endpoint,
                headers=headers,
                timeout=self.timeout,
                hooks={"response": [build_success_metrics, check_for_errors]},
                **kwargs,
            )

            end = datetime.now()
            logger.debug(
                f"'{method}' request against endpoint '{api_endpoint}' took {round((end - begin).total_seconds() * 1000, 3)} ms"
            )

            logger.debug(f"response: {response}")

        except RequestException as e:
            tags = {
                "name": "error_metrics",
                "domain": api_endpoint,
                "exception_class": e.__class__.__name__,
            }
            build_metrics(tags)
            return f"Exception occurred: {e}"

Let me know if anything else is required from my end to explain what exactly I have done and what I am trying to achieve.

CodePudding user response:

Your example contains very little async/await code, so I have only addressed the counting part of your question, in general asyncio terms. asyncio.Queue works well for this because it lets you separate the counting from the code that produces the results.

import asyncio
import aiohttp

class Count():
    def __init__(self, queue: asyncio.Queue):
        self.queue = queue
        self.good = 0
        self.bad = 0
    
    async def count(self):
        while True:
            result = await self.queue.get()
            if result == 'Exit':
                return
            if result == 200:
                self.good += 1
            else:
                self.bad += 1

async def request(q: asyncio.Queue):
    async with aiohttp.ClientSession() as session:
        for _ in range(5):  # just poll 5 times in this instance
            await asyncio.sleep(0.1)
            async with session.get(
                'https://httpbin.org/status/200,500', ssl=False
            ) as response:
                q.put_nowait(response.status)
    q.put_nowait('Exit')

async def main():
    q = asyncio.Queue()
    cnt = Count(q)
    tasks = [cnt.count(), request(q)]
    await asyncio.gather(*[asyncio.create_task(t) for t in tasks])
    print(cnt.good, cnt.bad)

if __name__ == "__main__":
    asyncio.run(main())

The output varies from run to run because httpbin picks one of the listed status codes at random for each request. The two counts should always add up to 5.

4 1
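
If you want to keep the run_in_executor approach from the question and also add retries, something along these lines might work. This is only a rough sketch, not a drop-in replacement: post_with_retry, make_fake_sender and the "success" / "failure" / "attempts" keys are names I made up for illustration, and the fake sender stands in for a blocking POST so the example runs without a network. It assumes the blocking function returns the HTTP status code.

```python
import asyncio
from collections import Counter
from typing import Callable


async def post_with_retry(
    send: Callable[[], int],
    counts: Counter,
    retries: int = 3,
    delay: float = 0.0,
) -> int:
    """Run the blocking ``send`` in the default executor, retrying on non-200.

    ``send`` is assumed to perform one POST and return its status code.
    """
    loop = asyncio.get_running_loop()
    status = -1  # reported if every attempt raises
    for attempt in range(1, retries + 1):
        counts["attempts"] += 1
        try:
            status = await loop.run_in_executor(None, send)
        except Exception:
            status = -1  # treat any exception as a failed attempt
        if status == 200:
            counts["success"] += 1
            return status
        if attempt < retries:
            await asyncio.sleep(delay)  # pause before the next attempt
    counts["failure"] += 1
    return status


def make_fake_sender(statuses):
    """Hypothetical stand-in for a blocking POST: yields the given codes in order."""
    it = iter(statuses)
    return lambda: next(it)


async def main() -> Counter:
    counts = Counter()
    senders = [
        make_fake_sender([200]),            # succeeds on the first try
        make_fake_sender([500, 200]),       # succeeds on the retry
        make_fake_sender([500, 500, 500]),  # exhausts all retries
    ]
    await asyncio.gather(*(post_with_retry(s, counts) for s in senders))
    return counts


if __name__ == "__main__":
    print(dict(asyncio.run(main())))
```

The counter is only updated inside the coroutine, after the executor call has been awaited, so all updates happen on the event loop thread and no lock is needed. In real use, send would be a function that does the blocking POST and returns response.status_code, which means the request method in the question would have to return the response or its status code instead of discarding it.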