Python multiprocessing - number of threadpools not multiplied by number of pools

I have Python 3.9 code that uses Pool and ThreadPool from multiprocessing.pool. The intention is to have a process Pool of 2 workers, each independently spawning a ThreadPool of 3 threads. In other words, I expect 2 * 3 = 6 threads running in parallel.

However, the output of the minimal working example (MWE) below shows only 3 distinct thread ids.

My question: Why does it behave like this, and how can I reasonably fix that?

In addition, if such an N_POOL * N_THREADPOOL strategy is not a good approach, advice is welcome. The actual task is I/O-bound (network downloads followed by light preprocessing), and I am relatively new to parallelism.

MWE Code

from multiprocessing.pool import Pool, ThreadPool
from multiprocessing import Queue
from threading import get_ident
import random
from time import sleep
from functools import partial

# Params
N = 12
N_CPU = 2
N_THREAD = 3
# Just for CPU numbering
CPU_QUEUE = Queue(N_CPU)
for i in range(1, 1 + N_CPU):
    CPU_QUEUE.put(i)


def split_list_to_pools(ls_data, n_pools):
    """Split data into pools as lists of approx. equal lengths."""
    n_each = int((len(ls_data) - 1) / n_pools) + 1
    return [ls_data[n_each * i:n_each * (i + 1)] for i in range(n_pools)]


def process_threadpool(data, CPU_NO=-1):
    """Process incoming data one-by-one"""
    sleep(3 + random.random())
    print(f"Threadpool id: {get_ident()} CPU_NO: {CPU_NO} / {N_CPU}, data: {data}")


def process_pool(ls_data):
    """Process a list of data."""
    # Get initial pool status
    CPU_NO = CPU_QUEUE.get()
    print(f"Pool CPU_NO: {CPU_NO}, data: {ls_data}")

    with ThreadPool(N_THREAD) as threadpool:
        for _ in threadpool.imap_unordered(partial(process_threadpool, CPU_NO=CPU_NO), ls_data):
            pass


if __name__ == '__main__':

    # given data
    ls_data = list(range(N))

    # split data to pools
    ls_ls_data = split_list_to_pools(ls_data, N_CPU)
    print(f"data rearranged for pool: {ls_ls_data}")

    # process in parallel
    with Pool(N_CPU) as pool:
        for _ in pool.imap_unordered(process_pool, ls_ls_data):
            pass

    print("Program Ended!")

Output

Only 3 distinct thread ids are present instead of the expected 6.

$ python so.py
data rearranged for pool: [[0, 1, 2, 3, 4, 5], [6, 7, 8, 9, 10, 11]]
Pool CPU_NO: 1, data: [0, 1, 2, 3, 4, 5]
Pool CPU_NO: 2, data: [6, 7, 8, 9, 10, 11]
Threadpool id: 140065165276928 CPU_NO: 1 / 2, data: 2
Threadpool id: 140065165276928 CPU_NO: 2 / 2, data: 8
Threadpool id: 140065182062336 CPU_NO: 2 / 2, data: 6
Threadpool id: 140065182062336 CPU_NO: 1 / 2, data: 0
Threadpool id: 140065173669632 CPU_NO: 2 / 2, data: 7
Threadpool id: 140065173669632 CPU_NO: 1 / 2, data: 1
Threadpool id: 140065165276928 CPU_NO: 1 / 2, data: 3
Threadpool id: 140065182062336 CPU_NO: 2 / 2, data: 10
Threadpool id: 140065182062336 CPU_NO: 1 / 2, data: 4
Threadpool id: 140065165276928 CPU_NO: 2 / 2, data: 9
Threadpool id: 140065173669632 CPU_NO: 1 / 2, data: 5
Threadpool id: 140065173669632 CPU_NO: 2 / 2, data: 11
Program Ended!

Edit: The code was run under Debian 11.

CodePudding user response:

You didn't specify what platform you are running under, but I have to assume it is one that uses fork to create new processes (such as Linux). Otherwise I don't believe your code would work correctly: under spawn, each process in the pool creates its own copy of the global CPU_QUEUE, so every process would get the first item from its own queue and believe it is CPU id 1.

I have consequently made two changes to the code:

  1. Made the code more portable between platforms by using a pool initializer to initialize the global variable CPU_QUEUE in each pool process with a single shared queue instance.
  2. Introduced a call to time.sleep at the start of process_pool to give each process in the pool a chance to pick up one of the submitted tasks. Without it, one process could in theory grab all the submitted tasks; the sleep just makes that less likely. A stripped-down sketch of the initializer idiom follows this list.
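
For reference, here is the initializer idiom on its own (a minimal sketch with illustrative names such as WORK_QUEUE and task, not the original code). Note that a multiprocessing.Queue can be handed to workers through initargs, although it could not be passed as an ordinary task argument:

from multiprocessing import Queue
from multiprocessing.pool import Pool

def init_pool(q):
    # Runs once in every worker process; stores the parent's queue in a
    # module-level global so tasks can reach it under spawn as well as fork.
    global WORK_QUEUE
    WORK_QUEUE = q

def task(i):
    tag = WORK_QUEUE.get()   # take the shared tag...
    WORK_QUEUE.put(tag)      # ...and put it back for the next task
    return (tag, i)

if __name__ == '__main__':
    q = Queue()
    q.put('worker-tag')
    with Pool(2, initializer=init_pool, initargs=(q,)) as pool:
        print(pool.map(task, range(4)))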

When I run the code under Linux I essentially see what you see. However, when I run it under Windows, which the above changes now make possible, I see:

data rearranged for pool: [[0, 1, 2, 3, 4, 5], [6, 7, 8, 9, 10, 11]]
Pool CPU_NO: 1, data: [6, 7, 8, 9, 10, 11]
Pool CPU_NO: 2, data: [0, 1, 2, 3, 4, 5]
Threadpool id: 16924 CPU_NO: 1 / 2, data: 8
Threadpool id: 15260 CPU_NO: 1 / 2, data: 6
Threadpool id: 19800 CPU_NO: 2 / 2, data: 1
Threadpool id: 7580 CPU_NO: 2 / 2, data: 2
Threadpool id: 20368 CPU_NO: 1 / 2, data: 7
Threadpool id: 18736 CPU_NO: 2 / 2, data: 0
Threadpool id: 19800 CPU_NO: 2 / 2, data: 3
Threadpool id: 16924 CPU_NO: 1 / 2, data: 9
Threadpool id: 7580 CPU_NO: 2 / 2, data: 4
Threadpool id: 15260 CPU_NO: 1 / 2, data: 10
Threadpool id: 18736 CPU_NO: 2 / 2, data: 5
Threadpool id: 20368 CPU_NO: 1 / 2, data: 11
Program Ended!

This is what you expected to see. I can only conclude that under Linux, threading.get_ident returns a value that is only unique within a single process, so sibling processes forked from the same parent can report identical values. However, if you instead use threading.get_native_id(), which returns the kernel-assigned thread id and which I have incorporated into the source below, you do get 6 unique values (as hoped for); a tiny fork demo after this output shows the difference directly:

data rearranged for pool: [[0, 1, 2, 3, 4, 5], [6, 7, 8, 9, 10, 11]]
Pool CPU_NO: 1, data: [0, 1, 2, 3, 4, 5]
Pool CPU_NO: 2, data: [6, 7, 8, 9, 10, 11]
Threadpool id: 81 CPU_NO: 2 / 2, data: 7
Threadpool id: 83 CPU_NO: 2 / 2, data: 8
Threadpool id: 78 CPU_NO: 1 / 2, data: 0
Threadpool id: 79 CPU_NO: 2 / 2, data: 6
Threadpool id: 80 CPU_NO: 1 / 2, data: 1
Threadpool id: 82 CPU_NO: 1 / 2, data: 2
Threadpool id: 78 CPU_NO: 1 / 2, data: 3
Threadpool id: 83 CPU_NO: 2 / 2, data: 10
Threadpool id: 81 CPU_NO: 2 / 2, data: 9
Threadpool id: 79 CPU_NO: 2 / 2, data: 11
Threadpool id: 80 CPU_NO: 1 / 2, data: 4
Threadpool id: 82 CPU_NO: 1 / 2, data: 5
Program Ended!
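
To see the two identifiers side by side, here is a small fork demo separate from the code above (Linux-only, since os.fork is not available on Windows):

import os
from threading import get_ident, get_native_id

def report(label):
    print(f"{label}: pid={os.getpid()}, "
          f"get_ident={get_ident()}, get_native_id={get_native_id()}")

if os.fork() == 0:       # child process
    report("child")      # get_ident typically repeats the parent's value,
    os._exit(0)          # because it is only unique within one process
else:
    os.wait()
    report("parent")     # get_native_id is the kernel thread id, unique system-wide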

The Revised Source

from multiprocessing.pool import Pool, ThreadPool
from multiprocessing import Queue
#from threading import get_ident
from threading import get_native_id
import random
from time import sleep
from functools import partial

# Params
N = 12
N_CPU = 2
N_THREAD = 3

def init_pool_processes(the_queue):
    global CPU_QUEUE
    CPU_QUEUE = the_queue

def split_list_to_pools(ls_data, n_pools):
    """Split data into pools as lists of approx. equal lengths."""
    n_each = int((len(ls_data) - 1) / n_pools) + 1
    return [ls_data[n_each * i:n_each * (i + 1)] for i in range(n_pools)]


def process_threadpool(data, CPU_NO=-1):
    """Process incoming data one-by-one"""
    sleep(3 + random.random())
    print(f"Threadpool id: {get_native_id()} CPU_NO: {CPU_NO} / {N_CPU}, data: {data}")


def process_pool(ls_data):
    """Process a list of data."""
    # Get initial pool status
    sleep(.2)
    CPU_NO = CPU_QUEUE.get()
    print(f"Pool CPU_NO: {CPU_NO}, data: {ls_data}")

    with ThreadPool(N_THREAD) as threadpool:
        for _ in threadpool.imap_unordered(partial(process_threadpool, CPU_NO=CPU_NO), ls_data):
            pass


if __name__ == '__main__':
    # Just for CPU numbering
    CPU_QUEUE = Queue(N_CPU)
    for i in range(1, 1   N_CPU):
        CPU_QUEUE.put(i)

    # given data
    ls_data = list(range(N))

    # split data to pools
    ls_ls_data = split_list_to_pools(ls_data, N_CPU)
    print(f"data rearranged for pool: {ls_ls_data}")

    # process in parallel
    with Pool(N_CPU, initializer=init_pool_processes, initargs=(CPU_QUEUE,)) as pool:
        for _ in pool.imap_unordered(process_pool, ls_ls_data):
            pass

    print("Program Ended!")