I have a function that periodically does some stuff. The period is set to one minute hence I need to somehow limit the execution time. I have tried to use multiprocessing
, however it drastically increased the execution time (from <1 sec to 2 - 10 seconds).
Is there any kind of better approach how to configure the max time execution (example_function
in the code snippet below)?
I have tried using signal, however it did not work well together with scheduling.
I am using Python 3.9
in this project (planned update to 3.10
)
Example code:
from apscheduler.schedulers.blocking import BlockingScheduler
from multiprocessing import Process
def example_function():
...
here is some processing
...
def scheduled_function():
limit = 4
p = Process(target=example_function, name='Example Function')
p.start()
p.join(timeout=limit)
p.terminate()
scheduler = BlockingScheduler()
scheduler.add_job(scheduled_function, 'cron', minute='*/1')
scheduler.start()
Thanks
EDIT
I found example using threading
instead of multiprocessing
. It seems that execution time is better, it just needs a different approach to handle timeout/success result
from apscheduler.schedulers.blocking import BlockingScheduler
from threading import Thread
finished = False
def example_function():
...
here is some processing
...
global finished
finished = True
def scheduled_function():
limit = 4
p = Thread(target=example_function)
p.start()
p.join(timeout=limit)
global finished
if finished:
return "Finished OK"
else:
return "Timeout"
scheduler = BlockingScheduler()
scheduler.add_job(scheduled_function, 'cron', minute='*/1')
scheduler.start()
CodePudding user response:
I have tried using signal, however it did not work well together with scheduling.
I generally use signal for timeout, but since it does not work with BlockingScheduler (ValueError: signal only works in main thread
), this is a workaround to trigger scheduled_function
every x seconds:
def example_function(i, limit=4):
with timeout(limit):
try:
...
except TimeoutError:
...
def scheduled_function(cron):
with Timer() as t:
with timeout(cron - 1):
with Pool(1) as p:
p.map(partial(example_function, limit=4), range(1))
# sleep for remaining time
time.sleep(cron - t.secs)
while True:
scheduled_function(cron=10) # every 10s
Here is the complete code:
import random
import time
import signal
from functools import partial
from contextlib import contextmanager
from multiprocessing import Pool
from datetime import datetime
class Timer:
def __enter__(self):
self.start_time = time.perf_counter()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
end_time = time.perf_counter()
self.secs = end_time - self.start_time # seconds
@contextmanager
def timeout(t):
"""https://www.jujens.eu/posts/en/2018/Jun/02/python-timeout-function/"""
signal.signal(signal.SIGALRM, raise_timeout)
signal.alarm(t)
try:
yield
except TimeoutError:
pass
finally:
signal.signal(signal.SIGALRM, signal.SIG_IGN)
def raise_timeout(signum, frame):
raise TimeoutError
def example_function(i, limit=4):
with timeout(limit):
try:
a = random.uniform(1, 5)
print(f"running task {i} for {a} seconds")
time.sleep(a)
print(f"[OK] task {i} finished")
except TimeoutError:
print(f"[NOT OK] task {i} did not finish: {a}>{limit}")
print(f"timeout={limit}s has expired -> exiting task {i}")
def scheduled_function(cron):
print("starting scheduled_function at", datetime.now().strftime("%Mmin%Ss"))
with Timer() as t:
with timeout(cron - 1):
print("running small 4-secs jobs")
with Pool(1) as p:
p.map(partial(example_function, limit=4), range(1))
time.sleep(cron - t.secs)
print("1min -> exiting")
def main():
while True:
scheduled_function(cron=10) # every 10s
if __name__ == "__main__":
main()
And output:
starting scheduled_function at '10s' <-- starting every 10s
running small 4-secs jobs
running task 0 for 3.0037614230572447 seconds
[OK] task 0 finished
timeout=4s has expired -> exiting task 0 <-- inner task timeout=4s
10s -> exiting <-- outer job timeout=10s
starting scheduled_function at '20s' <-- starting every 10s
running small 4-secs jobs
running task 0 for 4.198487250169565 seconds
[NOT OK] task 0 did not finish: 4.198487250169565>4 <-- caught timeout error
timeout=4s has expired -> exiting task 0
starting scheduled_function at '30s'
running small 4-secs jobs
running task 0 for 3.489094988621927 seconds
[OK] task 0 finished
timeout=4s has expired -> exiting task 0
CodePudding user response:
If you just have a method that you need to run every 1 minute, then the sleep() function is what you need:
from time import sleep
def example_function():
"""
here is some processing
"""
for i in range(1000): # 1000 is how many times you want to repeat
example_function()
sleep(60) # 60 sec is one minute