To clarify first of all, I'm NOT asking why map in multiprocessing is slow.
I had code working just fine using pool.map(). But in developing it (and to make it more generic), I needed to use pool.starmap() to pass 2 arguments instead of one.
I'm still fairly new to Python and multiprocessing, so I'm not sure if I'm doing something obviously wrong here. I also couldn't find anything on this that's been previously asked, so apologies if this has already been answered.
Using python 3.10 by the way.
I'm processing just under 5 million items in a list, and managed to get a result in just over 12 minutes (instead of a predicted 6 1/2 days if it was run iteratively!) when using pool.map().
I'm essentially obtaining the intersection of List_A and List_B, but I need to preserve the frequencies of each item, and so have to do it in O(n*m).
But now, I'm getting significantly longer times when using pool.starmap().
I can't seem to work out why, and if anyone can give me some indication it would be greatly appreciated!
Here is the code for pool.map() that works quickly as expected (where List_B is actually part of the listCompare function):
def listCompare(list_A):
    toReturn = []
    for item in list_A:
        if item in list_B:
            toReturn.append(item)
    return toReturn

out = []
chnks = chunks(list_a, multiprocessing.cpu_count())
with multiprocessing.Pool() as pool:
    for result in pool.map(listCompare, chnks):
        out.extend(result)
print("Parallel:", out)
Here is the code for pool.starmap() that runs slowly. listCompare is modified to take 2 arguments here.
(I can't use my chunks method here, as I can't pass the yield into the tuple, so I've set the chunksize differently. Is this the reason for the slowdown?)
def listCompare(list_A, list_B):
    toReturn = []
    for item in list_A:
        if item in list_B:
            toReturn.append(item)
    return toReturn

with multiprocessing.Pool() as pool:
    for resultA in pool.starmap(listCompare, [(list_a1, list_b1)], chunksize=multiprocessing.cpu_count()):
        output_list1.extend(resultA)
    for resultB in pool.starmap(listCompare, [(list_a2, list_b2)], chunksize=multiprocessing.cpu_count()):
        output_list2.extend(resultB)
    for resultC in pool.starmap(listCompare, [(list_a3, list_b3)], chunksize=multiprocessing.cpu_count()):
        output_list3.extend(resultC)
    for resultD in pool.starmap(listCompare, [(list_a4, list_b4)], chunksize=multiprocessing.cpu_count()):
        output_list4.extend(resultD)
Thanks in advance, and apologies if I've missed out anything that may help in answering!
As I said earlier, I know this can be done with intersection, but I need the frequency of each occurrence, so I need to preserve duplicates.
CodePudding user response:
As pointed out in the comments, this is slow because the iterable you pass to starmap contains a single tuple, so there is only one task and the whole list is handed to a single process rather than being spread over multiple processes.
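To make that concrete, here is a small sketch that counts how often the worker function is actually called. It uses ThreadPool from multiprocessing.pool as a stand-in (my assumption, so that it runs anywhere without pickling or a __main__ guard); starmap creates one call per tuple in exactly the same way on a process pool. The helper name count_worker_calls and the toy data are made up for illustration. Note that chunksize only groups tuples into batches; it never splits the lists inside a tuple.

```python
from multiprocessing.pool import ThreadPool


def count_worker_calls(tasks, workers=4):
    """Run starmap over `tasks` and return how many times the worker ran."""
    calls = []  # list.append is atomic in CPython, so threads can share this

    def worker(list_a, list_b):
        calls.append(len(list_a))
        return [item for item in list_a if item in list_b]

    with ThreadPool(workers) as pool:
        pool.starmap(worker, tasks)
    return len(calls)


list_a = list(range(8))
list_b = [1, 3, 5]

# One tuple in the iterable: one call, so one worker does all the work.
single = count_worker_calls([(list_a, list_b)])

# Four tuples: four calls that the pool can spread over its workers.
split = count_worker_calls([(list_a[i:i + 2], list_b) for i in range(0, 8, 2)])

print(single, split)
```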
I would recommend that you keep using map here, with functools.partial to fix the argument containing list_B when passing the target function:
import multiprocessing
from functools import partial

def listCompare(list_B, item):
    # pool.map calls this once per element of list_a1, so it receives a single item
    return [item] if item in list_B else []

with multiprocessing.Pool() as pool:
    chunksize = max(1, len(list_a1) // multiprocessing.cpu_count())
    for resultA in pool.map(partial(listCompare, list_b1), list_a1, chunksize=chunksize):
        output_list1.extend(resultA)

Keep in mind that the order of the positional arguments has been swapped in listCompare for this to work (list_B comes first, so that partial can bind it), and that the function now receives one item of list_a1 per call rather than a whole list, because map iterates over list_a1 itself. The chunking is then done by the chunksize parameter (admittedly a very crude choice; you may want to fine-tune it).
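As one possible starting point for that tuning, the sketch below computes a chunksize that gives each worker several chunks instead of exactly one, which tends to balance the load better when some chunks finish early. As far as I know, this roughly mirrors what CPython's Pool.map does on its own when chunksize is left as None (about four chunks per worker). The name pick_chunksize and the factor default are my own choices, not part of the multiprocessing API.

```python
def pick_chunksize(n_items, n_workers, factor=4):
    """Return a chunksize that yields roughly `factor` chunks per worker."""
    chunksize, extra = divmod(n_items, n_workers * factor)
    if extra:
        chunksize += 1  # round up so no items are left over
    return max(1, chunksize)  # never return 0, even for tiny inputs


print(pick_chunksize(5_000_000, 8))  # 156250
print(pick_chunksize(100, 4))        # 7
```

The result can be passed straight to map, for example chunksize=pick_chunksize(len(list_a1), multiprocessing.cpu_count()).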
Alternatively, if you want to use starmap, you can keep using your chunks method, like below:
def listCompare(list_A, list_B):
    toReturn = []
    for item in list_A:
        if item in list_B:
            toReturn.append(item)
    return toReturn

with multiprocessing.Pool() as pool:
    # one (chunk, list_b1) tuple per chunk, so each chunk becomes its own task
    chunked = chunks(list_a1, multiprocessing.cpu_count())
    for resultA in pool.starmap(listCompare, [(c, list_b1) for c in chunked]):
        output_list1.extend(resultA)
Since you have already chunked the iterable on your own, you no longer need to pass a chunksize argument.
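Your chunks function isn't shown in the question, so the version below is only a guess at what it does: I'm assuming it takes a sequence and a number of pieces and yields contiguous slices of roughly equal size. The point of the sketch is that a generator works fine here, because the list comprehension consumes it and builds one tuple per chunk. The data is made up.

```python
def chunks(seq, n):
    """Yield up to n contiguous slices of seq with near-equal lengths."""
    size, extra = divmod(len(seq), n)
    start = 0
    for i in range(n):
        # the first `extra` slices get one additional element
        end = start + size + (1 if i < extra else 0)
        if end > start:  # skip empty slices when seq is shorter than n
            yield seq[start:end]
        start = end


list_a1 = list(range(10))
list_b1 = [2, 4, 6]

# The comprehension drains the generator: one (chunk, list_b1) tuple per chunk.
tasks = [(c, list_b1) for c in chunks(list_a1, 3)]
print(tasks)
```

Each tuple carries its own reference to list_b1, and with a process pool that list is pickled and sent along with every task, so fewer, larger chunks keep that overhead down.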
As a side note, if the iterable you pass to the pool is very long, you may want to use imap instead, which iterates lazily rather than storing the whole iterable in memory.
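A minimal sketch of the imap variant could look like this. Again I'm using ThreadPool purely so that the example runs as-is; with multiprocessing.Pool the imap call is the same, but the pool should then be created under an if __name__ == "__main__": guard. The names keep_if_present and filter_lazily, and the default chunksize of 1000, are placeholders of mine.

```python
from functools import partial
from multiprocessing.pool import ThreadPool


def keep_if_present(list_b, item):
    """Return [item] if item occurs in list_b, else an empty list."""
    return [item] if item in list_b else []


def filter_lazily(items, list_b, workers=4, chunksize=1000):
    """Filter any iterable against list_b, keeping duplicates and order."""
    out = []
    with ThreadPool(workers) as pool:
        worker = partial(keep_if_present, list_b)
        # imap accepts a plain iterator and yields results in input order
        for result in pool.imap(worker, items, chunksize=chunksize):
            out.extend(result)
    return out


print(filter_lazily(iter([1, 2, 2, 3, 4]), [2, 4]))  # [2, 2, 4]
```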