Asyncio Future ThreadExecutor


I am trying to convert this converter (XML files from S3 to JSON) into a multithreaded application so I can speed up processing of multiple files (I have 985). Whenever I run this I get: RuntimeWarning: coroutine 'process_object' was never awaited

Here is the code at a high level:

import asyncio
import concurrent.futures
import time

async def process_object(filename, pid=None):
    start = time.time()
    s3 = S3Client(...)
    opensearch_client = OpenSearchClient(...)
    Parser.DEBUG = True
    parser = Parser(s3, opensearch_client)
    save_file = ...
    s3.download_from_s3(filename, save_file)
    parser.current_prefix = filename
    await parser.parse(save_file)
    return f"Processed {filename} in {time.time() - start} seconds"

if "__main__" == __name__:
    objects = get_objects(top_n=3) # list of prefixes for S3

    loop = asyncio.get_event_loop()

    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        futures = [
            asyncio.wrap_future(future)
            for future in [
                loop.run_in_executor(executor, process_object, url) for url in objects
            ]
        ]
        results = loop.run_until_complete(asyncio.gather(*futures))

    loop.close()

CodePudding user response:

I have modified and simplified your code. I don't know why you are combining thread-pool futures with asyncio: loop.run_in_executor(executor, process_object, url) calls process_object(url) in a worker thread, which merely creates a coroutine object that nothing ever awaits, hence the RuntimeWarning. If you just want to limit the number of concurrent workers, use a Semaphore in asyncio instead (see the sketch after the code below).

Below is the simplified code without concurrent.futures. It works for me, although I can't reproduce your exact error locally.

Try this:

import asyncio
import time

async def process_object(filename, pid=None):
    start = time.time()
    s3 = S3Client(...)
    opensearch_client = OpenSearchClient(...)
    Parser.DEBUG = True
    parser = Parser(s3, opensearch_client)
    save_file = ...
    s3.download_from_s3(filename, save_file)
    parser.current_prefix = filename
    await parser.parse(save_file)
    print(f"Processed {filename} in {time.time() - start} seconds")


async def process_objects_bg(objects):
    await asyncio.gather(*[process_object(url) for url in objects])


if "__main__" == __name__:
    objects = get_objects(top_n=3)  # list of prefixes for S3
    asyncio.run(process_objects_bg(objects))
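
If you do want to cap how many objects are processed at once (your ThreadPoolExecutor used max_workers=3), here is a minimal sketch using asyncio.Semaphore, assuming the same process_object as above:

import asyncio

MAX_WORKERS = 3  # same cap as your ThreadPoolExecutor

async def process_object_limited(semaphore, filename):
    # Only MAX_WORKERS coroutines proceed past this point at a time;
    # the rest wait here until a slot frees up.
    async with semaphore:
        await process_object(filename)

async def process_objects_bg(objects):
    semaphore = asyncio.Semaphore(MAX_WORKERS)
    await asyncio.gather(
        *[process_object_limited(semaphore, url) for url in objects]
    )

One caveat: if s3.download_from_s3 is a blocking call (I can't tell from your snippet), it will stall the event loop and the coroutines won't actually overlap. On Python 3.9+ you can wrap it as await asyncio.to_thread(s3.download_from_s3, filename, save_file) to run it in a worker thread without the concurrent.futures boilerplate.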