I have a program that uses many asynchronous functions that are important for its general operation, but I need one of them to run 2 processes at the same time. For this I decided to use the multiprocessing module, creating and starting process p1 and process p2. p1 is a kind of timer that counts 8 seconds, which is the time limit p2 has to finish its tasks. If p2 does not complete its tasks within 8 seconds (that is, p1 finishes before p2), both processes are closed and the program continues.
In this case p2 is a process that performs string operations and encodes the content of the variable called text, returning it to the main program when it finishes. p1, on the other hand, is a process whose only goal is to limit the time p2 has to perform the string operations and load the result into the text variable.
The objective of this algorithm is to handle the case where a user sends very long operations (or the system is too saturated to process the input in a reasonable time) that take more than 8 seconds: p2 would then stop working on them and the content of the variable text would not change.
This is my simplified code, where I try to create 2 processes (called p1 and p2), each one executing a different function. Both functions are within the same class, and I am calling this class from an asynchronous function:
import asyncio, multiprocessing, time

#class BotBrain(BotModule):
class BotBrain:
    # real __init__()
    '''
    def __init__(self, chatbot: object) -> None:
        self.chatbot = chatbot
        self.max_learn_range = 4
        corpus_name = self.chatbot.config["CorpusName"]
        self.train(corpus_name)
    '''

    # test __init__()
    def __init__(self):
        self.finish_state = multiprocessing.Event()

    def operation_process(self, input_text, text):
        #text = name_identificator(input_text, text)
        text = "aaa" #for this example I change the info directly without detailing the nlp process
        return input_text, text

    def finish_process(self, finish_state):
        time.sleep(8)
        print("the process has been interrupted!")
        self.finish_state.set()

    def process_control(self, finish_state, input_text, text):
        p1 = multiprocessing.Process(target=self.finish_process, args=(finish_state,))
        p1.start()
        p2 = multiprocessing.Process(target=self.operation_process, args=(input_text, text,))
        p2.start()

        #close all unfinished processes, p1 or p2
        finish_state.wait()
        for process in [p1, p2]:
            process.terminate()

    def process(self, **kwargs) -> str:
        input_text, text = "Hello, how are you?", ""
        text = self.process_control(self.finish_state, input_text, text) #for multiprocessing code

async def main_call():
    testInstance = BotBrain() #here execute the instructions of the __init__() function
    testInstance.process()

asyncio.run(main_call())
But it gives me these errors, which I do not know how to solve:
  File "J:\async_and_multiprocess.py", line 46, in process_control
    p1.start()
  File "C:\Users\MIPC\anaconda3\lib\multiprocessing\process.py", line 121, in start
    self._popen = self._Popen(self)
  File "C:\Users\MIPC\anaconda3\lib\multiprocessing\context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Users\MIPC\anaconda3\lib\multiprocessing\context.py", line 327, in _Popen
    return Popen(process_obj)
  File "C:\Users\MIPC\anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 45, in __init__
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "C:\Users\MIPC\anaconda3\lib\multiprocessing\spawn.py", line 154, in get_preparation_data
    _check_not_importing_main()
  File "C:\Users\MIPC\anaconda3\lib\multiprocessing\spawn.py", line 134, in _check_not_importing_main
    raise RuntimeError('''
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.
What does freeze_support() mean? And where in the class should I put that?
This is the flowchart of how my program should work, though it doesn't take freeze_support() into account because I don't know what it means.
CodePudding user response:
Multiprocessing can be tricky, in part because of Python's import system. From the programming guidelines of the multiprocessing module:
Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects (such as starting a new process).
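This is what is happening here. On Windows, multiprocessing starts each child with the spawn method: a fresh Python interpreter that imports your main module. Importing your main module executes asyncio.run(main_call()), which in turn would spawn another subprocess, and so on. Python detects this and raises the RuntimeError instead.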
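If you want to see the re-import for yourself, here is a minimal sketch. The names report_name and child_module_name are made up for this illustration. The top-level print runs once in the parent and, under the spawn start method, once more in the child, where the main module is loaded under the name "__mp_main__" instead of "__main__". With the fork start method the child simply inherits the parent's state, so there is no second import. Note that the process start in this demo already sits behind a `__name__` check, otherwise the demo would hit the same RuntimeError.

```python
import multiprocessing

# Top-level code: runs in the parent, and again in every child that has to
# re-import this file (the "spawn" start method, the default on Windows).
print(f"importing main module, __name__ = {__name__!r}")


def report_name(result_queue):
    """Runs in the child and reports what __name__ is over there."""
    result_queue.put(__name__)


def child_module_name():
    """Start one child process and return the __name__ it saw."""
    result_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=report_name, args=(result_queue,))
    child.start()
    name = result_queue.get(timeout=30)  # read the result before joining
    child.join()
    return name


if __name__ == "__main__":
    print("start method:", multiprocessing.get_start_method())
    print("child saw __name__ =", child_module_name())
```

Run it as a script file (not pasted into an interactive session), since a spawned child needs a file it can import.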
You need to guard your asyncio.run call from being executed on import with:
if __name__ == "__main__":
    asyncio.run(main_call())
If you don't want to produce a frozen binary (e.g. with PyInstaller), you can ignore the part about freeze_support().
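As a side note on the 8-second limit itself: the return value of a Process target is discarded, and waiting on an event that only the timer process sets always takes the full 8 seconds, even when the work finishes early. Below is a minimal sketch of one possible alternative, not the only way to do it. It uses a single worker process and lets a multiprocessing.Queue carry the result back, with the timeout applied to the queue read. The names run_with_timeout, encode_text and slow_task are hypothetical stand-ins, and encode_text just upper-cases the input in place of the real string processing.

```python
import multiprocessing
import queue
import time


def encode_text(input_text, result_queue):
    """Stand-in for the real string processing; runs in the child process."""
    result_queue.put(input_text.upper())


def slow_task(input_text, result_queue):
    """Simulates work that overruns the time limit."""
    time.sleep(5)
    result_queue.put(input_text.upper())


def run_with_timeout(target, input_text, default, timeout=8.0):
    """Run target in a child process; return default if it takes too long."""
    result_queue = multiprocessing.Queue()
    worker = multiprocessing.Process(target=target, args=(input_text, result_queue))
    worker.start()
    try:
        # Blocks until a result arrives; raises queue.Empty after `timeout` seconds.
        return result_queue.get(timeout=timeout)
    except queue.Empty:
        return default
    finally:
        # Stop the worker if it is still running, then reap it.
        if worker.is_alive():
            worker.terminate()
        worker.join()


if __name__ == "__main__":
    # Finishes in time, so the processed text comes back.
    print(run_with_timeout(encode_text, "Hello, how are you?", default=""))
    # Overruns the (shortened) limit, so the default is returned unchanged.
    print(repr(run_with_timeout(slow_task, "Hello", default="", timeout=0.5)))
```

Called directly inside a coroutine, run_with_timeout blocks the event loop while it waits. Assuming Python 3.9 or newer, `await asyncio.to_thread(run_with_timeout, encode_text, input_text, "")` is one way to keep the loop responsive.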