I'm quite familiar with pandas dataframes but I'm very new to Dask so I'm still trying to wrap my head around parallelizing my code. I've obtained my desired results using pandas and pandarallel already so what I'm trying to figure out is if I can scale up the task or speed it up somehow using Dask.
Let's say my dataframe has datetimes as non-unique indices, a values column and an id column.
time value id
2021-01-01 00:00:00.210281 28.08 293707
2021-01-01 00:00:00.279228 28.07 293708
2021-01-01 00:00:00.697341 28.08 293709
2021-01-01 00:00:00.941704 28.08 293710
2021-01-01 00:00:00.945422 28.07 293711
... ... ...
2021-01-01 23:59:59.288914 29.84 512665
2021-01-01 23:59:59.288914 29.83 512666
2021-01-01 23:59:59.288914 29.82 512667
2021-01-01 23:59:59.525227 29.84 512668
2021-01-01 23:59:59.784754 29.84 512669
What I want to extract is the latest value for every second. For example, if the row right before 2021-01-01 00:00:01 is the one with the index 2021-01-01 00:00:00.945422, then the latest value is 28.07.
In my case, index values are sometimes not unique, so as a tie breaker I'd like to use the id column. The value with the largest id number will be considered the latest value. For the three values tied at the time 2021-01-01 23:59:59.288914, the value 29.82 would be chosen, since the largest id for that timestamp is 512667. Also note that id is not consistent throughout the dataset, so I cannot rely on it alone for ordering my data.
In pandas I simply do this by obtaining the last index
last_index = df.loc[date_minus60: date_curr].index[-1]
last_values = df.loc[last_index]
and then, if last_values.index.is_unique is False, I finally perform last_values.sort_values('id').iloc[-1].
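To make the pandas steps above concrete, here is a minimal sketch on a few rows taken from the end of the sample data. The first row and the window bounds are hypothetical, chosen only so that the window ends between the tied timestamp and the next row.

```python
import pandas as pd

# Rows from the tail of the sample above; the first row is hypothetical.
df = pd.DataFrame(
    {
        "value": [29.85, 29.84, 29.83, 29.82, 29.84],
        "id": [512664, 512665, 512666, 512667, 512668],
    },
    index=pd.DatetimeIndex(
        [
            "2021-01-01 23:59:58.100000",  # hypothetical earlier row
            "2021-01-01 23:59:59.288914",
            "2021-01-01 23:59:59.288914",
            "2021-01-01 23:59:59.288914",
            "2021-01-01 23:59:59.525227",
        ],
        name="time",
    ),
)

# Hypothetical 60-second window ending just after the tied rows.
date_curr = pd.Timestamp("2021-01-01 23:59:59.400000")
date_minus60 = date_curr - pd.Timedelta(seconds=60)

# Last timestamp inside the window, then every row carrying that timestamp.
last_index = df.loc[date_minus60:date_curr].index[-1]
last_values = df.loc[last_index]

# With duplicated timestamps .loc returns a DataFrame whose index repeats,
# so break the tie by taking the row with the largest id.
if not last_values.index.is_unique:
    last_values = last_values.sort_values("id").iloc[-1]

print(last_values)
```

Because a label-based slice includes both endpoints, a row stamped exactly at date_curr would also fall inside the window.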
I've been having a hard time translating this code to Dask: my delayed functions end up needing to be computed before I can reindex my dataframe again.
I'd like to know if there are any best practices for dealing with this sort of problem.
CodePudding user response:
The snippet below shows that it's a very similar syntax:
import dask
# generate dask dataframe
ddf = dask.datasets.timeseries(freq="500ms", partition_freq="1h")
# generate a pandas dataframe
df = ddf.partitions[0].compute() # pandas df for example
# sample dates
date_minus60 = "2000-01-01 00:00:00.000"
date_curr = "2000-01-01 00:00:02.000"
# pandas code
last_index_pandas = df.loc[date_minus60:date_curr].index[-1]
last_values_pandas = df.loc[last_index_pandas]
# dask code
last_index_dask = ddf.loc[date_minus60:date_curr].compute().index[-1]
last_values_dask = ddf.loc[last_index_dask].compute()
# check equality of the results
print(last_values_pandas == last_values_dask)
Note that the difference is the two .compute steps in the dask version, since two lazy values need to be computed: the first finds the correct index location and the second gets the actual value. This also assumes that the data is already indexed by the timestamp. If it is not, it's best to index the data before loading it into dask, since .set_index is in general a slow operation.
However, depending on what you are really after, this is probably not a great use of dask. If the underlying idea is to do fast lookups, then a better solution is to use indexed databases (including specialised time-series databases).
Finally, the snippet above uses a unique index. If the actual data has a non-unique index, then selecting the row with the largest id is something that should be handled once last_values_dask is computed, using something like this (pseudo code, not expected to work right away):
def get_largest_id(last_values):
    # among rows sharing the same timestamp, keep the one with the largest id
    return last_values.sort_values('id').tail(1)

last_values_dask = get_largest_id(last_values_dask)
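One detail worth checking when trying this in plain pandas: the helper expects a DataFrame, but df.loc[timestamp] returns a Series when the timestamp occurs only once. A rough sketch of one way around that, assuming the lookup is done with a one-element list so the result is always a DataFrame (the sample rows come from the question; the variable names are made up for illustration):

```python
import pandas as pd


def get_largest_id(last_values: pd.DataFrame) -> pd.DataFrame:
    # Among rows sharing the same timestamp, keep the one with the largest id.
    return last_values.sort_values("id").tail(1)


df = pd.DataFrame(
    {
        "value": [28.07, 29.84, 29.83, 29.82],
        "id": [293711, 512665, 512666, 512667],
    },
    index=pd.to_datetime(
        [
            "2021-01-01 00:00:00.945422",
            "2021-01-01 23:59:59.288914",
            "2021-01-01 23:59:59.288914",
            "2021-01-01 23:59:59.288914",
        ]
    ),
)

unique_ts = pd.Timestamp("2021-01-01 00:00:00.945422")
tied_ts = pd.Timestamp("2021-01-01 23:59:59.288914")

# Selecting with a list keeps the result a DataFrame in both cases.
from_unique = get_largest_id(df.loc[[unique_ts]])
from_tied = get_largest_id(df.loc[[tied_ts]])

print(from_unique)
print(from_tied)
```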
There is scope for designing a better pipeline if the lookup is for batches (rather than specific sample dates).
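As an illustration of what a batch version might look like, here is a sketch in plain pandas that picks the latest row for every second in one pass: sort by timestamp and id, floor each timestamp to its second, and keep the last row per second. The function name latest_per_second and the "second" column are hypothetical. In principle the same function could be applied per partition with ddf.map_partitions(latest_per_second), under the assumption that the partition boundaries fall on whole seconds so that no second is split across two partitions.

```python
import pandas as pd


def latest_per_second(pdf: pd.DataFrame) -> pd.DataFrame:
    """Keep the last row of each second; timestamp ties go to the largest id."""
    ordered = (
        pdf.rename_axis("time")
        .reset_index()
        .sort_values(["time", "id"])
    )
    # Bucket every row by the second it belongs to.
    ordered["second"] = ordered["time"].dt.floor("s")
    # After sorting, the last row in each bucket is the latest one.
    return ordered.drop_duplicates("second", keep="last").set_index("second")


# A subset of the rows from the question; the tied rows are deliberately
# listed out of id order to show that the sort handles the tie-break.
df = pd.DataFrame(
    {
        "value": [28.08, 28.07, 28.08, 28.08, 28.07, 29.82, 29.84, 29.83],
        "id": [293707, 293708, 293709, 293710, 293711, 512667, 512665, 512666],
    },
    index=pd.to_datetime(
        [
            "2021-01-01 00:00:00.210281",
            "2021-01-01 00:00:00.279228",
            "2021-01-01 00:00:00.697341",
            "2021-01-01 00:00:00.941704",
            "2021-01-01 00:00:00.945422",
            "2021-01-01 23:59:59.288914",
            "2021-01-01 23:59:59.288914",
            "2021-01-01 23:59:59.288914",
        ]
    ),
)

result = latest_per_second(df)
print(result)
```

Sorting on both columns means id is only ever compared between rows with identical timestamps, which fits the constraint that id is not reliable for ordering across the whole dataset.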