I'm trying to use the multiprocessing module and add 1 to shared memory in each process, but I see errors when running the following code. Can anyone tell me how to close and unlink the shared memory?
Here is the code:
from multiprocessing import Pool, Value, shared_memory

def counter():
    existing_shm = shared_memory.SharedMemory(name='shm')
    c = existing_shm.buf
    old_state = c[0]
    c[0] = old_state + 1
    print(c[0])
    existing_shm.close()

if __name__ == '__main__':
    p = Pool(12)
    shm = shared_memory.SharedMemory(create=True, size=1, name='shm')
    buffer = shm.buf
    buffer[0] = 0
    for i in range(20):
        #p.apply(counter, args = (state, ))
        p.apply(counter)
    p.close()
    p.join()
    shm.close()
    shm.unlink()
The error messages are as follows.
Traceback (most recent call last):
  File "~/multiprocess_test.py", line 25, in <module>
    p.apply(counter)
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 357, in apply
    return self.apply_async(func, args, kwds).get()
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 771, in get
    raise self._value
FileNotFoundError: [Errno 2] No such file or directory: '/shm'
/usr/lib/python3.10/multiprocessing/resource_tracker.py:224: UserWarning: resource_tracker: There appear to be 1 leaked shared_memory objects to clean up at shutdown
  warnings.warn('resource_tracker: There appear to be %d '
/usr/lib/python3.10/multiprocessing/resource_tracker.py:237: UserWarning: resource_tracker: '/shm': [Errno 2] No such file or directory: '/shm'
  warnings.warn('resource_tracker: %r: %s' % (name, e))
CodePudding user response:
I have the same issue with WSL2 (Debian) on Windows and I don't know why, but forcing WSL to use spawn as the start method fixes it. Note that I have changed the call from apply to apply_async so that the 12 processes will actually run in parallel (if you have 12 or more CPU cores). This makes it necessary to do the incrementing under a lock, since the operation is not atomic:
from multiprocessing import Pool, Value, shared_memory, set_start_method, Lock

def init_pool(the_lock):
    global lock
    lock = the_lock

def counter():
    existing_shm = shared_memory.SharedMemory(name='shm')
    c = existing_shm.buf
    with lock:
        old_state = c[0]
        new_state = old_state + 1
        c[0] = new_state
        print(new_state)
    existing_shm.close()

if __name__ == '__main__':
    set_start_method('spawn')
    lock = Lock()
    p = Pool(12, initializer=init_pool, initargs=(lock,))
    shm = shared_memory.SharedMemory(create=True, size=1, name='shm')
    buffer = shm.buf
    buffer[0] = 0
    for i in range(20):
        #p.apply(counter, args = (state, ))
        p.apply_async(counter)
    p.close()
    p.join()
    print('final value:', buffer[0])
    shm.close()
    shm.unlink()
Note that for what you are doing in your example, using a multiprocessing.Value would be simpler and works without forcing the spawn method:
from multiprocessing import Pool, Value

def init_pool(the_value):
    global v
    v = the_value

def counter():
    with v.get_lock():
        old_state = v.value
        new_state = old_state + 1
        v.value = new_state
        print(new_state)

if __name__ == '__main__':
    v = Value('i', 0, lock=True)
    p = Pool(12, initializer=init_pool, initargs=(v,))
    for i in range(20):
        p.apply_async(counter)
    p.close()
    p.join()
    print('final value:', v.value)
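(As an aside, Value('i', 0) creates its own lock by default, and v.get_lock() retrieves it, so passing lock=True here is only being explicit.)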
CodePudding user response:
The errors you get arise because, as the processes in the pool exit as a result of the Pool.join call, the resource tracker finds that the shared memory object has not been unlinked yet.
The way out of it would be to create another function for your subprocesses and use a combination of inter-process communication primitives, such as semaphores and barriers, so that each process would run the target counter function, call shm.close(), wait for the main process to call shm.unlink(), and only then return, allowing the process to be terminated. (The order of the .join call in the main process would also have to be changed.)
All in all, it turns out you will be better off using concurrent.futures.ProcessPoolExecutor, which is a higher-level structure for handling the life cycle of worker processes and submitted tasks. (The underlying processes, though opaque from the main process, can use any of the tools available through the multiprocessing module.)
So, with the executor keeping the child processes alive, taking care of their disposal, and handling the execution of parallel tasks properly, your code might be:
from multiprocessing import shared_memory, Lock
from concurrent.futures import ProcessPoolExecutor as Executor, as_completed
import time, random

lock = Lock()

def counter():
    existing_shm = shared_memory.SharedMemory(name='shm')
    c = existing_shm.buf
    with lock:
        old_state = c[0]
        time.sleep(random.random() / 10)
        new_state = old_state + 1
        c[0] = new_state
        print(new_state)
    existing_shm.close()

if __name__ == '__main__':
    with Executor(12) as p:
        shm = shared_memory.SharedMemory(create=True, size=1, name='shm')
        buffer = shm.buf
        buffer[0] = 0
        futures = [p.submit(counter) for i in range(20)]
        for future in as_completed(futures):
            pass
        shm.close()
        shm.unlink()
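One caveat: the module-level lock here is shared with the workers only because ProcessPoolExecutor forks them from the main process (the default on Linux); under the spawn start method each worker would get its own unrelated lock, and you would instead pass the lock through the executor's initializer, as in the first answer.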