How to speed up groupby().sum() on a dask dataframe with 5 million rows and 500 thousand groups


I have a dataframe with

  • 5 million rows,
  • a column group_id with 500,000 unique values,
  • thousands of other columns named var1, var2, etc., each containing only 0s and 1s.

I would like to group by group_id and sum the other columns. For better performance I use dask, but this simple aggregation is still slow:

The time spent on a dataframe with 10 columns is 6.285385847091675 seconds
The time spent on a dataframe with 100 columns is 64.9060411453247 seconds
The time spent on a dataframe with 200 columns is 150.6109869480133 seconds
The time spent on a dataframe with 300 columns is 235.77087807655334 seconds

My real dataset contains up to 30,000 columns. I have read answers (1 and 2) by @Divakar about using NumPy; however, the former thread is about counting and the latter is about summing columns.
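For reference, my understanding of the plain-NumPy idea in those answers is a scatter-add along these lines (my own sketch, not code from those threads; it assumes the group ids have already been mapped to 0..n_groups-1):

import numpy as np

def groupby_sum_numpy(group_id, values, n_groups):
    # values: (n_rows, n_cols) array of 0/1; group_id: (n_rows,) ints in [0, n_groups)
    out = np.zeros((n_groups, values.shape[1]), dtype=np.int64)
    np.add.at(out, group_id, values)  # unbuffered scatter-add of each row into its group
    return out

I know np.add.at itself is not particularly fast; I show it only to make clear what kind of grouped sum I am after.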

Could you please elaborate on some ways to speed up this aggregation? Here is a script that reproduces the timings above:

import numpy as np
import pandas as pd
import os, time
from multiprocessing import dummy
import dask.dataframe as dd

core = os.cpu_count()
P = dummy.Pool(processes = core)

n_docs = 500000
n_rows = n_docs * 10
data = {}

def create_col(i):
    name = 'var' + str(i)
    data[name] = np.random.randint(0, 2, n_rows)

n_cols = 300
P.map(create_col, range(1, n_cols + 1))
df = pd.DataFrame(data, dtype = 'int8')
df.insert(0, 'group_id', np.random.randint(1, n_docs + 1, n_rows))
df = dd.from_pandas(df, npartitions = 3 * core)

start = time.time()
df.groupby('group_id').sum().compute()
end = time.time()
print('The time spent on a dataframe with {} columns is'.format(n_cols), end - start, 'seconds')

CodePudding user response:

(I misunderstood the OP in my original answer, so I have rewritten it from scratch.)

I got an improvement by:

  • switching to numpy
  • using the same dtype for the group ids and the data (np.int32)
  • using numba in parallel mode

import numba as nb

@nb.njit('int32[:, :](int32[:, :], int_)', parallel=True)
def count_groups2(group_and_data, n_groups):
    # first column of group_and_data is the group id, the rest are the 0/1 data columns
    n_cols = group_and_data.shape[1] - 1
    counts = np.zeros((n_groups, n_cols), dtype=np.int32)
    for idx in nb.prange(len(group_and_data)):
        row = group_and_data[idx]
        # scatter-add this row's data columns into its group's row
        # (note: with parallel=True, two iterations that hit the same group may update this row concurrently)
        counts[row[0]] += row[1:]
    return counts

df = pd.DataFrame(data, dtype='int32')
group_id = np.random.randint(1, n_docs + 1, n_rows, dtype=np.int32)
df.insert(0, 'group_id', group_id)

# switching to numpy (line below) is costly
# it would be faster to work with numpy alone (no pandas)
group_and_data = df.values
faster4(group_and_data)
op_method(df)

Profiler output (line #, hits, total time, per hit, % time, line contents):

    77         1    1519304.0 1519304.0      8.6      group_and_data = df.values
    78         1    1030130.0 1030130.0      5.8      faster4(group_and_data)
    79         1   12068685.0 12068685.0     68.4      op_method(df)
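Here faster4 and op_method are just the two approaches being timed; roughly something like this (a sketch, assuming the group ids stay in 1..n_docs as generated above):

def op_method(df):
    # baseline: plain pandas groupby on the same data
    return df.groupby('group_id').sum()

def faster4(group_and_data):
    # numba kernel; group ids run from 1 to n_docs, so n_docs + 1 rows cover them all
    return count_groups2(group_and_data, n_docs + 1)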