I am new to Python and I am working on an existing Python project that executes custom scripts entered by the user. The scripts are run as Python subprocesses:
await asyncio.create_subprocess_exec(...)
Most of the methods in my project are defined with the async keyword and awaited by their callers. The problem is that at any point in time only one thread is running, so essentially I can only run a single shell script, and a new execution starts only after the previous one completes. I am planning to move the execution logic (a sequence of such async methods) into a ThreadPoolExecutor or ProcessPoolExecutor.
I tried converting all the async def methods to plain methods, but a few of them call async methods from third-party modules, and that fails with SyntaxError: 'await' outside async function. It looks like any await calls must be made from within async def methods, so I cannot simply convert all my methods to def.
Any suggestions on what to do to make this project multi-threaded?
CodePudding user response:
Since I am a new contributor I cannot comment yet, so let me reply in an answer. @Mohan I think your code already has parallel processing.
coroutine asyncio.create_subprocess_exec(program, *args, stdin=None, stdout=None, stderr=None, limit=None, **kwds)
Create a subprocess.
The limit argument sets the buffer limit for StreamReader wrappers for Process.stdout and Process.stderr (if subprocess.PIPE is passed to stdout and stderr arguments).
Return a Process instance.
As you can see, it returns a Process object, meaning an OS process has been created. That means the OS, not Python, schedules the subprocess, and since the OS has access to all cores, your subprocesses can already use all available cores.
https://docs.python.org/3/library/asyncio-subprocess.html
Directly from the documentation:
import asyncio

async def run(cmd):
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)

    stdout, stderr = await proc.communicate()

    print(f'[{cmd!r} exited with {proc.returncode}]')
    if stdout:
        print(f'[stdout]\n{stdout.decode()}')
    if stderr:
        print(f'[stderr]\n{stderr.decode()}')

async def main():
    await asyncio.gather(
        run('ls /zzz'),
        run('sleep 1; echo "hello"'))

asyncio.run(main())
From Documentation:
Because all asyncio subprocess functions are asynchronous and asyncio provides many tools to work with such functions, it is easy to execute and monitor multiple subprocesses in parallel.
To run tasks concurrently, all you have to do is use the asyncio.gather() function:
# Without a loop
a = asyncio.create_subprocess_exec(...)
b = asyncio.create_subprocess_exec(...)
await asyncio.gather(a, b)

# Using a loop
tasks = []
for a, b, c in some_func():
    tasks.append(asyncio.create_subprocess_exec(a, b, c, ...))
await asyncio.gather(*tasks)
Unless you want to leverage multiple cores for Python code itself, this should just work without converting any async def to def. If you do wish to use all cores for CPU-bound Python code, check out the links below and experiment with ProcessPoolExecutor.
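To illustrate, here is a minimal sketch of combining asyncio with a ProcessPoolExecutor via loop.run_in_executor. The cpu_bound function is just a placeholder I made up for CPU-heavy work; your actual logic would go there:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_bound(n):
    # Placeholder for CPU-heavy work (illustrative only).
    return sum(i * i for i in range(n))

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # Each call runs in a separate OS process, so all cores can be used.
        return await asyncio.gather(
            loop.run_in_executor(pool, cpu_bound, 10),
            loop.run_in_executor(pool, cpu_bound, 20),
        )

if __name__ == "__main__":
    print(asyncio.run(main()))
```

Note that functions submitted to a ProcessPoolExecutor must be picklable (defined at module top level), and on platforms that spawn worker processes the entry point should be guarded with if __name__ == "__main__".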
I am not sure if it is even relevant, but I also found these in the documentation:
https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor
https://docs.python.org/3/library/asyncio-task.html#running-in-threads
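For the reverse direction — calling blocking code from async code without freezing the event loop — asyncio.to_thread (Python 3.9+) is the simplest tool. A small sketch, where blocking_io is an illustrative stand-in for any blocking call such as subprocess.run or file I/O:

```python
import asyncio
import time

def blocking_io(tag, seconds):
    # Stand-in for a blocking call (illustrative only).
    time.sleep(seconds)
    return tag

async def main():
    # Each blocking call runs in a worker thread, so the event loop
    # stays free and gather() overlaps them in time.
    start = time.monotonic()
    results = await asyncio.gather(
        asyncio.to_thread(blocking_io, "a", 0.2),
        asyncio.to_thread(blocking_io, "b", 0.2),
    )
    elapsed = time.monotonic() - start
    return results, elapsed

if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because the two 0.2-second calls overlap, the total elapsed time is close to 0.2 s rather than 0.4 s. On older Python versions, loop.run_in_executor(None, blocking_io, "a", 0.2) achieves the same thing with the default ThreadPoolExecutor.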
A somewhat related question: How to properly use asyncio run_coroutine_threadsafe function?
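In case it helps, run_coroutine_threadsafe is the tool for the situation you described where plain (non-async) methods need to call async ones: you keep one event loop running in a background thread and submit coroutines to it from ordinary threads. A minimal sketch, with double as an illustrative coroutine:

```python
import asyncio
import threading

async def double(x):
    # Illustrative coroutine standing in for a third-party async call.
    await asyncio.sleep(0.01)
    return x * 2

def start_loop(loop):
    # Run the event loop forever in a dedicated background thread.
    asyncio.set_event_loop(loop)
    loop.run_forever()

loop = asyncio.new_event_loop()
t = threading.Thread(target=start_loop, args=(loop,), daemon=True)
t.start()

# From a plain (non-async) thread: schedule the coroutine on the loop
# and block on a concurrent.futures.Future until it completes.
future = asyncio.run_coroutine_threadsafe(double(21), loop)
result = future.result(timeout=5)

loop.call_soon_threadsafe(loop.stop)
```

This way your def methods can stay ordinary methods and still drive the async third-party calls, at the cost of managing the background loop's lifetime yourself.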