I am a new dask user and I'm trying to use the dot function inside my program. I noticed that dask's dot is slower than the numpy version even when I use only one chunk for the whole array. How can this behaviour be explained?
import dask.array as da
import numpy as np
# 2e9 elements (~16 GB of float64) in a single chunk
x = da.random.normal(10, 0.1, size=(20000 * 100000), chunks=(20000 * 100000))
z = x.dot(x)
%time z.compute()
'''
CPU times: user 1min 1s, sys: 17.3 s, total: 1min 18s
Wall time: 52 s
'''
y = x.compute()  # materialize the dask array as a numpy array first
%time w = y.dot(y)
'''
CPU times: user 19 s, sys: 8.24 s, total: 27.2 s
Wall time: 767 ms
'''
CodePudding user response:
If you’re only using one chunk, then dask cannot possibly be faster than numpy. Dask is doing the following (very simplistically):
- Starting a scheduler. This is a separate process with a database which tracks and manages work across your cluster.
- Starting a worker. This is where the computation will take place. The worker connects to the scheduler, which directs communication between nodes on the cluster.
- Scheduling the job. Your main thread communicates with the scheduler to break the task into pieces which can be understood and managed by dask, and does any computation needed to understand array dims, types, and sizes, as well as dependencies within a multi-stage graph.
- Serializing, transferring, and deserializing the input data. You've done well to use dask.random to create the array, so there's no input data here. But anything you pass to dask needs to be converted into a byte string and passed between processes through ports.
- Executing the task on a worker. This step does the exact same work as your numpy equivalent. Since it's only one chunk, it does not happen in parallel; it's just happening in a different process.
- Serializing, transferring, and deserializing the result. The answer needs to be sent back through the port to the main thread.
This is a simplistic description, but you get the idea. You can think of dask's runtime as roughly (parallelizable numpy runtime) / n_workers + non-parallelized runtime + overhead, and the overhead is not trivial. It is well worth paying in many cases - I use dask every day for operations too large to tackle in memory on one machine. But dask doesn't magically make things run faster: you need to think explicitly about how you're splitting up the work so it can take advantage of multiple workers.
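One way to get a feel for the fixed cost is to time a task that does essentially no work, so that everything you measure is scheduling overhead. A minimal sketch (assuming the default scheduler; the exact number will vary by machine):

import time
import dask

# A no-op task: the real work is negligible, so the measured time is
# almost entirely dask's graph-building and scheduling overhead.
@dask.delayed
def trivial():
    return 1

start = time.perf_counter()
trivial().compute()
print(f"overhead for one trivial task: {time.perf_counter() - start:.4f} s")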
See the dask docs on Best Practices for more info.
CodePudding user response:
Answering again with a totally different approach... your performance test is really unfair to dask.
A huge share of the time in your test is taken up by allocating a 16 GB array and filling it with normally-distributed random values. In the first line of your test, where you define the array x, you only schedule this operation; dask then has to execute the allocation and population inside the %time'd test. In the numpy test, you first compute y, handing numpy a pre-built array, so you're only timing the dot product.
Dask is evaluated lazily, meaning it waits until you need a result before it computes it. This is really powerful: you can, for example, open a really large file, do a bunch of math on it, and then subset the result, and dask will only read in and operate on the required subset. On the other hand, it means you have to be careful when interpreting errors and timings, as computation only occurs when you trigger it with a blocking call such as compute (other examples are write or plot operations).
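As a tiny illustration of that laziness (toy sizes, just for the sake of the example):

import dask.array as da

# Building the graph is nearly instant; no random numbers are drawn yet.
x = da.random.normal(10, 0.1, size=(1000, 1000), chunks=(100, 100))
z = (x + 1).mean()
print(type(z))      # still a lazy dask array, not a number
print(z.compute())  # only now does the work happen; prints roughly 11.0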
With a fair comparison, where both versions allocate and fill the array inside the timed block, the two are much more similar:
In [1]: import dask.array as da, numpy as np
In [2]: %%time
...: x = da.random.normal(10, 0.1, size=(20000 * 100000), chunks=(20000 * 100000))
...: z = x.dot(x)
...: z.compute()
...:
...:
CPU times: user 48.4 s, sys: 15.9 s, total: 1min 4s
Wall time: 52.8 s
Out[2]: 200020152771.42023
In [3]: %%time
...: x = np.random.normal(10, 0.1, size=(20000 * 100000))
...: z = x.dot(x)
...:
...:
CPU times: user 48.3 s, sys: 14.8 s, total: 1min 3s
Wall time: 53 s
On the other hand, splitting the dask array job into 100 chunks cuts the total time down dramatically:
In [4]: %%time
...: x = da.random.normal(10, 0.1, size=(20000 * 100000), chunks=(200 * 100000))
...: z = x.dot(x)
...: z.compute()
...:
...:
CPU times: user 54.4 s, sys: 1.61 s, total: 56 s
Wall time: 6.05 s
Out[4]: 200020035893.7987
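If you want to check how an array is actually split before computing anything, the chunking metadata is available on the array itself. A quick sketch (sizes chosen to match the example above):

import dask.array as da

x = da.random.normal(10, 0.1, size=(20000 * 100000,), chunks=(200 * 100000,))
print(x.numblocks)   # (100,) -- the job is split into 100 chunks
print(x.chunksize)   # (20000000,) -- about 160 MB of float64 per chunk
x.visualize()        # optional: render the task graph (requires graphviz)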