Executing async methods in a ThreadPoolExecutor

Time: 08-06

I am new to Python and working on an existing Python project that executes custom scripts supplied by the user. The scripts are run as Python subprocesses:

 await asyncio.create_subprocess_exec(...)

The majority of the methods in my project are defined with the async keyword and awaited by their callers. The problem is that at any point in time there is only one thread running, so essentially I can run only a single shell script, and a new execution starts only after the previous one completes. I am planning to move the execution logic (a sequence of such async methods) into a ThreadPoolExecutor or ProcessPoolExecutor.

I tried converting all the async def methods to plain methods, but a few of them call other async methods from third-party modules, and that fails with SyntaxError: 'await' outside async function. It looks like any await must appear inside an async def method, so I cannot simply convert all my methods to def.
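Here is a minimal reproduction of what I mean (illustrative only, nothing project-specific):

```python
import asyncio

# Using `await` inside a plain `def` is rejected at compile time,
# before any code runs, so the conversion cannot work as-is.
src = """
def not_async():
    await asyncio.sleep(1)
"""
msg = ""
try:
    compile(src, "<snippet>", "exec")
except SyntaxError as exc:
    msg = exc.msg
print(msg)
```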

Any suggestions on how to make this project multi-threaded?

CodePudding user response:

Since I am a new contributor I cannot comment yet, so let me reply in an answer. @Mohan, I think your code already has parallel processing.

coroutine asyncio.create_subprocess_exec(program, *args, stdin=None, stdout=None, stderr=None, limit=None, **kwds)

Create a subprocess.

The limit argument sets the buffer limit for StreamReader wrappers for Process.stdout and Process.stderr (if subprocess.PIPE is passed to stdout and stderr arguments).

Return a Process instance.

As you can see, it returns a Process object, meaning an OS process has been created. That means the OS, not Python, schedules the subprocess, and since the OS has access to all cores, the subprocesses can run on any available core.


https://docs.python.org/3/library/asyncio-subprocess.html

Directly from the documentation:

import asyncio

async def run(cmd):
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)

    stdout, stderr = await proc.communicate()

    print(f'[{cmd!r} exited with {proc.returncode}]')
    if stdout:
        print(f'[stdout]\n{stdout.decode()}')
    if stderr:
        print(f'[stderr]\n{stderr.decode()}')


async def main():
    await asyncio.gather(
        run('ls /zzz'),
        run('sleep 1; echo "hello"'))

asyncio.run(main())

From the documentation:

Because all asyncio subprocess functions are asynchronous and asyncio provides many tools to work with such functions, it is easy to execute and monitor multiple subprocesses in parallel.

To run tasks concurrently, all you have to do is use the asyncio.gather() function:

# Without a loop
a = asyncio.create_subprocess_exec(...)
b = asyncio.create_subprocess_exec(...)
await asyncio.gather(a, b)

# Using a loop
tasks = []
for a, b, c in some_func():
    tasks.append(asyncio.create_subprocess_exec(a, b, c, ...))

await asyncio.gather(*tasks)

Unless you want to leverage multiple cores, this should just work without even converting async def to def. If you do wish to use all cores, check out the links below and experiment with ProcessPoolExecutor.
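To make the loop pattern above concrete, here is a complete, runnable sketch. The spawned commands are placeholders (short Python one-liners), not from the question; the point is that gather() starts all the subprocesses concurrently:

```python
import asyncio
import sys

async def run_one(tag: str) -> int:
    # Spawn a short-lived Python subprocess; create_subprocess_exec
    # returns a Process object once the OS process has started.
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", f"print('done {tag}')",
        stdout=asyncio.subprocess.PIPE,
    )
    stdout, _ = await proc.communicate()
    print(stdout.decode().strip())
    return proc.returncode

async def main() -> list:
    # All three subprocesses run concurrently under gather().
    return await asyncio.gather(run_one("a"), run_one("b"), run_one("c"))

codes = asyncio.run(main())
print(codes)  # → [0, 0, 0]
```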


I am not sure whether it is relevant, but I found these in the documentation:

https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor

https://docs.python.org/3/library/asyncio-task.html#running-in-threads
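A minimal sketch of the run_in_executor pattern from the first link, using a ThreadPoolExecutor to offload a blocking function (the blocking_work helper is hypothetical; for CPU-bound work you would swap in a ProcessPoolExecutor):

```python
import asyncio
import concurrent.futures
import time

def blocking_work(x: int) -> int:
    # Stand-in for any blocking, non-async function.
    time.sleep(0.1)
    return x * x

async def main() -> list:
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
        # run_in_executor wraps each pool submission in an awaitable future.
        futures = [loop.run_in_executor(pool, blocking_work, x) for x in range(4)]
        return await asyncio.gather(*futures)

results = asyncio.run(main())
print(results)  # → [0, 1, 4, 9]
```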

Somewhat related question: How to properly use asyncio run_coroutine_threadsafe function?
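For completeness, a minimal run_coroutine_threadsafe sketch (illustrative names): a dedicated thread runs an event loop, and coroutines are submitted to it from another thread:

```python
import asyncio
import threading

async def work(x: int) -> int:
    await asyncio.sleep(0.05)
    return x + 1

def start_loop(loop: asyncio.AbstractEventLoop) -> None:
    # Run the event loop in this background thread until stopped.
    asyncio.set_event_loop(loop)
    loop.run_forever()

loop = asyncio.new_event_loop()
t = threading.Thread(target=start_loop, args=(loop,), daemon=True)
t.start()

# Submit a coroutine to the loop from a different (here: the main) thread.
future = asyncio.run_coroutine_threadsafe(work(41), loop)
result = future.result(timeout=2)
print(result)  # → 42

loop.call_soon_threadsafe(loop.stop)
```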
