ProcessPoolExecutor suddenly stops at 255th process

I'm trying to use concurrent.futures to increment a counter that is allocated in shared_memory (see: How to properly close and unlink shared memory of multiprocessing?).

But the incrementing suddenly stops at state = 255 in the following code. I want to increment the number up to about 500, for my own reasons.

Can someone solve the problem?

from multiprocessing import shared_memory, Lock
from concurrent.futures import ProcessPoolExecutor as Executor, as_completed
import time, random

lock = Lock()

def counter():
    existing_shm = shared_memory.SharedMemory(name = 'shm')
    c = existing_shm.buf
    with lock:
        old_state = c[0]
        time.sleep(random.random()/10)
        new_state = old_state + 1
        c[0] = new_state
    print(new_state)
    existing_shm.close()


if __name__=='__main__':
    with Executor(12) as p:
        shm = shared_memory.SharedMemory(create = True, size = 1, name= 'shm')
        buffer = shm.buf
        buffer[0] = 0

        futures = [p.submit(counter) for i in range(500)]
        for future in as_completed(futures):
            pass

        shm.close()
        shm.unlink()

The output is as follows.

1
2
3
*snip*
253
254
255

CodePudding user response:

Your shared memory is only 1 byte in size. Once your worker function counter attempts to store a value greater than 255 (the maximum value a single byte can hold), it will raise the following exception:

ValueError: memoryview: invalid value for format 'B'
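
You can reproduce this limit in isolation. A minimal sketch (the name 'demo_shm' is just a placeholder):

from multiprocessing import shared_memory

shm = shared_memory.SharedMemory(create=True, size=1, name='demo_shm')
try:
    shm.buf[0] = 255   # fine: the largest value one byte can hold
    shm.buf[0] = 256   # raises the ValueError shown above
except ValueError as e:
    print(e)
finally:
    shm.close()
    shm.unlink()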

Unfortunately, you had no code to catch this, so the exception went undetected. Had you called future.result() on each future, the exception would have been re-raised in the main process.
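
For example, here is a sketch of a checking loop that would replace the as_completed loop in the original main block (this is not part of the original post):

for future in as_completed(futures):
    try:
        future.result()  # re-raises any exception raised inside counter()
    except ValueError as e:
        print('worker failed:', e)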

As I recommended in an earlier post of yours, you could use a multiprocessing.Value instance, which also uses shared memory. For example:

from multiprocessing import Value
from concurrent.futures import ProcessPoolExecutor as Executor
import time, random

def init_pool_processes(value):
    # Runs once in each new worker process: publish the shared Value
    # as a global so that counter() can access it.
    global v
    v = value

def counter():
    with v.get_lock():
        old_state = v.value
        time.sleep(random.random()/10)
        new_state = old_state + 1
        v.value = new_state
    print(new_state)


if __name__=='__main__':
    v = Value('i', 0, lock=True)
    with Executor(12, initializer=init_pool_processes, initargs=(v,)) as p:
        for _ in range(500):
            p.submit(counter)
    # There is an implicit call here to p.shutdown(True), which
    # will wait for all tasks to complete
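
Note that the Value is passed to the workers through the pool initializer rather than as an argument to submit(): synchronized objects such as Value are meant to be shared with worker processes through inheritance, and passing one through submit() raises a RuntimeError.

Alternatively, if you would rather keep shared_memory, the buffer just needs to be wide enough for the target count. Here is a minimal sketch of that idea, assuming a 4-byte unsigned little-endian counter (the layout and the name 'shm_wide' are my own choices, not from the original post):

from multiprocessing import shared_memory
import struct

shm = shared_memory.SharedMemory(create=True, size=4, name='shm_wide')
struct.pack_into('<I', shm.buf, 0, 0)               # initialize the counter to 0

# Inside the worker, the increment becomes:
old_state = struct.unpack_from('<I', shm.buf, 0)[0]
struct.pack_into('<I', shm.buf, 0, old_state + 1)   # no overflow until 2**32 - 1

shm.close()
shm.unlink()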