Home > front end >  How can I utilize asyncio to make third party file operations faster?
How can I utilize asyncio to make third party file operations faster?

Time:10-21

I am utilizing a third party library called isort. isort has an available function that opens and reads a file. In order to speed this up I attempted to changed the function called isort.check_file to make it perform asynchronously. The method check_file takes the file path, however the current behaviour that I have attempted does not work.

    ...
    coroutines= [self.check_file('c:\\example1.py'), self.check_file('c:\\example2.py')]
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(asyncio.gather(*coroutines))
    ...

    async def check_file(self, changed_file):
        return isort.check_file(changed_file)

However, this does not seem to work. How can I make the library call isort.check_file be utilized correctly with asyncio.gather?

CodePudding user response:

Better understanding of IO Bottleneck and GIL

What your async function check_file doing is just as same without async at front. To get any meaningful performance asynchronously, you Must be using some sort of Awaitables - which requires await keyword.

So basically what you did is:

import time

async def wait(n):
    time.sleep(n)

Which does absolutely no good for asynchronous operations. To make such synchronous function asynchronous - assuming it's mostly IO-bound - you can use asyncio.to_thread instead.

import asyncio
import time


async def task():
    await asyncio.to_thread(time.sleep, 10)  # <- await   something that's awaitable
    # similar to await asyncio.sleep(10) now


async def main():
    tasks = [task() for _ in range(10)]
    await asyncio.gather(*tasks)


asyncio.run(main())

That essentially moves IO bound operation out of main thread, so main thread can do it's work without waiting for IO works.

But there's catch - Python's Global Interpreter Lock(GIL).

Due to CPython - official python implementation - limitation, only 1 python interpreter thread can run in at any given moment, stalling all others.

Then how we achieve better performance just by moving IO to different thread? Just simply by releasing GIL during IO operations.

IO Operations are basically just like this:

"Hey OS, please do this IO works for me. Wake me up when it's done."
Thread 1 goes to sleep

Some time later, OS punches Thread 1
"Your IO Operation is done, take this and get back to work."

So all it does is Doing Nothing - for such cases, aka IO Bound stuffs, GIL can be safely released and let other threads to run. Built-in functions like time.sleep, open(), etc implements such GIL release logic in their C code.

This doesn't change much in asyncio, which is internally bunch of event checks and callbacks. Each asyncio,Tasks works like threads in some degree - tasks asking main loop to wake them up when IO operation done is done.

Now these basic simplified concepts sorted out, we can go back to your question.


CPU Bottleneck and IO Bottleneck

Bsically what you're up against is Not an IO bottleneck. It's mostly CPU/etc bottleneck.

Loading merely few KB of texts from local drives then running tons of intense Python code afterward doesn't count as an IO bound operation.


Testing

Let's consider following test case:

  • run isort.check_file for 10000 scripts as:
    • Synchronously, just like normal python codes
    • Multithreaded, with 2 threads
    • Multiprocessing, with 2 processes
    • Asynchronous, using asyncio.to_thread

We can expect that:

  • Multithreaded will be slower than Synchronous code, as there's very little IO works
  • Multiprocessing process spawning & communicating takes time, so it will be slower in short workload, faster in longer workload.
  • Asynchronous will be even more slower than the Multithreaded, because Asyncio have to deal with threads which it's not really designed for.

With folder structure of:

├─ main.py
└─ import_messes
     ├─ lib_0.py
     ├─ lib_1.py
     ├─ lib_2.py
     ├─ lib_3.py
     ├─ lib_4.py
     ├─ lib_5.py
     ├─ lib_6.py
     ├─ lib_7.py
     ├─ lib_8.py
     └─ lib_9.py

Which we'll load 1000 times each, making up to total 10000 loads.

Each of those are filled with random imports I grabbed from asyncio.

from asyncio.base_events import *
from asyncio.coroutines import *
from asyncio.events import *
from asyncio.exceptions import *
from asyncio.futures import *
from asyncio.locks import *
from asyncio.protocols import *
from asyncio.runners import *
from asyncio.queues import *
from asyncio.streams import *
from asyncio.subprocess import *
from asyncio.tasks import *
from asyncio.threads import *
from asyncio.transports import *

Source code(main.py):

"""
asynchronous isort demo
"""

import pathlib
import asyncio
import itertools
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from timeit import timeit

import isort
from isort import format


# target dir with modules
FILE = pathlib.Path("./import_messes")


# Monkey-patching isort.format.create_terminal_printer to suppress Terminal bombarding.
# Totally not required nor recommended for normal use
class SuppressionPrinter:
    def __init__(self, *_, **__):
        pass

    def success(self, *_):
        pass

    def error(self, *_):
        pass

    def diff_line(self, *_):
        pass


isort.format.BasicPrinter = SuppressionPrinter


# -----------------------------
# Test functions

def filelist_gen():
    """Chain directory list multiple times to get meaningful difference"""
    yield from itertools.chain.from_iterable([FILE.iterdir() for _ in range(1000)])


def isort_synchronous(path_iter):
    """Synchronous usual isort use-case"""

    # return list of results
    return [isort.check_file(file) for file in path_iter]


def isort_thread(path_iter):
    """Threading isort"""

    # prepare thread pool
    with ThreadPoolExecutor(max_workers=2) as executor:
        # start loading
        futures = [executor.submit(isort.check_file, file) for file in path_iter]

        # return list of results
        return [fut.result() for fut in futures]


def isort_multiprocess(path_iter):
    """Multiprocessing isort"""

    # prepare process pool
    with ProcessPoolExecutor(max_workers=2) as executor:
        # start loading
        futures = [executor.submit(isort.check_file, file) for file in path_iter]

        # return list of results
        return [fut.result() for fut in futures]


async def isort_asynchronous(path_iter):
    """Asyncio isort using to_thread"""

    # create coroutines that delegate sync funcs to threads
    coroutines = [asyncio.to_thread(isort.check_file, file) for file in path_iter]

    # run coroutines and wait for results
    return await asyncio.gather(*coroutines)


if __name__ == '__main__':
    # run once, no repetition
    n = 1

    # synchronous runtime
    print(f"Sync func.: {timeit(lambda: isort_synchronous(filelist_gen()), number=n):.4f}")

    # threading demo
    print(f"Threading : {timeit(lambda: isort_thread(filelist_gen()), number=n):.4f}")

    # multiprocessing demo
    print(f"Multiproc.: {timeit(lambda: isort_multiprocess(filelist_gen()), number=n):.4f}")

    # asyncio to_thread demo
    print(f"to_thread : {timeit(lambda: asyncio.run(isort_asynchronous(filelist_gen())), number=n):.4f}")

Run results

Sync func.: 18.1764
Threading : 18.3138
Multiproc.: 9.5206
to_thread : 27.3645

You can see it turned out as we expected, isort.check_file is not an IO-Bound operation. Therefore best bet is using Multiprocessing, if Really needed.

If number of files are low, like hundred or below, multiprocessing will suffer even more than using asyncio.to_thread, because cost to spawn, communicate, and kill process overwhelm the multiprocessing's benefits.

Experiment with your usecase, adjust core count (max_workers) to best fit your environment and your usecase.

  • Related