Code without multiprocessing:
from time import time
func1Results = []  # one total is appended here per func1() call


def func1(valList):
    """Sum the values in *valList* and append the total to ``func1Results``.

    The explicit Python loop (rather than ``sum``) is the CPU-bound workload
    that the surrounding script times.
    """
    num = 0
    for val in valList:
        num += val  # accumulate; a plain assignment would keep only the last value
    func1Results.append(num)
if __name__ == '__main__':
    # Baseline: run func1 four times, one after another, and time the total.
    started = time()
    for workload in [range(40000000)] * 4:
        func1(workload)
    finished = time()
    for total in func1Results:
        print(total)
    print(finished - started)
Output:
799999980000000
799999980000000
799999980000000
799999980000000
13.679119348526001
Code with multiprocessing:
from multiprocessing import Process, Queue
from time import time
# NOTE(review): with the 'spawn' start method each child re-imports this module
# and builds its own (unused) Queue here; the child only uses the queue passed
# to it through Process(args=...).
queue = Queue()
processList, func1Results = [], []


def func1(valList, queue):
    """Sum the values in *valList* and send the total back through *queue*.

    Intended to run in a child process; the parent reads the total with
    ``queue.get()``.
    """
    num = 0
    for val in valList:
        num += val  # accumulate; a plain assignment would keep only the last value
    queue.put(num)
if __name__ == '__main__':
    st = time()
    # Start every process before collecting any result. Calling queue.get()
    # right after each start() blocks until that child has finished, which
    # runs the children one at a time and removes the benefit of multiprocessing.
    for valList in [range(40000000), range(40000000), range(40000000), range(40000000)]:
        xProcess = Process(target=func1, args=(valList, queue))
        xProcess.start()
        processList.append(xProcess)
    # Drain one result per process *before* joining: a child that has put items
    # on a Queue waits for them to be flushed before it exits, so joining first
    # can deadlock. Results arrive in completion order, not start order.
    for _ in processList:
        func1Results.append(queue.get())
    for xProcess in processList:
        xProcess.join()
    ed = time()
    for i in func1Results:
        print(i)
    print(ed - st)
Output:
799999980000000
799999980000000
799999980000000
799999980000000
13.916456937789917
When I use the queue's `put()` and `get()` methods, the processing time of the multiprocessing code increases significantly. I know that returning results from child processes can be time-consuming, but I need those results. How can I return them more efficiently?
Answer:
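Here's a restructured version of the original code. The main problem in the original is that `queue.get()` is called immediately after each `start()`. `get()` blocks until that process has produced its result, so the four processes run one after another instead of in parallel. In the version below, we let all the sub-processes finish before we examine the queue.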
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
from functools import partial
import time
N = 40_000_000  # length of each range handed to a worker
def calc(q, rng):
    """Sum the values in *rng* and put the total on the shared queue *q*."""
    num = 0
    for n in rng:
        num += n  # accumulate; a plain assignment would keep only the last element
    q.put(num)
def main():
    """Sum four ranges in a process pool and print each total read from a queue."""
    with Manager() as manager:
        queue = manager.Queue()
        rlist = [range(N), range(N), range(N), range(N)]
        p = partial(calc, queue)
        with ProcessPoolExecutor() as executor:
            # An exception raised in a worker is only re-raised when its result
            # is retrieved from the map iterator, so consume it rather than
            # discarding it (otherwise failures vanish silently).
            list(executor.map(p, rlist))
        # Leaving the executor block waited for every worker, so the queue now
        # holds all totals (in completion order). The manager must still be
        # alive here because the queue is a manager proxy.
        while not queue.empty():
            print(queue.get())
if __name__ == '__main__':
    # Time the whole parallel run with a high-resolution clock.
    t0 = time.perf_counter()
    main()
    elapsed = time.perf_counter() - t0
    print(f'Duration = {elapsed:.2f}s')
Output:
799999980000000
799999980000000
799999980000000
799999980000000
Duration = 1.93s
Note:
You don't actually need a queue to get results back from the sub-processes: `executor.map` returns each call's return value, so you could simply do this:
from concurrent.futures import ProcessPoolExecutor
import time
N = 40_000_000  # length of each range handed to a worker
def calc(rng):
    """Return the sum of the values in *rng*.

    The explicit loop is the CPU-bound work being parallelised; ``sum(rng)``
    would give the same result.
    """
    num = 0
    for n in rng:
        num += n  # accumulate; a plain assignment would keep only the last element
    return num
def main():
    """Compute four range sums in a process pool and print one per line."""
    workloads = [range(N)] * 4
    with ProcessPoolExecutor() as executor:
        # executor.map yields each worker's return value in submission order.
        totals = executor.map(calc, workloads)
        print(*totals, sep='\n')
if __name__ == '__main__':
    # Time the whole parallel run with a high-resolution clock.
    t0 = time.perf_counter()
    main()
    elapsed = time.perf_counter() - t0
    print(f'Duration = {elapsed:.2f}s')
Output:
799999980000000
799999980000000
799999980000000
799999980000000
Duration = 1.83s