Home > database >  Python concurrent futures large number of inputs sit idle
Python concurrent futures large number of inputs sit idle

Time:01-27

I am processing large number of files (tens of millions) using Python's concurrent.futures. Issuing a small number of inputs work fine, however when the input size increases, the processes just don't start. Below code executes only when input size is small, e.g. 20_000.

import concurrent
import math

def some_math(x):
    y = 3*x**2   5*x   7
    return math.log(y)

inputs = range(1_000_000)
results = []

with concurrent.futures.ProcessPoolExecutor() as executor:
    for result in executor.map(some_math, inputs):
        results.append(result)

I have tried to overcome this by submitting jobs in smaller batches as below:

import concurrent
import math
    
def some_math(x):
    y = 3*x**2   5*x   7
    return math.log(y)

up_to = 220_000
batch_size = 20_000
results = []
for idx in range(0, up_to, batch_size):
    low = idx
    high = min(low   batch_size, up_to)
    
    inputs = range(low, high)
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for result in executor.map(some_math, inputs):
            results.append(result)

But again, it either does not start at all, or gets stuck after a few iterations of the outer for loop.

My Python version is 3.10.7. What is the issue here?

CodePudding user response:

you need to take advantage of the chunksize parameter of the map function, the pool is simply a pipe with a shared lock, and having 10 or more processes contending over that lock is going to be very slow.

using a large chunksize reduces this contention, as each process is going to grab a larger chunk off the queue each time it takes the lock.

import concurrent.futures
import math

def some_math(x):
    y = 3*x**2   5*x   7
    return math.log(y)
if __name__ == "__main__":
    inputs = range(1_000_000)
    results = []

    with concurrent.futures.ProcessPoolExecutor() as executor:
        for result in executor.map(some_math, inputs, chunksize=10_000):
            results.append(result)

in the above code the lock will only be locked 100 times instead of 1_000_000 times, and you get much less context switching.

  • Related