Home > Software design >  Most efficient way how to limit function execution in Python
Most efficient way how to limit function execution in Python

Time:09-29

I have a function that periodically does some stuff. The period is set to one minute hence I need to somehow limit the execution time. I have tried to use multiprocessing, however it drastically increased the execution time (from <1 sec to 2 - 10 seconds).

Is there any kind of better approach how to configure the max time execution (example_function in the code snippet below)?

I have tried using signal, however it did not work well together with scheduling.

I am using Python 3.9 in this project (planned update to 3.10)

Example code:

from apscheduler.schedulers.blocking import BlockingScheduler
from multiprocessing import Process


def example_function():
  ...
  here is some processing
  ...


def scheduled_function():
  limit = 4
  p = Process(target=example_function, name='Example Function')

  p.start()
  p.join(timeout=limit)
  p.terminate()


scheduler = BlockingScheduler()
scheduler.add_job(scheduled_function, 'cron', minute='*/1')
scheduler.start()

Thanks


EDIT

I found example using threading instead of multiprocessing. It seems that execution time is better, it just needs a different approach to handle timeout/success result

from apscheduler.schedulers.blocking import BlockingScheduler
from threading import Thread


finished = False


def example_function():
  ...
  here is some processing
  ...
  global finished
  finished = True


def scheduled_function():
  limit = 4
  p = Thread(target=example_function)

  p.start()
  p.join(timeout=limit)

  global finished
  if finished:
    return "Finished OK"
  else:
    return "Timeout"


scheduler = BlockingScheduler()
scheduler.add_job(scheduled_function, 'cron', minute='*/1')
scheduler.start()

CodePudding user response:

I have tried using signal, however it did not work well together with scheduling.

I generally use signal for timeout, but since it does not work with BlockingScheduler (ValueError: signal only works in main thread), this is a workaround to trigger scheduled_function every x seconds:

def example_function(i, limit=4):
    with timeout(limit):
        try:
            ...
        except TimeoutError:
            ...

def scheduled_function(cron):
    with Timer() as t:
        with timeout(cron - 1):
            with Pool(1) as p:
                p.map(partial(example_function, limit=4), range(1))
    # sleep for remaining time
    time.sleep(cron - t.secs)

while True:
   scheduled_function(cron=10)  # every 10s

Here is the complete code:

import random
import time
import signal
from functools import partial
from contextlib import contextmanager
from multiprocessing import Pool
from datetime import datetime


class Timer:
    def __enter__(self):
        self.start_time = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        end_time = time.perf_counter()
        self.secs = end_time - self.start_time  # seconds


@contextmanager
def timeout(t):
    """https://www.jujens.eu/posts/en/2018/Jun/02/python-timeout-function/"""
    signal.signal(signal.SIGALRM, raise_timeout)
    signal.alarm(t)

    try:
        yield
    except TimeoutError:
        pass
    finally:
        signal.signal(signal.SIGALRM, signal.SIG_IGN)


def raise_timeout(signum, frame):
    raise TimeoutError


def example_function(i, limit=4):
    with timeout(limit):
        try:
            a = random.uniform(1, 5)
            print(f"running task {i} for {a} seconds")
            time.sleep(a)
            print(f"[OK] task {i} finished")
        except TimeoutError:
            print(f"[NOT OK] task {i} did not finish: {a}>{limit}")

    print(f"timeout={limit}s has expired -> exiting task {i}")


def scheduled_function(cron):
    print("starting scheduled_function at", datetime.now().strftime("%Mmin%Ss"))
    with Timer() as t:
        with timeout(cron - 1):
            print("running small 4-secs jobs")
            with Pool(1) as p:
                p.map(partial(example_function, limit=4), range(1))
    time.sleep(cron - t.secs)
    print("1min -> exiting")


def main():
    while True:
        scheduled_function(cron=10)  # every 10s


if __name__ == "__main__":
    main()

And output:

starting scheduled_function at '10s'                   <-- starting every 10s
running small 4-secs jobs
running task 0 for 3.0037614230572447 seconds
[OK] task 0 finished
timeout=4s has expired -> exiting task 0                 <-- inner task timeout=4s
10s -> exiting                                           <-- outer job timeout=10s

starting scheduled_function at '20s'                   <-- starting every 10s
running small 4-secs jobs
running task 0 for 4.198487250169565 seconds
[NOT OK] task 0 did not finish: 4.198487250169565>4       <-- caught timeout error
timeout=4s has expired -> exiting task 0

starting scheduled_function at '30s'
running small 4-secs jobs
running task 0 for 3.489094988621927 seconds
[OK] task 0 finished
timeout=4s has expired -> exiting task 0

CodePudding user response:

If you just have a method that you need to run every 1 minute, then the sleep() function is what you need:

from time import sleep

def example_function():
  """
  here is some processing
  """

for i in range(1000):  # 1000 is how many times you want to repeat
  example_function()
  sleep(60)  # 60 sec is one minute
  • Related