I wrote the following custom aggregation, which merges simultaneous measurements into as few datapoints as possible without dropping any values:
from pandas import DataFrame, Series

def aggregate_nondestructive(df: Series | DataFrame) -> Series | DataFrame:
    """Aggregate multiple simultaneous measurements in a non-destructive way.

    Given a DataFrame of size m×k, this constructs a new DataFrame of size
    m'×k, where m' = max(df.notna().sum()) is the maximal number of measured
    non-null values in a column.

    Example
    -------
                          Acetate  Base   DOT  Fluo_GFP   Glucose  OD600  Probe_Volume    pH
    measurement_time
    2020-12-09 09:48:38      <NA>  <NA>  <NA>      <NA>  4.578233   <NA>          <NA>  <NA>
    2020-12-09 09:48:38      <NA>  <NA>  <NA>      <NA>      <NA>  0.445          <NA>  <NA>
    2020-12-09 09:48:38  0.116585  <NA>  <NA>      <NA>      <NA>   <NA>          <NA>  <NA>
    2020-12-09 09:48:38  0.114842  <NA>  <NA>      <NA>      <NA>   <NA>          <NA>  <NA>
    2020-12-09 09:48:38      <NA>  <NA>  <NA>      <NA>      <NA>  0.485          <NA>  <NA>
    2020-12-09 09:48:38      <NA>  <NA>  <NA>      <NA>      <NA>   <NA>           200  <NA>
    2020-12-09 09:48:38      <NA>  <NA>  <NA>    1112.5      <NA>   <NA>          <NA>  <NA>
    2020-12-09 09:48:38      <NA>  <NA>  <NA>     912.5      <NA>   <NA>          <NA>  <NA>
    2020-12-09 09:48:38      <NA>  <NA>  <NA>      <NA>  4.554859   <NA>          <NA>  <NA>

    Returns

                          Acetate  Base   DOT  Fluo_GFP   Glucose  OD600  Probe_Volume    pH
    measurement_time
    2020-12-09 09:48:38  0.116585  <NA>  <NA>    1112.5  4.578233  0.445           200  <NA>
    2020-12-09 09:48:38  0.114842  <NA>  <NA>     912.5  4.554859  0.485          <NA>  <NA>
    """
    if isinstance(df, Series):
        return df
    mask = df.notna()
    nitems = mask.sum()   # number of non-null values per column
    nrows = nitems.max()  # rows needed to hold the densest column
    result = DataFrame(index=df.index[:nrows], columns=df.columns).astype(df.dtypes)
    for col in result:
        # assign positionally; the chained form result[col].iloc[...] = ...
        # can silently write to a copy
        result.iloc[:nitems[col], result.columns.get_loc(col)] = df.loc[mask[col], col].to_numpy()
    return result
I want to use this to aggregate simultaneous measurements in a table with ~1M rows, via groupby(["measurement_time"]).apply(aggregate_nondestructive). However, it is very, very slow. Any simple ideas on how to speed this up, aside from rewriting it in numba?
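For reference, a minimal, self-contained sketch of that call on a tiny stand-in frame (the two columns and the single repeated timestamp are illustrative only, not the real ~1M-row data):

import pandas as pd

# tiny stand-in for the real table: nullable floats, duplicate timestamps
df = pd.DataFrame(
    {
        "Glucose": [4.578233, None, None, 4.554859],
        "OD600": [None, 0.445, 0.485, None],
    },
    index=pd.Index(["2020-12-09 09:48:38"] * 4, name="measurement_time"),
    dtype="Float64",
)

# the slow path: one Python-level call of aggregate_nondestructive per group
# (group_keys=False keeps the group key out of the result index; the call
# in the question omits it)
agg = df.groupby("measurement_time", group_keys=False).apply(aggregate_nondestructive)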
CodePudding user response:
You can stack the data to remove the NaNs. Then use groupby.cumcount to get the running count of non-null values per column for each measurement_time. Append this count to the index of the stacked data and unstack on the level holding the original column names. Finally, reindex the columns to be sure to get all the original columns (this might not be necessary if you have at least one value in each column across all the measurement_time groups) and reset_index to drop the level that was appended from the cumcount.
_tmp_df = df.stack().to_frame(name='val')  # long format; stack drops the NaNs
res = (
    _tmp_df
    # running count per (measurement_time, column) pair, appended as a third index level
    .set_index(_tmp_df.groupby(level=[0, 1]).cumcount(), append=True)
    ['val'].unstack(level=1)          # pivot the original column names back into columns
    .reindex(df.columns, axis=1)      # restore all-NaN columns, in the original order
    .reset_index(level=1, drop=True)  # drop the helper cumcount level
)
print(res)
#                       Acetate  Base  DOT  Fluo_GFP   Glucose  OD600  \
# measurement_time
# 2020-12-09 09:48:38  0.116585   NaN  NaN    1112.5  4.578233  0.445
# 2020-12-09 09:48:38  0.114842   NaN  NaN     912.5  4.554859  0.485
#
#                      Probe_Volume   pH
# measurement_time
# 2020-12-09 09:48:38         200.0  NaN
# 2020-12-09 09:48:38           NaN  NaN
Note: this is not a groupby aggregation; it performs the operation on the whole DataFrame at once, which is why it is much faster than calling a Python function once per group.
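One caveat: as the printed output shows, the round trip through stack/unstack comes back with plain float64/NaN rather than the nullable <NA> values from the question. If the original dtypes matter, a minimal sketch to restore them afterwards, reusing the astype(df.dtypes) trick from the question's own code:

# optional post-processing: cast back to the original column dtypes
res = res.astype(df.dtypes)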