Good evening!
I have code similar to what I will paste below; it has a lot more data, but the premise is the same. From both DataFrames I have to pull the first five values, but when I am dealing with tens of millions of entries I cannot afford to wait, sometimes up to an hour, for it to compute the whole DataFrame and return the first five values. I also cannot use plain Pandas DataFrames, as they exceed my memory limit. Is there a solution to this?
import random
import pandas
import dask.dataframe as dd
import time
# 10,000,000 random integers between 1 and 1,000,000.
random_pool = [random.randint(1, 1000000) for i in range(10000000)]
random.shuffle(random_pool)
# df1 holds 100,000 of the values, df2 holds all 10,000,000.
df1 = dd.from_pandas(pandas.DataFrame(random_pool[:100000], columns=["ID"]), npartitions=10)
df2 = dd.from_pandas(pandas.DataFrame(random_pool, columns=["ID"]), npartitions=10)
# Sorting both dataframes.
df1 = df1.sort_values("ID", ascending=True)
df2 = df2.sort_values("ID", ascending=True)
df1_start = time.time()
df1.head(5)  # fetch the first five rows of the sorted DataFrame
print("DF1 took {:.2f} seconds.".format(time.time() - df1_start))
df2_start = time.time()
df2.head(5)  # fetch the first five rows of the sorted DataFrame
print("DF2 took {:.2f} seconds.".format(time.time() - df2_start))
The first DataFrame takes around 0.41 seconds, while the second one takes around 1.79 seconds.
CodePudding user response:
One thing to keep in mind is that a value in dask is really a stack of operations, serialized. Most of the computation is deferred until you actually ask for the values to be materialized, for example by calling head or, more generally, .compute().
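As a minimal sketch of this laziness (using a small stand-in DataFrame rather than the question's data), building the graph is nearly instantaneous; the cost only appears once head materializes the result:
import time
import pandas
import dask.dataframe as dd
# A small stand-in for df2 from the question.
raw_df = dd.from_pandas(pandas.DataFrame({"ID": range(1000000, 0, -1)}), npartitions=10)
graph_start = time.time()
sorted_df = raw_df.sort_values("ID", ascending=True)  # only builds the task graph
print("Graph built in {:.4f} seconds.".format(time.time() - graph_start))
head_start = time.time()
print(sorted_df.head(5))  # this call actually runs the sort
print("head() took {:.2f} seconds.".format(time.time() - head_start))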
In the spirit of the general advice on persist, you can try to use .persist() after the sort calls:
It is often ideal to load, filter, and shuffle data once and keep this result in memory. Afterwards, each of the several complex queries can be based off of this in-memory data rather than have to repeat the full load-filter-shuffle process each time. To do this, use the client.persist method [ The .persist() method in this case ].
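Applied to the question's code, a hedged sketch of this (assuming the default scheduler; with a distributed client you would call client.persist on the collections instead) looks like:
# Sort once and keep the shuffled result in memory.
df1 = df1.sort_values("ID", ascending=True).persist()
df2 = df2.sort_values("ID", ascending=True).persist()
# Subsequent queries reuse the in-memory result instead of repeating the sort.
print(df1.head(5))
print(df2.head(5))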
And take the time to think about what happens if we don't persist here: the future that needs to be resolved when you call head will include the sort_values call, and you're probably seeing the cost of sorting all your data every time you call head. That explains why getting just the first five items has a cost proportional to the size of the whole dataset: the whole dataset is being sorted.
The answer is that dask is quite fast at getting the first five items. But it might not be so fast to resolve all the computations needed to get there, if they are not already in memory.
In general, you should avoid whole-dataset shuffles like the sort in this example!
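If the first five values are all you ever need, one way to sidestep the shuffle entirely, as a sketch assuming df1 and df2 are the original, unsorted DataFrames from the question, is nsmallest, which lets each partition find its own candidates and only combines those:
# nsmallest avoids a global sort: each partition computes its own five
# smallest rows, and only those candidate rows are combined at the end.
print(df1.nsmallest(5, "ID").compute())
print(df2.nsmallest(5, "ID").compute())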