I am processing a large number of files (tens of millions) using Python's concurrent.futures. With a small number of inputs it works fine, but when the input size increases, the processes just don't start. The code below executes only when the input size is small, e.g. 20_000.
import concurrent.futures
import math

def some_math(x):
    y = 3*x**2 + 5*x + 7
    return math.log(y)

inputs = range(1_000_000)
results = []

with concurrent.futures.ProcessPoolExecutor() as executor:
    for result in executor.map(some_math, inputs):
        results.append(result)
I have tried to overcome this by submitting jobs in smaller batches as below:
import concurrent.futures
import math

def some_math(x):
    y = 3*x**2 + 5*x + 7
    return math.log(y)

up_to = 220_000
batch_size = 20_000
results = []

for idx in range(0, up_to, batch_size):
    low = idx
    high = min(low + batch_size, up_to)
    inputs = range(low, high)
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for result in executor.map(some_math, inputs):
            results.append(result)
But again, it either does not start at all, or gets stuck after a few iterations of the outer for loop.
My Python version is 3.10.7. What is the issue here?
CodePudding user response:
You need to take advantage of the chunksize parameter of the map function. The pool is essentially a pipe guarded by a shared lock, and having 10 or more processes contending over that lock is going to be very slow.
Using a large chunksize reduces this contention, as each process grabs a larger chunk off the queue each time it takes the lock.
import concurrent.futures
import math

def some_math(x):
    y = 3*x**2 + 5*x + 7
    return math.log(y)

if __name__ == "__main__":
    inputs = range(1_000_000)
    results = []
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for result in executor.map(some_math, inputs, chunksize=10_000):
            results.append(result)
In the above code the lock is only acquired 100 times instead of 1_000_000 times, and you get much less context switching.
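If you'd rather not hard-code the value, here is a minimal sketch that derives a chunksize from the input length and the worker count (pick_chunksize is a hypothetical helper, modelled on the heuristic multiprocessing.Pool.map uses by default; the workload is the same some_math as above):

import concurrent.futures
import math
import os

def some_math(x):
    y = 3*x**2 + 5*x + 7
    return math.log(y)

def pick_chunksize(n_items, n_workers, chunks_per_worker=4):
    # Hypothetical helper: split the work into roughly chunks_per_worker * n_workers
    # chunks, so each worker takes the queue lock only a handful of times while the
    # chunks stay small enough to balance the load across workers.
    chunksize, extra = divmod(n_items, n_workers * chunks_per_worker)
    return chunksize + 1 if extra else max(1, chunksize)

if __name__ == "__main__":
    inputs = range(1_000_000)
    workers = os.cpu_count() or 1
    chunksize = pick_chunksize(len(inputs), workers)
    with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
        results = list(executor.map(some_math, inputs, chunksize=chunksize))

With one million inputs and, say, 8 workers this gives a chunksize of 31_250, so each worker only takes the lock a few times per run.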