I am trying to use multiprocessing to speed up dealing with lots of files instead of reading them one by one. I did a test to learn before that. Below is my code:
from multiprocessing.pool import Pool
from time import sleep, time
def print_cube(num):
aa1 = num * num
aa2 = num * num * num
return aa1, aa2
def main1():
start = time()
x = []
y = []
p = Pool(16)
for j in range(1, 5):
results = p.apply_async(print_cube, args = (j, ))
x.append(results.get()[0])
y.append(results.get()[1])
end = time()
return end - start, x, y
def main2():
start = time()
x = []
y = []
for j in range(1, 5):
results = print_cube(j)
x.append(results[0])
y.append(results[1])
end = time()
return end - start, x, y
if __name__ == "__main__":
print("Method1{0}time : {1}{2}x : {3}{4}y : {5}".format('\n' ,main1()[0], '\n', main1()[1], '\n', main1()[2]))
print("Method2{0}time : {1:.6f}{2}x : {3}{4}y : {5}".format('\n' ,main2()[0], '\n', main2()[1], '\n', main2()[2]))
And the result is:
Method1
time : 0.1549079418182373
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
Method2
time : 0.000000
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
Method1 uses multiprocessing and consumes more CPU, but costs more time than method2.
Even if the number of cycles j goes to 5000 or greater, method2 works better than method1. Can anybody tell me what's wrong with my code?
CodePudding user response:
There is overhead in using multiprocessing that you do not otherwise have, such as (1) creating processes, (2) passing arguments to your worker function, which is running in different processes and (3) passing results back to your main process. Therefore, the worker function must be sufficiently CPU-intensive so that the gains you achieve by running it in parallel offset the additional overhead I just mentioned. Your worker function, print_cube
does not meet that criteria because it is not sufficiently CPU-intensive.
But you are not even running your worker function in parallel.
You are submitting a tasks in a loop by calling method multiprocessing.pool.Pool.apply_async
, which returns an instance of multiprocessing.pool.AsyncResult
but before you call apply_async
again to submit the next task you are calling method get
on the AsyncResult
and therefore blocking until the first task completes and returns its result before you submit the second task!!! You must submit all your tasks with apply_async
and save the returned AsyncResult
instances and only then call get
on these instances. Only then will you achieve parallelism. Even then your worker function, print_cube
, uses too little CPU to overcome the additional overhead that multiprocessing uses to be more performant than serial processing.
In the following code I have (1) corrected the multiprocessing code to perform parallelism and to create a pool size of 5 (there is no reason to create a pool with more processes than the number of tasks you will be submitting or the number of CPU processors that you have for purely CPU-bound tasks; that is just additional overhead you are creating for no good reason) and (2) modified print_cube
to be very CPU-intensive to demonstrate how multiprocessing could be advantageous (albeit in an artificial way):
from multiprocessing.pool import Pool
from time import sleep, time
def print_cube(num):
# emulate a CPU-intensive calculation:
for _ in range(10_000_000):
aa1 = num * num
aa2 = num * num * num
return aa1, aa2
def main1():
start = time()
x = []
y = []
p = Pool(5)
# Submit all the tasks and save the AsyncResult instances:
results = [p.apply_async(print_cube, args = (j, )) for j in range(1, 5)]
# Now wait for the return values:
for result in results:
# Unpack the tuple:
x_value, y_value = result.get()
x.append(x_value)
y.append(y_value)
end = time()
return end - start, x, y
def main2():
start = time()
x = []
y = []
for j in range(1, 5):
results = print_cube(j)
x.append(results[0])
y.append(results[1])
end = time()
return end - start, x, y
if __name__ == "__main__":
print("Method1{0}time : {1}{2}x : {3}{4}y : {5}".format('\n' ,main1()[0], '\n', main1()[1], '\n', main1()[2]))
print("Method2{0}time : {1:.6f}{2}x : {3}{4}y : {5}".format('\n' ,main2()[0], '\n', main2()[1], '\n', main2()[2]))
Prints:
Method1
time : 1.109999656677246
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
Method2
time : 2.827015
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
Important Note
Unless you have a solid state drive, you will probably find that trying to read in parallel multiple files may be counter-productive because of head movement back and forth. This may also be a job better-suited for multithreading.
CodePudding user response:
@Booboo First of all, thank you very much for your detailed and excellent explanation. It helps me a lot to better understand the multiprocessing tool of python and your code is also a great example. And next time when trying to apply multiprocessing, I think I'll first consider whether the task satisfies the features of multiprocessing you said. And sorry for the late reply that I ran some experiments.
Second, I ran the code you gave on my computer, and it showed similar result with yours, where Method1 did cost less time with higher CPU consumption than Method2.
Method1
time : 1.0751237869262695
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
Method2
time : 3.642306
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
Third, as for the note you wrote, the data files are stored in a solid state drive, and I tested the time and CPU consumption of dealing with about 50 * 100 MB csv files in Method1 (with multiprocessing), Method2 (nothing), and Method3 (with multithreading), respectively. Method2 did consume high percentage of GPU, 50%, but did not reach the maximum like the Method1 could. Result is as follows:
time : 12.527468204498291
time : 59.400668144226074
time : 35.45922660827637
Forth, below is the example by emulating a CPU-intensive calculation:
import threading
from multiprocessing.pool import Pool
from queue import Queue
from time import time
def print_cube(num):
# emulate a CPU-intensive calculation:
for _ in range(10_000_000_0):
aa1 = num * num
aa2 = num * num * num
return aa1, aa2
def print_cube_queue(num, q):
# emulate a CPU-intensive calculation:
for _ in range(10_000_000_0):
aa1 = num * num
aa2 = num * num * num
q.put((aa1, aa2))
def main1():
start = time()
x = []
y = []
p = Pool(8)
# Submit all the tasks and save the AsyncResult instances:
results = [p.apply_async(print_cube, args = (j, )) for j in range(1, 5)]
# Now wait for the return values:
for result in results:
# Unpack the tuple:
x_value, y_value = result.get()
x.append(x_value)
y.append(y_value)
end = time()
return end - start, x, y
def main2():
start = time()
x = []
y = []
for j in range(1, 5):
results = print_cube(j)
x.append(results[0])
y.append(results[1])
end = time()
return end - start, x, y
def main3():
start = time()
q = Queue()
x = []
y = []
threads = []
for j in range(1, 5):
t = threading.Thread(target=print_cube_queue, args = (j, q))
t.start()
threads.append(t)
for thread in threads:
thread.join()
results = []
for thread in threads:
x_value, y_value = q.get()
x.append(x_value)
y.append(y_value) #q.get()按顺序从q中拿出一个值
end = time()
return end - start, x, y
if __name__ == "__main__":
print("Method1{0}time : {1}{2}x : {3}{4}y : {5}".format('\n' ,main1()[0], '\n', main1()[1], '\n', main1()[2]))
print("Method2{0}time : {1:.6f}{2}x : {3}{4}y : {5}".format('\n' ,main2()[0], '\n', main2()[1], '\n', main2()[2]))
print("Method3{0}time : {1:.6f}{2}x : {3}{4}y : {5}".format('\n' ,main3()[0], '\n', main3()[1], '\n', main3()[2]))
And the result is:
Method1
time : 9.838010549545288
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
Method2
time : 35.850124
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
Method3
time : 37.191602
x : [4, 16, 9, 1]
y : [8, 1, 64, 27]
I did some search, and don't know whether it is because the GPL or someting else.