I have a main function, run_tests, which first starts a separate thread that launches new processes; then, in the main loop, I try to detect the processes that have finished and those that have timed out.
import time
import traceback
from typing import List
from threading import Thread
from multiprocess import (Semaphore,
Process,
Pipe)
from hanging_threads import start_monitoring
class S_Process(Process):
    """Process whose target runs only while a shared semaphore is held.

    The child sends two messages over ``pipe_conn``: ``0`` right after the
    semaphore has been acquired, and the test name after the target returns.
    """

    def __init__(self,
                 test_name: str,
                 semaphore: Semaphore,
                 pipe_conn: Pipe,
                 *args,
                 **kwargs
                 ) -> None:
        """Store the test name, semaphore and pipe end; forward the rest to Process."""
        Process.__init__(self, *args, **kwargs)
        self.__child_conn = pipe_conn
        self.__test_name = test_name
        self.__semaphore = semaphore

    def run(self) -> None:
        """Acquire the semaphore, signal the start, run the target, signal the end."""
        self.__semaphore.acquire()
        # First message: tells the receiving end that the semaphore is held.
        self.__child_conn.send(0)
        # NOTE(review): there is no try/finally here, so if the target raises,
        # neither the final send() nor the release() below is executed.
        Process.run(self)
        # Second message: the test name, sent once the target has returned.
        self.__child_conn.send(self.__test_name)
        self.__semaphore.release()

    def terminate(self) -> None:
        """Release the semaphore, then terminate the process."""
        # NOTE(review): the release is unconditional; it happens whether or not
        # the child currently holds the semaphore - confirm this is intended.
        self.__semaphore.release()
        super().terminate()

    @property
    def test_name(self) -> str:
        """Name of the test this process runs."""
        return self.__test_name
class Task(object):
    """Pairs a process with the pipe end used to receive its messages."""

    def __init__(self,
                 process: S_Process,
                 pipe_conn: Pipe,
                 ) -> None:
        """Store the process and pipe end; the task starts in status 'NOTRUN'."""
        self.process = process
        self.pipe_conn = pipe_conn
        # Set by run() to the time.perf_counter() reading taken at start.
        self.duration = None
        # Set by set_result() to the value received from the pipe.
        self.test_name = None
        self.status = 'NOTRUN'

    def run(self) -> None:
        """Start the process and block until its first message arrives."""
        self.process.start()
        # Blocks until the process sends something; the value is discarded.
        self.pipe_conn.recv()
        self.duration = time.perf_counter()
        self.status = 'RUNNING'

    def join(self) -> None:
        """Join the process, kill it if it is still alive, then read its result."""
        # join() is called without a timeout.
        self.process.join()
        if self.process.is_alive():
            self.process.kill()
        self.set_result()

    def terminate(self) -> None:
        """Terminate the underlying process."""
        self.process.terminate()

    def set_result(self) -> None:
        """Receive the next pipe message as the test name and mark the task 'ENDED'."""
        # Blocking receive: waits until a message is available.
        self.test_name = self.pipe_conn.recv()
        self.status = 'ENDED'
class Tasks(object):
    """Keeps tasks in two lists: ``remaining`` and ``completed``."""

    def __init__(self) -> None:
        """Start with both lists empty."""
        self.remaining: List[Task] = []
        self.completed: List[Task] = []

    def add(self,
            process: S_Process,
            pipe_conn: Pipe
            ) -> None:
        """Wrap the process and pipe end in a Task and append it to ``remaining``."""
        task = Task(process, pipe_conn)
        self.remaining.append(task)

    def complete(self, task: Task) -> None:
        """Move ``task`` from ``remaining`` to ``completed``."""
        self.completed.append(task)
        # list.remove() raises ValueError if the task is not in ``remaining``.
        self.remaining.remove(task)

    def info(self) -> List[str]:
        """Return one formatted summary line per completed task."""
        output: List[str] = []
        for task in self.completed:
            # NOTE(review): Task as defined in this listing never sets a
            # ``result`` attribute, nor ``retries``; as written these lookups
            # would raise AttributeError - confirm against the full project.
            output.append(f"Test Name: {task.result.test_name} "
                          f"Result: {task.result.status} "
                          f"Duration: {task.result.duration} "
                          f"Retries: {task.result.retries}")
        return output
def run_tests() -> None:
    """Create eight semaphore-limited test processes and poll them until done.

    A background thread starts the tasks one after another, while the main
    loop polls every 0.2 s: it joins running tasks whose process is no longer
    alive and terminates running tasks older than ``TIMEOUT`` seconds.
    """
    start_monitoring()
    tasks = Tasks()
    # Shared by all processes; initialised with a value of 2.
    semaphore = Semaphore(2)
    for i in range(8):
        # One pipe per task: the child end goes to the process,
        # the parent end is kept by the Task.
        parent_conn, child_conn = Pipe()
        process = S_Process(
            target=test_function,
            args=(),
            test_name=f'test_{i}',
            semaphore=semaphore,
            pipe_conn=child_conn
        )
        tasks.add(process, parent_conn)

    def runner(tasks):
        """Start each task in turn; task.run() blocks until the task reports in."""
        try:
            for task in tasks:
                print('running task')
                task.run()
        except Exception:
            print(traceback.format_exc())

    TIMEOUT = 5
    # NOTE(review): the thread is handed tasks.remaining itself - the very list
    # that tasks.complete() removes items from in the loop below - so the
    # runner iterates a list that is being mutated concurrently.
    # NOTE(review): this assignment also rebinds the name ``runner`` from the
    # function to the Thread object.
    runner = Thread(target=runner, args=(tasks.remaining,))
    runner.start()
    while tasks.remaining:
        # NOTE(review): tasks.complete() removes from tasks.remaining while
        # this for-loop is iterating over it.
        for task in tasks.remaining:
            if not task.process.is_alive() and task.status == 'RUNNING':
                print('JOINING:', task.process.test_name)
                task.join()
                tasks.complete(task)
            if task.status == "RUNNING":
                # While a task is RUNNING, task.duration holds its start timestamp.
                check_time = time.perf_counter() - task.duration
                if (check_time > TIMEOUT):
                    print('TERMINATING:', task.process.test_name)
                    task.terminate()
                    tasks.complete(task)
        print('Rem:', len(tasks.remaining))
        print('End:', len(tasks.completed))
        time.sleep(0.2)
def test_function():
    """Stand-in test body: print a marker, then sleep for three seconds."""
    print('test_func')
    time.sleep(3)
# Run the demo only when this file is executed as a script.
if __name__ == "__main__":
    run_tests()
The method task.run() starts the process and then waits on pipe_conn.recv() for confirmation that the process has actually acquired the semaphore and started working, so that I can measure how long it runs.
When I set the semaphore to, e.g., 2 (at most 2 processes can run simultaneously) with 7-8 tasks and start run_tests, everything goes well until the third or fourth process is being joined/terminated. Thanks to the hanging_threads package I discovered that my runner thread hangs, with this stack trace:
---------- Thread 9068 "Thread-2 (runner)" hangs ----------
File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.10_3.10.2032.0_x64__qbz5n2kfra8p0\lib\threading.py", line 973, in _bootstrap
self._bootstrap_inner()
File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.10_3.10.2032.0_x64__qbz5n2kfra8p0\lib\threading.py", line 1016, in _bootstrap_inner
self.run()
File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.10_3.10.2032.0_x64__qbz5n2kfra8p0\lib\threading.py", line 953, in run
self._target(*self._args, **self._kwargs)
File "c:\Users\<user>\Documents\Projects\eightest\eightest\runner.py", line 261, in runner
task.run()
File "c:\Users\<user>\Documents\Projects\eightest\eightest\runner.py", line 64, in run
if self.pipe_conn.recv() == 'started':
File "C:\Users\<user>\Documents\Projects\eightest\.venv\lib\site-packages\multiprocess\connection.py", line 258, in recv
buf = self._recv_bytes()
File "C:\Users\<user>\Documents\Projects\eightest\.venv\lib\site-packages\multiprocess\connection.py", line 313, in _recv_bytes
waitres = _winapi.WaitForMultipleObjects(
Why do the first few processes start and end fine, and then at some point the thread is no longer able to handle the Pipe? Also, the loop hangs with 6 tasks ended and 2 remaining, and those 2 can never be started.
CodePudding user response:
I spent quite a while on this trying to figure it out. The problem is that your task runner thread has:
for task in tasks:
print('running task')
task.run()
where tasks is a reference to tasks.remaining. The problem arises from iterating what is essentially the tasks.remaining list while the main thread is removing tasks from that very same list. As a result (on my desktop), two tasks are never reached by the iteration and thus never started. The solution is for the task runner thread to iterate a copy of the tasks.remaining list.
I have made other changes to your code; all of my modifications are marked with a #Booboo comment. Also, I do not have the hanging_threads module, so I commented out the monitor-related code:
import time
import traceback
from typing import List
from threading import Thread
from multiprocess import (Semaphore,
Process,
Pipe)
#from hanging_threads import start_monitoring #Booboo
class S_Process(Process):
    """Process whose target runs only while a shared semaphore is held.

    The child sends two messages over ``pipe_conn``: ``0`` as soon as the
    semaphore has been acquired, and the test name once the target is done,
    whether it returned normally or raised.
    """

    def __init__(self,
                 test_name: str,
                 semaphore: Semaphore,
                 pipe_conn: Pipe,
                 *args,
                 **kwargs
                 ) -> None:
        """Store the test name, semaphore and child pipe end.

        All remaining positional and keyword arguments (``target``, ``args``,
        ...) are forwarded unchanged to ``Process``.
        """
        super().__init__(*args, **kwargs)
        self.__child_conn = pipe_conn
        self.__test_name = test_name
        self.__semaphore = semaphore

    def run(self) -> None:
        """Run the target under the semaphore, reporting start and completion."""
        # The context manager releases the semaphore on every exit path. #Booboo
        with self.__semaphore:
            # Start signal: tells the receiving end the semaphore is now held.
            self.__child_conn.send(0)
            try:
                super().run()
            except Exception:
                # An ordinary failure of the target is reported (with its
                # traceback) but must not prevent the completion message.
                traceback.print_exc()
            finally:
                # Sent on every exit path, including SystemExit raised by the
                # target, so the receiving end is never left waiting for it.
                self.__child_conn.send(self.__test_name)

    # Process.terminate() is deliberately not overridden.
    # NOTE(review): a child that is terminated while inside the ``with`` block
    # above never releases its permit; confirm how timed-out tests are
    # expected to give the semaphore back.

    @property
    def test_name(self) -> str:
        """Name of the test this process runs."""
        return self.__test_name
class Task(object):
    """Pairs a process with the parent pipe end and tracks its life cycle.

    ``status`` moves from 'NOTRUN' to 'RUNNING' and then to either 'ENDED'
    (process exited and was joined) or 'TERMINATED' (stopped via terminate()).
    While the task is 'RUNNING', ``duration`` holds the start timestamp from
    ``time.perf_counter()``; once the task is finished it holds the elapsed
    time in seconds.
    """

    def __init__(self,
                 process: S_Process,
                 pipe_conn: Pipe,
                 ) -> None:
        """Store the process and the parent end of its pipe."""
        self.process = process
        self.pipe_conn = pipe_conn
        self.duration = None
        self.test_name = None
        self.status = 'NOTRUN'

    def run(self) -> None:
        """Start the process and block until it reports that it has begun."""
        self.process.start()
        # Blocks until the child sends its start signal; the value is unused.
        self.pipe_conn.recv()
        self.duration = time.perf_counter()
        self.status = 'RUNNING'

    def join(self) -> None:
        """Wait for the process to exit, then record its result."""
        # join() has no timeout, so the process is no longer alive afterwards.
        self.process.join()
        self.set_result()

    def terminate(self) -> None:
        """Terminate the process and record the task as 'TERMINATED'."""
        self.process.terminate()
        # A terminated child never sends its name, so take it from the
        # process object instead of the pipe.
        self.test_name = self.process.test_name
        self._stop_clock()
        self.status = 'TERMINATED'

    def set_result(self) -> None:
        """Record the test name and elapsed time and mark the task 'ENDED'."""
        if self.process.is_alive() or self.pipe_conn.poll():
            self.test_name = self.pipe_conn.recv()
        else:
            # The process is gone and left no message (e.g. it was killed);
            # a blocking recv() here would wait forever.
            self.test_name = self.process.test_name
        self._stop_clock()  #Booboo
        self.status = 'ENDED'

    def _stop_clock(self) -> None:
        """Turn the stored start timestamp into an elapsed time, if there is one."""
        if self.duration is not None:
            self.duration = time.perf_counter() - self.duration
class Tasks(object):
    """Book-keeping for tasks: those still pending and those already done."""

    def __init__(self) -> None:
        """Create the two (initially empty) task lists."""
        self.remaining: List[Task] = []
        self.completed: List[Task] = []

    def add(self,
            process: S_Process,
            pipe_conn: Pipe,
            ) -> None:
        """Queue a new Task built from ``process`` and its parent pipe end."""
        self.remaining.append(Task(process, pipe_conn))

    def complete(self, task: Task) -> None:
        """Record ``task`` as completed and drop it from the pending list."""
        self.completed.append(task)
        self.remaining.remove(task)

    def info(self) -> List[str]:
        """Return a one-line summary for every completed task."""
        # The fields are read directly from the task. #Booboo
        return [
            f"Test Name: {done.test_name} "
            f"Result: {done.status} "
            f"Duration: {done.duration}"
            for done in self.completed
        ]
def run_tests() -> None:
    """Create eight semaphore-limited test processes and poll them until done.

    A background thread starts the tasks one after another (each start blocks
    until the child reports that it holds the semaphore). Meanwhile the main
    loop polls every 0.2 s: it joins running tasks whose process has exited
    and terminates running tasks older than ``TIMEOUT`` seconds.
    """
    #start_monitoring() #Booboo
    TIMEOUT = 5  # seconds a task may stay RUNNING before it is terminated
    tasks = Tasks()
    semaphore = Semaphore(2)
    for i in range(8):
        # One pipe per task: the child end goes to the process,
        # the parent end is kept by the Task.
        parent_conn, child_conn = Pipe()
        process = S_Process(
            target=test_function,
            args=(),
            test_name=f'test_{i}',
            semaphore=semaphore,
            pipe_conn=child_conn
        )
        tasks.add(process, parent_conn)

    def runner(tasks):
        """Start each task in turn; task.run() blocks until the task reports in."""
        try:
            for task in tasks:
                task.run()
                print('running task', task.process.test_name) #Booboo
        except Exception:
            print(traceback.format_exc())

    # The thread gets its own copy of the list, because the loop below removes
    # items from tasks.remaining while the thread is still iterating. #Booboo
    # The Thread is bound to its own name so the ``runner`` function is not
    # shadowed.
    runner_thread = Thread(target=runner, args=(tasks.remaining.copy(),))
    runner_thread.start()
    while tasks.remaining:
        # Iterate a snapshot: tasks.complete() mutates tasks.remaining.
        for task in tasks.remaining.copy():
            if not task.process.is_alive() and task.status == 'RUNNING':
                print('JOINING:', task.process.test_name)
                task.join()
                tasks.complete(task)
            elif task.status == "RUNNING": #Booboo
                # While a task is RUNNING, task.duration holds its start timestamp.
                check_time = time.perf_counter() - task.duration
                if check_time > TIMEOUT:
                    print('TERMINATING:', task.process.test_name)
                    task.terminate()
                    tasks.complete(task)
        print('Rem:', len(tasks.remaining))
        print('End:', len(tasks.completed))
        # If the runner thread has stopped (e.g. after an exception) and every
        # task that is left was never started, nothing can make progress any
        # more; stop instead of polling forever.
        if (tasks.remaining
                and not runner_thread.is_alive()
                and all(task.status == 'NOTRUN' for task in tasks.remaining)):
            print('ABORTING: runner stopped with tasks not started:',
                  len(tasks.remaining))
            break
        time.sleep(0.2)
    # Every task has been handled (or can no longer start); reap the thread.
    runner_thread.join()
def test_function():
    """Stand-in test body: print a marker, then sleep for three seconds."""
    print('test_func')
    time.sleep(3)
# Run the demo only when this file is executed as a script.
if __name__ == "__main__":
    run_tests()