I want to use read_parquet, but read backwards from a given starting point (assuming a sorted index). I don't want to read the entire parquet file into memory, because that defeats the whole point of using it. Is there a nice way to do this?
CodePudding user response:
If the last N rows are all in the last partition, you can use dask.dataframe.tail. If not, you can iterate backwards over the partitions using the dask.dataframe.partitions attribute. This isn't particularly smart and will blow up your memory if you request too many rows, but it should do the trick:
import pandas as pd

def get_last_n(n, df):
    read = []
    lines_read = 0
    # walk the partitions from last to first, collecting rows until we have n
    for i in range(df.npartitions - 1, -1, -1):
        p = df.partitions[i].tail(n - lines_read)
        read.insert(0, p)
        lines_read += len(p)
        if lines_read >= n:
            break
    return pd.concat(read, axis=0)
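By contrast, when the rows you want all fit in the final partition, the dask.dataframe.tail route mentioned above is enough on its own. A minimal sketch, using a small example frame (the 20-row/5-partition setup is an assumption for illustration):

import dask.dataframe as dd
import numpy as np
import pandas as pd

# 20 rows split into 5 partitions of 4 rows each
ddf = dd.from_pandas(pd.DataFrame({'A': np.arange(20)}), npartitions=5)

# tail only looks at the final partition, so n must not exceed its length
print(ddf.tail(3))  # rows 17, 18, 19
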
For example, here's a dataframe with 20 rows and 5 partitions:
import dask.dataframe, pandas as pd, numpy as np, dask
df = dask.dataframe.from_pandas(pd.DataFrame({'A': np.arange(20)}), npartitions=5)
You can call the above function with any number of rows to get that many rows in the tail:
In [4]: get_last_n(4, df)
Out[4]:
A
16 16
17 17
18 18
19 19
In [5]: get_last_n(10, df)
Out[5]:
A
10 10
11 11
12 12
13 13
14 14
15 15
16 16
17 17
18 18
19 19
Requesting more rows than are in the dataframe just computes the whole dataframe:
In [6]: get_last_n(1000, df)
Out[6]:
A
0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
10 10
11 11
12 12
13 13
14 14
15 15
16 16
17 17
18 18
19 19
Note that this requests the data iteratively, so it may be very inefficient if your graph is complex and involves lots of shuffles.
CodePudding user response:
Assuming that the dataframe is indexed, inverting it can be done as a two-step process: reverse the order of the partitions, then reverse the index within each partition:
from dask.datasets import timeseries
ddf = timeseries()
ddf_inverted = (
    ddf
    .partitions[::-1]
    .map_partitions(lambda df: df.sort_index(ascending=False))
)
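The same two-step inversion can be checked on a small frame with a sorted integer index (the example frame below is an assumption for illustration; any dataframe with a sorted index works the same way):

import dask.dataframe as dd
import numpy as np
import pandas as pd

# small frame with a sorted integer index, split across 5 partitions
ddf = dd.from_pandas(pd.DataFrame({'A': np.arange(20)}), npartitions=5)

# step 1: reverse partition order; step 2: reverse rows within each partition
ddf_inverted = (
    ddf
    .partitions[::-1]
    .map_partitions(lambda df: df.sort_index(ascending=False))
)

# the computed result is the original frame in fully reversed index order
print(ddf_inverted.compute().head())  # index 19, 18, 17, 16, 15
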