What I have in mind is a very generic BackgroundTask class that can be used within webservers or standalone scripts, to schedule away tasks that don't need to be blocking.
I don't want to use any task queues (celery, rabbitmq, etc.) here because the tasks I'm thinking of are too small and fast to run. Just want to get them done as out of the way as possible. Would that be an async approach? Throwing them onto another process?
First solution I came up with that works:
# Need ParamSpec to get correct type hints in BackgroundTask init
P = ParamSpec("P")
class BackgroundTask(metaclass=ThreadSafeSingleton):
"""Easy way to create a background task that is not dependent on any webserver internals.
Usage:
async def sleep(t):
time.sleep(t)
BackgroundTask(sleep, 10) <- Creates async task and executes it separately (nonblocking, works with coroutines)
BackgroundTask(time.sleep, 9) <- Creates async task and executes it separately (nonblocking, works with normal functions)
"""
background_tasks = set()
lock = threading.Lock()
def __init__(self, func: typing.Callable[P, typing.Any], *args: P.args, **kwargs: P.kwargs) -> None:
"""Uses singleton instance of BackgroundTask to add a task to the async execution queue.
Args:
func (typing.Callable[P, typing.Any]): _description_
"""
self.func = func
self.args = args
self.kwargs = kwargs
self.is_async = asyncio.iscoroutinefunction(func)
async def __call__(self) -> None:
if self.is_async:
with self.lock:
task = asyncio.create_task(self.func(*self.args, **self.kwargs))
self.background_tasks.add(task)
print(len(self.background_tasks))
task.add_done_callback(self.background_tasks.discard)
# TODO: Create sync task (this will follow a similar pattern)
async def create_background_task(func: typing.Callable[P, typing.Any], *args: P.args, **kwargs: P.kwargs) -> None:
b = BackgroundTask(func, *args, **kwargs)
await b()
# Usage:
async def sleep(t):
time.sleep(t)
await create_background_task(sleep, 5)
I think I missed the point by doing this though. If I ran this code along with some other async code, then yes, I would get a performance benefit since blocking operations aren't blocking the main thread anymore.
I'm thinking I maybe need something more like a separate process to handle such background tasks without blocking the main thread at all (the above async code will still be run on the main thread).
Does it make sense to have a separate thread that handles background jobs? Like a simple job queue but very lightweight and does not require additional infrastructure?
Or does it make sense to create a solution like the one above?
I've seen that Starlette does something like this (https://github.com/encode/starlette/blob/decc5279335f105837987505e3e477463a996f3e/starlette/background.py#L15) but they await the background tasks AFTER a response is returned.
This makes their solution dependent on a web server design (i.e. doing things after response is sent is OK). I'm wondering if we can build something more generic where you can run background tasks in scripts or webservers alike, without sacrificing performance.
Not that familiar with async/concurrency features, so don't really know how to compare these solutions. Seems like an interesting problem!
Here is what I came up with trying to perform the tasks on another process:
class BackgroundTask(metaclass=ThreadSafeSingleton):
"""Easy way to create a background task that is not dependent on any webserver internals.
Usage:
async def sleep(t):
time.sleep(t)
BackgroundTask(sleep, 10) <- Creates async task and executes it separately (nonblocking, works with coroutines)
BackgroundTask(time.sleep, 9) <- Creates async task and executes it separately (nonblocking, works with normal functions)
BackgroundTask(es.transport.close) <- Probably most common use in our codebase
"""
background_tasks = set()
executor = concurrent.futures.ProcessPoolExecutor(max_workers=2)
lock = threading.Lock()
def __init__(self, func: typing.Callable[P, typing.Any], *args: P.args, **kwargs: P.kwargs) -> None:
"""Uses singleton instance of BackgroundTask to add a task to the async execution queue.
Args:
func (typing.Callable[P, typing.Any]): _description_
"""
self.func = func
self.args = args
self.kwargs = kwargs
self.is_async = asyncio.iscoroutinefunction(func)
async def __call__(self) -> None:
if self.is_async:
with self.lock:
loop = asyncio.get_running_loop()
with self.executor as pool:
result = await loop.run_in_executor(
pool, functools.partial(self.func, *self.args, **self.kwargs))
CodePudding user response:
You could try something like this:
import multiprocessing
class MPPool:
def __init__(self, num=multiprocessing.cpu_count() - 1):
self.pool = multiprocessing.Pool(num)
def __call__(self, f, *args, **kwargs):
self.pool.apply_async(f, args=args, kwds=kwargs)
def run_and_forget(f, *args, **kwargs):
if "pool" not in run_and_forget.__dict__:
run_and_forget.pool = MPPool()
run_and_forget.pool(f, *args, **kwargs)
if __name__ == '__main__':
import time
def test(n):
time.sleep(n)
print(f"done {n}")
for i in range(20):
run_and_forget(test, i)
print(f"passed {i}")
time.sleep(50)
print("end")
The function run_and_forget
can be used anywhere (within a single process), as the member pool
is static-like and therefore defined at the first call.
This wasn't fully tested, but I've provided some quick test code to see how it works. First things that comes to mind is that before exiting it would be smart to cleanup the multiprocessing pool.
CodePudding user response:
I'll answer "what you've asked", but I'll preface that you may be asking the wrong question due to a lack of understanding.
In Python stdlib, subprocess
can spin up separate independent processes that behave like "fire and forget". Here's a couple:
import os, subprocess
subprocess.Popen(['mkdir', 'foo'])
os.popen('touch answer_is_$((1 2))')
It'd be much better to provide concrete examples of these "small and fast non-blocking tasks" you'd like to have, complete with the environment you'll want them to be running in. You're missing some understanding that's evident b/c some of your statements conflict with others. For example, asyncio
and threading
don't operate like "fire and forget" at all.
Also, there's not going to be a good way to "background within any context" b/c the differences between different contexts matter, and "what's best" depends on many factors.