Multithreading / Multiprocessing solution using concurrent.futures

Time:09-14

Hi, I'm referencing the following question because it's similar to what I'm trying to achieve. However, I'm getting an error that I can't figure out, so I'm looking for some help.

Combining multithreading and multiprocessing with concurrent.futures

Here's my test code:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import numpy as np
from os import cpu_count
from functools import partial

num_list = range(0,1000)
  
def test(x):
    return x**2
             
def multithread(f, lst):
    print('Thread running')
    with ThreadPoolExecutor() as thread_executor:
        thread_executor.map(f, lst)

def multiprocesser(lst, f, n_processors=cpu_count()//2):
    chunks = np.array_split(lst, n_processors)
    with ProcessPoolExecutor(max_workers=n_processors) as mp_executor:
        mp_executor.map(partial(multithread, f), chunks)

if __name__ == '__main__':
    multiprocesser(num_list, test)
And here's the error I get:

Process SpawnProcess-31:
Traceback (most recent call last):
  File "C:\Users\Test_user\Anaconda3\envs\test_env\lib\multiprocessing\process.py", line 315, in _bootstrap
    self.run()
  File "C:\Users\Test_user\Anaconda3\envs\test_env\lib\multiprocessing\process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\Test_user\Anaconda3\envs\test_env\lib\concurrent\futures\process.py", line 237, in _process_worker
    call_item = call_queue.get(block=True)
  File "C:\Users\Test_user\Anaconda3\envs\test_env\lib\multiprocessing\queues.py", line 122, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'multithread' on <module '__main__' (built-in)>
(the same traceback repeats for SpawnProcess-32 and every other spawned worker)

I didn't specify the number of threads since I don't see a reason to for the ThreadPoolExecutor. I'm having trouble understanding what the error actually means and how I can fix it. Any help would be appreciated.

CodePudding user response:

The error means the spawned worker process couldn't find multithread when unpickling its task. The <module '__main__' (built-in)> part suggests the code was run from an interactive session (a REPL or notebook), where the spawn start method can't re-import the main module to look up your functions. Save the code as a .py file and run it as a script:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import numpy as np
from os import cpu_count
from functools import partial

num_list = range(0,1000)

def test(x):
    return x**2
               
def multithread(f, lst):
    print('Thread running')
    with ThreadPoolExecutor() as thread_executor:
        thread_executor.map(f, lst)

def multiprocesser(lst, f, n_processors=cpu_count()//2):
    chunks = np.array_split(lst, n_processors)
    with ProcessPoolExecutor(max_workers=n_processors) as mp_executor:
        mp_executor.map(partial(multithread, f), chunks)

if __name__ == '__main__':
    multiprocesser(num_list, test)

CodePudding user response:

Missing if __name__ == '__main__'

if __name__ == '__main__':
    multiprocesser(num_list, test)

Unintended recursion

If the call to multiprocesser() isn't guarded, each subprocess re-imports and re-executes the script when it starts, spawning more processes recursively.
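The guard works because importing a module never sets its __name__ to '__main__'. A tiny sketch that simulates the child's re-import without spawning anything (the module name fake_child_main is made up for illustration):

```python
import importlib.util
import os
import tempfile
import textwrap

# A tiny script that records whether its guarded block ran.
src = textwrap.dedent("""
    ran_unguarded = True          # always executes on import
    ran_guarded = False
    if __name__ == '__main__':
        ran_guarded = True        # skipped when the module is imported
""")

with tempfile.NamedTemporaryFile('w', suffix='.py', delete=False) as fh:
    fh.write(src)
    path = fh.name

# Import the file the way a spawned child re-imports the main script:
# under a module name that is not '__main__'.
spec = importlib.util.spec_from_file_location('fake_child_main', path)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
os.unlink(path)

print(mod.ran_unguarded, mod.ran_guarded)  # -> True False
```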

Safe importing of main module

The following is an example of the same type of problem from the multiprocessing docs:

https://docs.python.org/3/library/multiprocessing.html?highlight=multiprocess#the-spawn-and-forkserver-start-methods

Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects (such as starting a new process).

For example, using the spawn or forkserver start method running the following module would fail with a RuntimeError:

from multiprocessing import Process

def foo():
    print('hello')

p = Process(target=foo)
p.start()

Instead one should protect the “entry point” of the program by using if __name__ == '__main__': as follows:

from multiprocessing import Process, freeze_support, set_start_method

def foo():
    print('hello')

if __name__ == '__main__':
    freeze_support()
    set_start_method('spawn')
    p = Process(target=foo)
    p.start()