Is there a way to traverse through a dask dataframe backwards?

I want to use read_parquet but traverse the data backwards from a given starting point (assuming a sorted index). I don't want to read the entire parquet file into memory, because that defeats the whole point of using it. Is there a nice way to do this?
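For reference, a minimal sketch of the setup in question (the path is a placeholder):

import dask.dataframe as dd

# "data.parquet" is a placeholder path. read_parquet is lazy: it reads
# only file metadata up front, so nothing is loaded into memory until
# individual partitions are actually computed.
df = dd.read_parquet("data.parquet")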

CodePudding user response:

If the last N rows are all in the last partition, you can use the dataframe's tail method. If not, you can iterate backwards over the partitions using the .partitions attribute. This isn't particularly smart and will blow up your memory if you request too many rows, but it should do the trick:

import pandas as pd

def get_last_n(n, df):
    read = []
    lines_read = 0
    # walk the partitions from last to first, pulling only as many
    # rows as are still needed
    for i in range(df.npartitions - 1, -1, -1):
        # .tail() on a single partition computes only that partition
        p = df.partitions[i].tail(n - lines_read)

        read.insert(0, p)
        lines_read += len(p)
        if lines_read >= n:
            break

    return pd.concat(read, axis=0)

For example, here's a dataframe with 20 rows and 5 partitions:

import dask
import dask.dataframe
import numpy as np
import pandas as pd

df = dask.dataframe.from_pandas(pd.DataFrame({'A': np.arange(20)}), npartitions=5)

You can call the above function with any number of rows to get that many rows in the tail:

In [4]: get_last_n(4, df)
Out[4]:
     A
16  16
17  17
18  18
19  19

In [5]: get_last_n(10, df)
Out[5]:
     A
10  10
11  11
12  12
13  13
14  14
15  15
16  16
17  17
18  18
19  19

Requesting more rows than are in the dataframe just computes the whole dataframe:

In [6]: get_last_n(1000, df)
Out[6]:
     A
0    0
1    1
2    2
3    3
4    4
5    5
6    6
7    7
8    8
9    9
10  10
11  11
12  12
13  13
14  14
15  15
16  16
17  17
18  18
19  19

Note that this requests the data iteratively, so it may be very inefficient if your graph is complex and involves lots of shuffles.
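If the graph is expensive, one option (a sketch, assuming enough cluster memory to hold the materialized frame) is to persist the dataframe first, so the repeated .tail() calls reuse cached partitions instead of re-running the upstream computation on every iteration:

# persist() computes the graph once and keeps the partitions in memory,
# so each df.partitions[i].tail(...) call inside get_last_n hits the
# cached results rather than recomputing upstream tasks.
df = df.persist()
last_rows = get_last_n(10, df)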

CodePudding user response:

Assuming that the dataframe is indexed, inverting it can be done as a two-step process: reverse the order of the partitions, then reverse the index within each partition:

from dask.datasets import timeseries

ddf = timeseries()

ddf_inverted = (
    ddf
    # step 1: reverse the order of the partitions
    .partitions[::-1]
    # step 2: reverse the rows within each partition
    .map_partitions(lambda df: df.sort_index(ascending=False))
)
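As a quick sanity check (a sketch using the frames defined above), the head of the inverted frame should match the tail of the original, in reverse order:

# head() of the inverted frame computes only its first partition
print(ddf_inverted.head())

# the same rows from the original frame, reversed for comparison
print(ddf.tail().sort_index(ascending=False))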