Add a coroutine after some coroutines have finished, in an already running event loop (same loop, same thread)


I've read through a few similar questions on SO about adding a coroutine to an already running event loop. Some answers were tailored to the questions asked, so they didn't quite apply to my case. The most common suggestions are asyncio.ensure_future(coro(), loop=my_running_loop), or, for a thread-safe version, asyncio.run_coroutine_threadsafe(coro(), my_running_loop); a last resort would be spawning another loop after the current one finishes.

I'll present my questions first:

  1. Why do the first two methods not work?
  2. Apart from the third method, is there another way, preferably inside the class, so that I don't have to start another loop after the class's own loop finishes?

A simplified example of my scenario: there are 1000 items sitting on the other end of a REST API, and I have to pull all of them to do some analysis. However, the REST API depends on the network connection, so intermittently some items simply fail with a timeout error (or come back empty); in that case, I have to call the REST API again to fetch the failed ones.

Just to demonstrate my scenario without all the granularity (it is data-specific rather than code logic):

import asyncio
import random
import string

import ItemType  # Enum
import async_rest_api
import pandas as pd

class DataSource:
    def __init__(self):
        # some settings on self; the relevant ones are:
        self.loop = asyncio.get_event_loop()
        self.item_names_to_process = some_df  # a pd.DataFrame with one column named 'name'
        self.unprocessed = []  # names whose fetch failed and must be retried
        self.result = pd.DataFrame()
        super().__init__()

    def get_item_fetching_function(self, item_type: ItemType):
        if item_type == ItemType.One:
            return async_rest_api.get_item_type_one
        if item_type == ItemType.Two:
            return async_rest_api.get_item_type_two
        if item_type == ItemType.Three:
            return async_rest_api.get_item_type_three

    # args were of length greater than one;
    # here I simplified it to just item_type and item_names.
    async def get_items(self, item_type, item_names):
        step_size = 500
        results = []

        for i in range(0, len(item_names), step_size):
            tasks = []
            for name in item_names[i:i + step_size]:
                tasks.append(self.get_item_fetching_function(item_type)(name))
            # one await per batch of step_size tasks
            results.extend(await asyncio.gather(*tasks, return_exceptions=True))

        result = pd.concat([r[1] for r in results])
        processed_item_names = pd.DataFrame(result.name.unique(), columns=['name'])
        # names that were requested but do not appear in the processed set
        unprocessed = pd.concat([self.item_names_to_process, processed_item_names]).drop_duplicates(keep=False)
        self.unprocessed = unprocessed.name.tolist()
        self.result = pd.concat([self.result, result])
        
        # Trial: I added a line here to schedule a coroutine
        # that processes the unprocessed items. I tried both
        # in-thread and cross-thread; neither worked and no
        # error was thrown; the code just finished silently.

        # in the running thread
        asyncio.ensure_future(self.get_items(item_type, self.unprocessed), loop=self.loop)

        # or, from another thread
        asyncio.run_coroutine_threadsafe(self.get_items(item_type, self.unprocessed), self.loop)


    def wrapper_function(self, item_type):
        self.loop.run_until_complete(self.get_items(item_type, self.item_names_to_process))
    
if __name__ == "__main__":
    data_source = DataSource()
    names = ''.join(random.choices(string.ascii_uppercase + string.digits, k=1000))
    data_source.wrapper_function(ItemType.One)
    # Trial: the last resort is to check `self.unprocessed` here;
    # if it has values, re-run `self.get_items`.
    # This method worked, as expected.
    if len(data_source.unprocessed):
        asyncio.run(data_source.get_items(ItemType.One, data_source.unprocessed))
         

CodePudding user response:

Just replace asyncio.gather with a construct you have better control over.

It goes like this: when you call a coroutine function, you get a coroutine object. This object can

  • (1) be awaited directly: your code pauses and waits for its completion in a linear fashion, so this is not suitable for launching tasks in parallel.
  • (2) be promoted to a "task": then, whenever your code yields to the running loop, the loop steps through all existing tasks before coming back to your code (see the sketch below).
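
A minimal sketch of the difference (fetch here is just a hypothetical stand-in for the REST calls above):

import asyncio

async def fetch(i):
    await asyncio.sleep(0.1)  # pretend this is a network call
    return i

async def main():
    # (1) direct await: fetch(1) runs to completion before anything else
    one = await fetch(1)

    # (2) promoted to tasks: both start running as soon as
    # main() next yields to the loop (the awaits below)
    t1 = asyncio.ensure_future(fetch(2))
    t2 = asyncio.ensure_future(fetch(3))
    print(one, await t1, await t2)

asyncio.run(main())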

The thing with asyncio.gather is that it does two things: it promotes all of your objects that are not tasks already to tasks, and then waits for all of them to complete when the gather call itself is awaited (roughly equivalent to the sketch below).
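
A simplified picture of what awaiting a gather does, assuming coros is an iterable of coroutine objects (the real implementation also handles cancellation and return_exceptions):

tasks = [asyncio.ensure_future(c) for c in coros]  # 1. promote everything to tasks
results = [await t for t in tasks]                 # 2. wait for all of them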

You can either keep this same design, with the line results.extend(await asyncio.gather(*tasks, return_exceptions=True)) inside the loop on i (and below the loop on name): that awaits each full batch of step_size tasks until it is over. Or you can create the tasks explicitly and use asyncio.wait (https://docs.python.org/3/library/asyncio-task.html#asyncio.wait) to retrieve results in parcels, controlling how long you pause before adding the next batch of tasks. (That will require a somewhat clever redesign; just calling gather for each of your step_size batches, as sketched below, will be much easier.)
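
For example, here is a gather-per-batch version with the retry folded into the same coroutine, so no extra loop or ensure_future is needed (a sketch; fetch, STEP_SIZE and MAX_ROUNDS are illustrative placeholders, not part of the original code):

import asyncio

STEP_SIZE = 500
MAX_ROUNDS = 3  # cap on how many times failed names are re-fetched

async def get_items(fetch, item_names):
    pending = list(item_names)
    results = []
    for _ in range(MAX_ROUNDS):
        if not pending:
            break
        failed = []
        for i in range(0, len(pending), STEP_SIZE):
            batch = pending[i:i + STEP_SIZE]
            tasks = [asyncio.ensure_future(fetch(name)) for name in batch]
            # one await per batch; exceptions are returned, not raised
            outcomes = await asyncio.gather(*tasks, return_exceptions=True)
            for name, outcome in zip(batch, outcomes):
                if isinstance(outcome, Exception):
                    failed.append(name)  # queue for the next round
                else:
                    results.append(outcome)
        pending = failed
    return results, pending  # pending holds names that never succeeded

Because the retry is just another await inside the still-running coroutine, it runs on the same loop and in the same thread, and run_until_complete does not return until the retries are done.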
