How to extract results from a generator object created by executor.map()


I'm trying out concurrent.futures to speed up calculations. Here is the test code:

import concurrent.futures
import numpy as np
import random

def task(n):
    c = np.sum(n)
    print(c)  # print sum result
    return c

# generate a 3D array
b = np.random.rand(2, 3, 100)

executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)

# along axis=2 to do sum in task function
v = executor.map(np.apply_along_axis(task, 2, b))


I can see that executor.map() does the calculation, since print(c) in the task function prints the sum results.

The questions are:

  1. How do I get the data out of v, which is a generator object?
  2. When I use list(v), it gives [] as a result. Why?

Answer:

The main issue is a misunderstanding. np.apply_along_axis calls the function task repeatedly (but serially), and then returns an array of the results. You then pass that returned array to executor.map, which actually expects a function as its first argument. Since no iterables follow it, executor.map has nothing to map over: no tasks are submitted, which is why list(v) gives [].

So it's not executor.map that is calling your function; it is apply_along_axis.
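A minimal sketch reproducing the question's symptom. It assumes CPython's current Executor.map implementation, which only submits work for each tuple produced by zip(*iterables); with no iterables there is nothing to submit. The array shape matches the question; nothing else is assumed:

```python
import concurrent.futures
import numpy as np

def task(n):
    return np.sum(n)

b = np.random.rand(2, 3, 100)

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # apply_along_axis calls task 6 times, serially, in this thread,
    # and returns a (2, 3) array of sums
    sums = np.apply_along_axis(task, 2, b)
    # That array becomes map's "func" argument, with no iterables after it:
    # zip() of zero iterables is empty, so nothing is submitted
    v = executor.map(sums)
    result = list(v)

print(sums.shape)   # (2, 3)
print(result)       # []
```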

If your task is CPU-bound, Python threading is generally not going to reduce the run time, since the GIL lets only one thread execute Python bytecode at a time. Perhaps you should ask a different question that includes more details of your situation, instead of inventing a contrived example like this.

Answer:

First, understand what apply_along_axis does, and does not do.

Modify the function to be a bit more diagnostic:

def task(n):
    print(n)        # show each row that is passed in
    c = np.sum(n)
    return c

And with a smaller array:

In [205]: b = np.arange(24).reshape(2, 3, 4)
In [207]: np.apply_along_axis(task, 2, b)
[0 1 2 3]
[4 5 6 7]
[ 8  9 10 11]
[12 13 14 15]
[16 17 18 19]
[20 21 22 23]
array([[ 6, 22, 38],
       [54, 70, 86]])

apply_along_axis passes successive "rows" of b to task. The result is a (2, 3) array: b reduced (summed) along its last axis.

If task really is this simple, you can get the same result much faster with np.sum's axis parameter:

In [208]: np.sum(b, axis=2)
array([[ 6, 22, 38],
       [54, 70, 86]])
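A quick check of both claims above (the (2, 3) result shape, and equivalence with the axis parameter), using the same small b and a print-free task:

```python
import numpy as np

def task(n):
    return np.sum(n)

b = np.arange(24).reshape(2, 3, 4)

via_apply = np.apply_along_axis(task, 2, b)  # one Python call to task per row
via_axis = np.sum(b, axis=2)                 # a single compiled reduction

print(via_apply.shape)                       # (2, 3)
print(np.array_equal(via_apply, via_axis))   # True
```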

Before putting a lot of effort into getting threading/multiprocessing working, try to make full use of numpy's compiled code.

apply_along_axis is essentially:

In [209]: np.array([[task(row) for row in plane] for plane in b])
[0 1 2 3]
[4 5 6 7]
array([[ 6, 22, 38],
       [54, 70, 86]])
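The nested comprehension is written for a 3D b. A sketch of the same idea for any number of leading axes flattens them first, loops, then restores the shape. apply_last_axis is a hypothetical helper name, and it assumes func returns a scalar:

```python
import numpy as np

def task(n):
    return np.sum(n)

def apply_last_axis(func, arr):
    # Hypothetical helper: every row along the last axis becomes one row of
    # a 2D view, func is called once per row in a plain Python loop, and the
    # scalar results are reshaped back to the leading shape of arr
    rows = arr.reshape(-1, arr.shape[-1])
    return np.array([func(row) for row in rows]).reshape(arr.shape[:-1])

b = np.arange(24).reshape(2, 3, 4)
print(apply_last_axis(task, b))
# [[ 6 22 38]
#  [54 70 86]]
```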

Using task1, a version of task without the prints, we can compare the times:

In [211]: timeit np.apply_along_axis(task1, 2, b)
154 µs ± 6.86 µs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
In [212]: timeit np.array([[task1(row) for row in plane] for plane in b])
72 µs ± 53.6 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
In [213]: timeit np.sum(b, axis=2)
10.9 µs ± 18.5 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
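Outside IPython, the standard timeit module gives a rough equivalent of these %timeit runs. This is a sketch: the loop count is an arbitrary choice, and the numbers will differ by machine:

```python
import timeit
import numpy as np

def task1(n):
    # same as task, without the diagnostic print
    return np.sum(n)

b = np.arange(24).reshape(2, 3, 4)

candidates = {
    "apply_along_axis": lambda: np.apply_along_axis(task1, 2, b),
    "nested comprehension": lambda: np.array([[task1(row) for row in plane] for plane in b]),
    "np.sum(axis=2)": lambda: np.sum(b, axis=2),
}

number = 1_000  # arbitrary; increase for steadier numbers
timings = {}
for name, fn in candidates.items():
    # timeit returns total seconds for `number` calls; convert to us per call
    timings[name] = timeit.timeit(fn, number=number) / number * 1e6
    print(f"{name}: {timings[name]:.1f} us per call")
```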


From the executor.map docs:

map(func, *iterables, timeout=None, chunksize=1)

the first argument is func, a function, not the result of calling a function. It is followed by one or more iterables.

With the question's (2, 3, 100) b, it should be possible to use

executor.map(task, b.reshape(-1, 100))

and get back the six "row" sums by iterating over the returned generator.
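A sketch of that call with the question's array shape, reshaping the flat results back to (2, 3). It relies on executor.map yielding results in input order, which the docs guarantee:

```python
import concurrent.futures
import numpy as np

def task(n):
    return np.sum(n)

b = np.random.rand(2, 3, 100)
rows = b.reshape(-1, b.shape[-1])   # (6, 100): one row per task call

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    v = executor.map(task, rows)    # generator; results come back in input order
    sums = np.array(list(v)).reshape(b.shape[:-1])   # back to (2, 3)

print(np.allclose(sums, b.sum(axis=2)))   # True
```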

In [217]: v = executor.map(task, b.reshape(-1, 4))
[0 1 2 3]
[4 5 6 7]
[ 8  9 10 11]
In [218]: v
Out[218]: <generator object Executor.map.<locals>.result_iterator at 0x7f235c85be40>
In [219]: list(v)
Out[219]: [6, 22, 38, 54, 70, 86]
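Note that v is a one-shot generator: once list(v) (or a for loop) has consumed it, it is exhausted. If list(v) was evaluated a second time (an assumption about how the question's code was run, not something it states), that would also give []. A small sketch:

```python
import concurrent.futures
import numpy as np

def task(n):
    return np.sum(n)

b = np.arange(24).reshape(2, 3, 4)

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    v = executor.map(task, b.reshape(-1, 4))
    first = list(v)    # waits for each result, in input order
    second = list(v)   # the generator is already exhausted

print(np.array(first))   # [ 6 22 38 54 70 86]
print(second)            # []
```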

For this small example, on my modest machine, the thread pool doesn't help at all; it is slower than every version above:

In [220]: timeit list(executor.map(task1, b.reshape(-1, 4)))
308 µs ± 35.9 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
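Part of that cost is per-task overhead (one submit and one future per row). The docs note that chunksize has no effect with ThreadPoolExecutor, so a sketch of cutting the overhead is to submit a few large blocks, each reduced with a vectorized np.sum. The block count and chunk_sums helper are hypothetical choices, and whether this beats plain np.sum(b, axis=2) depends on array size and machine, so time it on real data:

```python
import concurrent.futures
import numpy as np

b = np.random.rand(2, 3, 100)
rows = b.reshape(-1, b.shape[-1])        # (6, 100)
n_chunks = 2                             # arbitrary assumption; tune for real data
chunks = np.array_split(rows, n_chunks)  # list of (3, 100) blocks

def chunk_sums(chunk):
    # One vectorized reduction per block instead of one Python call per row
    return np.sum(chunk, axis=1)

with concurrent.futures.ThreadPoolExecutor(max_workers=n_chunks) as executor:
    # map returns results in input order, so concatenation keeps row order
    parts = list(executor.map(chunk_sums, chunks))

sums = np.concatenate(parts).reshape(b.shape[:-1])   # back to (2, 3)
```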
