I'm trying to test concurrent.futures to speed up a calculation. Here is the test code:
import concurrent.futures
import numpy as np
import random

def task(n):
    c = np.sum(n)
    # print sum result
    print(c)
    return c

# generate a 3D array
b = np.random.rand(2, 3, 100)
executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
# along axis=2 to do sum in task function
v = executor.map(np.apply_along_axis(task, 2, b))
list(v)
I can see that executor.map() does the calculation, because print(c) in the task function prints the sum results.
The questions are:
- How do I get the data from v, which is a generator object?
- When I use list(v), it gives [] as a result. Why?
CodePudding user response:
The main issue is a misunderstanding. np.apply_along_axis calls the function task repeatedly (but serially), and then returns the array built from the results. You then pass that returned array to executor.map, which actually expects a function as its first argument. So it's not executor.map that is calling your function, it is apply_along_axis.
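A minimal sketch of the corrected call (my own illustration, assuming the goal is one sum per length-100 row) could look like this:

import concurrent.futures
import numpy as np

def task(n):
    return np.sum(n)

b = np.random.rand(2, 3, 100)

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # pass the function itself, plus an iterable of its arguments;
    # iterating the (6, 100) view yields six 1-D rows of length 100
    v = executor.map(task, b.reshape(-1, 100))
    print(list(v))  # six row sums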
As I mentioned above, if your task is CPU-bound, Python threading is not going to reduce the run time. Perhaps you should ask a different question that includes more details of your real situation, instead of inventing a toy one like this.
CodePudding user response:
First understand what apply_along_axis does, or doesn't do.
Modify the function to be a bit more diagnostic:
def task(n):
    print(n)
    c = np.sum(n)
    # print sum result
    print(c)
    return c
And with a smaller array:
In [205]: b = np.arange(24).reshape(2, 3, 4)
In [207]: np.apply_along_axis(task, 2, b)
[0 1 2 3]
6
[4 5 6 7]
22
[ 8 9 10 11]
38
[12 13 14 15]
54
[16 17 18 19]
70
[20 21 22 23]
86
Out[207]:
array([[ 6, 22, 38],
[54, 70, 86]])
apply passes successive "rows" of b to task. The result is a (2, 3) array: the b array reduced along its last axis.
If task really is just this simple, you can get the same result much faster with an axis parameter.
In [208]: np.sum(b, axis=2)
Out[208]:
array([[ 6, 22, 38],
[54, 70, 86]])
Before spending a lot of effort on getting threading/multiprocessing to work, try to take full advantage of numpy's compiled code.
apply is essentially:
In [209]: np.array([[task(row) for row in plane] for plane in b])
[0 1 2 3]
6
[4 5 6 7]
22
...
Out[209]:
array([[ 6, 22, 38],
[54, 70, 86]])
Modifying task to omit the prints (call it task1), we can compare the times:
In [211]: timeit np.apply_along_axis(task1, 2, b)
154 µs ± 6.86 µs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
In [212]: timeit np.array([[task1(row) for row in plane] for plane in b])
72 µs ± 53.6 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
In [213]: timeit np.sum(b, axis=2)
10.9 µs ± 18.5 ns per loop (mean ± std. dev. of 7 runs, 100,000 loops each)
===
From the executor.map docs:

map(func, *iterables, timeout=None, chunksize=1)

the first argument is func, a function, not the result of calling a function. The second is one or more iterables.
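As a small standalone illustration of that signature (my own example, not tied to the question's arrays), the function comes first, followed by one iterable per positional argument:

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as ex:
    # pow(2, 5), pow(3, 2), pow(4, 1)
    print(list(ex.map(pow, [2, 3, 4], [5, 2, 1])))  # [32, 9, 4]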
It may be possible to use map(task, b.reshape(-1, 100)) and get back a list of "row" sums.
In [217]: v = executor.map(task, b.reshape(-1, 4))
[0 1 2 3]
6
[4 5 6 7]
22
[ 8 9 10 11]
38
...
In [218]: v
Out[218]: <generator object Executor.map.<locals>.result_iterator at 0x7f235c85be40>
In [219]: list(v)
Out[219]: [6, 22, 38, 54, 70, 86]
For this small example, on my modest machine, the threading doesn't help at all:
In [220]: timeit list(executor.map(task1, b.reshape(-1, 4)))
308 µs ± 35.9 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
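If the real task were heavy enough to benefit from parallelism, the usual pattern (a sketch under that assumption, not the code timed above) is a process pool fed with a few large chunks, so each worker gets substantial work relative to the dispatch and pickling overhead:

import concurrent.futures
import numpy as np

def chunk_sum(chunk):
    # sum each row of a 2-D chunk along its last axis
    return np.sum(chunk, axis=1)

if __name__ == "__main__":
    b = np.random.rand(2, 3, 100)
    rows = b.reshape(-1, 100)          # (6, 100)
    chunks = np.array_split(rows, 4)   # a few large pieces per worker
    with concurrent.futures.ProcessPoolExecutor() as executor:
        parts = list(executor.map(chunk_sum, chunks))
    result = np.concatenate(parts).reshape(2, 3)
    print(result)

For an array this small it will still lose to np.sum(b, axis=2); the pattern only pays off when each chunk involves substantial computation.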