Convert huge Polars dataframe to dict without consuming too much RAM

Time:01-20

When I load my parquet file into a Polars DataFrame, it takes about 5.5 GB of RAM. Polars is great compared to other options I have tried. However, Polars does not support indices the way Pandas does. This is troublesome for me because one column in my DataFrame is unique, and my application accesses the data through row lookups keyed on that unique column (dict-like).

Since the dataframe is massive, filtering is too expensive. However, I also seem to be short on RAM (32 GB). I am currently converting the df in "chunks" like this:

h = df.height # number of rows
chunk_size = 1_000_000  # rows per chunk

b = (np.linspace(1, math.ceil(h/chunk_size), num=math.ceil(h/chunk_size)))
new_col = (np.repeat(b, chunk_size))[:-( chunk_size - (h%chunk_size))]
df = df.with_column(polars.lit(new_col).alias('new_index'))
m = df.partition_by(groups="new_index", as_dict=True)

del df
gc.collect()


my_dict = {}
for key, value in list(m.items()):
    my_dict.update(
    {
        uas: frame.select(polars.exclude("unique_col")).to_dicts()[0]
        for uas, frame in
        (
            value
            .drop("new_index")
            .unique(subset=["unique_col"], keep='last')
            .partition_by(groups=["unique_col"],
                          as_dict=True,
                          maintain_order=True)
        ).items()
    }
    )
    m.pop(key)

RAM consumption does not seem to have changed much. Plus, I get an error saying that the dict size has changed during iteration (true). But what can I do? Is getting more RAM the only option?

CodePudding user response:

From the question: "Since the DataFrame is massive, filtering is too expensive."

Let's see if I can help. There may be ways to get excellent performance without partitioning your DataFrame.

First, some data

First, let's create a VERY large dataset and do some timings. The code below is something I've used in other situations to create a dataset of an arbitrary size. In this case, I'm going to create a dataset that is 400 GB in RAM. (I have a 32-core system with 512 GB of RAM.)

After creating the DataFrame, I'm going to use set_sorted on col_0. (Because of how the data is created, all columns start out in sorted order.) In addition, I'll shuffle col_1 so that we can compare timings for sorted and non-sorted lookup columns.

import polars as pl
import math
import time

mem_in_GB = 400
def mem_squash(mem_size_GB: int) -> pl.DataFrame:
    nbr_uint64 = mem_size_GB * (2**30) / 8
    nbr_cols = math.ceil(nbr_uint64 ** (0.15))
    nbr_rows = math.ceil(nbr_uint64 / nbr_cols)

    return pl.DataFrame(
        data={
            "col_" + str(col_nbr): pl.arange(0, nbr_rows, eager=True)
            for col_nbr in range(nbr_cols)
        }
    )


df = mem_squash(mem_in_GB)
df = df.with_columns([
    pl.col('col_0').set_sorted(),
    pl.col('col_1').shuffle(),
])
df.estimated_size('gb')
df

>>> df.estimated_size('gb')
400.0000000670552
>>> df
shape: (1309441249, 41)
┌────────────┬───────────┬────────────┬────────────┬─────┬────────────┬────────────┬────────────┬────────────┐
│ col_0      ┆ col_1     ┆ col_2      ┆ col_3      ┆ ... ┆ col_37     ┆ col_38     ┆ col_39     ┆ col_40     │
│ ---        ┆ ---       ┆ ---        ┆ ---        ┆     ┆ ---        ┆ ---        ┆ ---        ┆ ---        │
│ i64        ┆ i64       ┆ i64        ┆ i64        ┆     ┆ i64        ┆ i64        ┆ i64        ┆ i64        │
╞════════════╪═══════════╪════════════╪════════════╪═════╪════════════╪════════════╪════════════╪════════════╡
│ 0          ┆ 438030034 ┆ 0          ┆ 0          ┆ ... ┆ 0          ┆ 0          ┆ 0          ┆ 0          │
│ 1          ┆ 694387471 ┆ 1          ┆ 1          ┆ ... ┆ 1          ┆ 1          ┆ 1          ┆ 1          │
│ 2          ┆ 669976383 ┆ 2          ┆ 2          ┆ ... ┆ 2          ┆ 2          ┆ 2          ┆ 2          │
│ 3          ┆ 771482771 ┆ 3          ┆ 3          ┆ ... ┆ 3          ┆ 3          ┆ 3          ┆ 3          │
│ ...        ┆ ...       ┆ ...        ┆ ...        ┆ ... ┆ ...        ┆ ...        ┆ ...        ┆ ...        │
│ 1309441245 ┆ 214104601 ┆ 1309441245 ┆ 1309441245 ┆ ... ┆ 1309441245 ┆ 1309441245 ┆ 1309441245 ┆ 1309441245 │
│ 1309441246 ┆ 894526083 ┆ 1309441246 ┆ 1309441246 ┆ ... ┆ 1309441246 ┆ 1309441246 ┆ 1309441246 ┆ 1309441246 │
│ 1309441247 ┆ 378223586 ┆ 1309441247 ┆ 1309441247 ┆ ... ┆ 1309441247 ┆ 1309441247 ┆ 1309441247 ┆ 1309441247 │
│ 1309441248 ┆ 520540081 ┆ 1309441248 ┆ 1309441248 ┆ ... ┆ 1309441248 ┆ 1309441248 ┆ 1309441248 ┆ 1309441248 │
└────────────┴───────────┴────────────┴────────────┴─────┴────────────┴────────────┴────────────┴────────────┘

So I now have a DataFrame of 1.3 billion rows that takes up 400 GB of my 512 GB of RAM. (Clearly, I cannot afford to make copies of this DataFrame in memory.)

Simple Filtering

Now, let's run a simple filter on both col_0 (the column on which I used set_sorted) and col_1 (the shuffled column).

start = time.perf_counter()
(
    df
    .filter(pl.col('col_0') == 106338253)
)
print(time.perf_counter() - start)

shape: (1, 41)
┌───────────┬───────────┬───────────┬───────────┬─────┬───────────┬───────────┬───────────┬───────────┐
│ col_0     ┆ col_1     ┆ col_2     ┆ col_3     ┆ ... ┆ col_37    ┆ col_38    ┆ col_39    ┆ col_40    │
│ ---       ┆ ---       ┆ ---       ┆ ---       ┆     ┆ ---       ┆ ---       ┆ ---       ┆ ---       │
│ i64       ┆ i64       ┆ i64       ┆ i64       ┆     ┆ i64       ┆ i64       ┆ i64       ┆ i64       │
╞═══════════╪═══════════╪═══════════╪═══════════╪═════╪═══════════╪═══════════╪═══════════╪═══════════╡
│ 106338253 ┆ 885386691 ┆ 106338253 ┆ 106338253 ┆ ... ┆ 106338253 ┆ 106338253 ┆ 106338253 ┆ 106338253 │
└───────────┴───────────┴───────────┴───────────┴─────┴───────────┴───────────┴───────────┴───────────┘
>>> print(time.perf_counter() - start)
0.6669054719995984

start = time.perf_counter()
(
    df
    .filter(pl.col('col_1') == 106338253)
)
print(time.perf_counter() - start)

shape: (1, 41)
┌───────────┬───────────┬───────────┬───────────┬─────┬───────────┬───────────┬───────────┬───────────┐
│ col_0     ┆ col_1     ┆ col_2     ┆ col_3     ┆ ... ┆ col_37    ┆ col_38    ┆ col_39    ┆ col_40    │
│ ---       ┆ ---       ┆ ---       ┆ ---       ┆     ┆ ---       ┆ ---       ┆ ---       ┆ ---       │
│ i64       ┆ i64       ┆ i64       ┆ i64       ┆     ┆ i64       ┆ i64       ┆ i64       ┆ i64       │
╞═══════════╪═══════════╪═══════════╪═══════════╪═════╪═══════════╪═══════════╪═══════════╪═══════════╡
│ 889423291 ┆ 106338253 ┆ 889423291 ┆ 889423291 ┆ ... ┆ 889423291 ┆ 889423291 ┆ 889423291 ┆ 889423291 │
└───────────┴───────────┴───────────┴───────────┴─────┴───────────┴───────────┴───────────┴───────────┘
>>> print(time.perf_counter() - start)
0.6410857040000337

In both cases, filtering to find a unique column value took less than one second. (And that's for a DataFrame of 1.3 billion records.)

With such performance, you may be able to get by with simply filtering in your application, without the need to partition your data.

Wicked Speed: search_sorted

If you have sufficient RAM to sort your DataFrame by the search column, you may be able to get incredible speed.

Let's use search_sorted on col_0 (which is sorted) to find the row position, and slice, which is merely a window into the DataFrame, to pull out that row.

start = time.perf_counter()
(
    df.slice(df.get_column('col_0').search_sorted(106338253), 1)
)
print(time.perf_counter() - start)

shape: (1, 41)
┌───────────┬───────────┬───────────┬───────────┬─────┬───────────┬───────────┬───────────┬───────────┐
│ col_0     ┆ col_1     ┆ col_2     ┆ col_3     ┆ ... ┆ col_37    ┆ col_38    ┆ col_39    ┆ col_40    │
│ ---       ┆ ---       ┆ ---       ┆ ---       ┆     ┆ ---       ┆ ---       ┆ ---       ┆ ---       │
│ i64       ┆ i64       ┆ i64       ┆ i64       ┆     ┆ i64       ┆ i64       ┆ i64       ┆ i64       │
╞═══════════╪═══════════╪═══════════╪═══════════╪═════╪═══════════╪═══════════╪═══════════╪═══════════╡
│ 106338253 ┆ 885386691 ┆ 106338253 ┆ 106338253 ┆ ... ┆ 106338253 ┆ 106338253 ┆ 106338253 ┆ 106338253 │
└───────────┴───────────┴───────────┴───────────┴─────┴───────────┴───────────┴───────────┴───────────┘
>>> print(time.perf_counter() - start)
0.00603273300021101

If you can sort your DataFrame by the lookup column and use search_sorted, you can get some incredible performance: roughly a 100x speed-up in the example above (about 0.006 seconds versus about 0.65 seconds).

Does this help? Perhaps you can get the performance you need without partitioning your data.

CodePudding user response:

.partition_by() returns a list of DataFrames by default (or a dict when as_dict=True, as in your code), which does not sound helpful in this case.

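You can use .groupby_dynamic() to process a DataFrame in "chunks"; .with_row_count() adds a row number that can serve as the "index". (In later Polars releases these methods are named .group_by_dynamic() and .with_row_index().)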

import polars as pl

chunk_size = 3

df = pl.DataFrame({"a": range(1, 11), "b": range(11, 21)})

(df.with_row_count()
   .with_columns(pl.col("row_nr").cast(pl.Int64))
   .groupby_dynamic(index_column="row_nr", every=f"{chunk_size}i")
   .agg([
      pl.col("a"),
      pl.col("b"),
   ]))

shape: (4, 3)
┌────────┬───────────┬──────────────┐
│ row_nr ┆ a         ┆ b            │
│ ---    ┆ ---       ┆ ---          │
│ i64    ┆ list[i64] ┆ list[i64]    │
╞════════╪═══════════╪══════════════╡
│ 0      ┆ [1, 2, 3] ┆ [11, 12, 13] │
│ 3      ┆ [4, 5, 6] ┆ [14, 15, 16] │
│ 6      ┆ [7, 8, 9] ┆ [17, 18, 19] │
│ 9      ┆ [10]      ┆ [20]         │
└────────┴───────────┴──────────────┘

I'm assuming you're using .scan_parquet() to load your data and you have a LazyFrame?

It may be helpful if you can share an actual example of what you need to do to your data.
