I'm trying to use concurrent.futures to increment a counter that is stored in shared_memory. (How to properly close and unlink shared memory of multiprocessing?)
But the incrementing suddenly stops at state = 255 in the following code. I need the counter to go up to about 500.
How can I fix this?

    from multiprocessing import shared_memory, Lock
    from concurrent.futures import ProcessPoolExecutor as Executor, as_completed
    import time, random

    lock = Lock()

    def counter():
        existing_shm = shared_memory.SharedMemory(name = 'shm')
        c = existing_shm.buf
        with lock:
            old_state = c[0]
            time.sleep(random.random()/10)
            new_state = old_state + 1
            c[0] = new_state
        print(new_state)
        existing_shm.close()

    if __name__=='__main__':
        with Executor(12) as p:
            shm = shared_memory.SharedMemory(create = True, size = 1, name= 'shm')
            buffer = shm.buf
            buffer[0] = 0
            futures = [p.submit(counter) for i in range(500)]
            for future in as_completed(futures):
                pass
        shm.close()
        shm.unlink()

The output is as follows.
1
2
3
*snip*
253
254
255
CodePudding user response:
Your shared memory is only 1 byte in size. Once your worker function, counter, attempts to store a value greater than 255, the maximum value a single byte can hold, counter will raise the following exception:
ValueError: memoryview: invalid value for format 'B'
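Unfortunately, your code never retrieves the results of the futures, so this exception went undetected: an exception raised in a worker is stored in its future and is re-raised only when you call future.result(). Had you called future.result() in your as_completed loop, you would have seen it.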
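
To make that concrete, here is a minimal sketch of how calling future.result() would surface the error. The names store_in_byte and collect are hypothetical, chosen only for illustration, and a plain one-byte memoryview stands in for your shared memory block:

```python
from concurrent.futures import ProcessPoolExecutor, as_completed

def store_in_byte(value):
    # Hypothetical stand-in for the worker: a 1-byte buffer with format 'B',
    # the same format that shm.buf uses.
    buf = memoryview(bytearray(1))
    buf[0] = value  # raises ValueError when value does not fit in one byte
    return buf[0]

def collect(values):
    # Hypothetical helper: submit one task per value and gather the outcomes.
    results, errors = [], []
    with ProcessPoolExecutor(2) as pool:
        futures = [pool.submit(store_in_byte, v) for v in values]
        for future in as_completed(futures):
            try:
                # result() re-raises any exception raised in the worker
                results.append(future.result())
            except ValueError as exc:
                errors.append(str(exc))
    return sorted(results), errors

if __name__ == '__main__':
    results, errors = collect([254, 255, 256])
    print(results)
    print(errors)
```

With a loop like this, the task that tries to store 256 shows up in errors instead of failing silently.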
As I recommended in an earlier post of yours, you could use a multiprocessing.Value instance, which also uses shared memory. For example:

    from multiprocessing import Value
    from concurrent.futures import ProcessPoolExecutor as Executor
    import time, random

    def init_pool_processes(value):
        global v
        v = value

    def counter():
        with v.get_lock():
            old_state = v.value
            time.sleep(random.random()/10)
            new_state = old_state + 1
            v.value = new_state
        print(new_state)

    if __name__=='__main__':
        v = Value('i', 0, lock=True)
        with Executor(12, initializer=init_pool_processes, initargs=(v,)) as p:
            for _ in range(500):
                p.submit(counter)
            # There is an implicit call here to p.shutdown(True), which
            # will wait for all tasks to complete
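
If you would rather keep using shared_memory, one possible approach is to allocate more than one byte and read and write the counter as a multi-byte integer with the struct module. The following is only a single-process sketch of that idea; write_counter, read_counter and demo are hypothetical helper names, and in your multi-process code each read-modify-write would still need to happen under a lock that is actually shared with the workers:

```python
import struct
from multiprocessing import shared_memory

def write_counter(buf, value):
    # Pack the value as a native int ('i', 4 bytes on common platforms) at offset 0.
    struct.pack_into('i', buf, 0, value)

def read_counter(buf):
    # unpack_from returns a tuple; the counter is its only element.
    return struct.unpack_from('i', buf, 0)[0]

def demo():
    # size=4 leaves room for a 4-byte integer instead of a single byte.
    shm = shared_memory.SharedMemory(create=True, size=4)
    try:
        write_counter(shm.buf, 0)
        for _ in range(500):
            write_counter(shm.buf, read_counter(shm.buf) + 1)
        return read_counter(shm.buf)
    finally:
        shm.close()
        shm.unlink()

if __name__ == '__main__':
    print(demo())
```

The counter is no longer limited to 255 because the value is spread over several bytes rather than being assigned to the single element c[0].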