Polars conditional merge of rows


I have some time data that I want to merge when two intervals are close enough (and the type is the same); currently I'm doing something like this:

Note: time_start and time_end are datetimes; type is a categorical string.

import datetime as dt
import polars as pl

buffer = dt.timedelta(minutes=15)
_temp_df = (
    data.sort("time_start").select([
        pl.col("*"),
        # gap between this interval's end and the next interval's start
        ((pl.col("time_start").shift(-1)) - pl.col("time_end")).fill_null(0).apply(lambda x: (dt.timedelta(seconds=x // 1000))).alias("offset"),
        pl.col("type").shift(-1).alias("type_next")
    ]).select([
        pl.col("time_start"), pl.col("time_end"), pl.col("type"),
        # mergeable if the gap is small enough and the type does not change
        ((pl.col("offset") < buffer) & (pl.col("type") == pl.col("type_next"))).alias("can_merge")
    ])
).to_dicts()

_after_df = []
if len(_temp_df) != 0:
    last_data = _temp_df[0]
    for i in range(1, len(_temp_df)):
        _dp = _temp_df[i]
        if last_data['can_merge']:
            last_data['time_end'] = _dp['time_end']
            last_data['can_merge'] = _dp['can_merge']
        else:
            _after_df.append(last_data)
            last_data = _dp

    _after_df.append(last_data)

data = pl.from_dicts(_after_df)

Whilst this works, it feels extremely off that I have to convert Polars -> dicts -> Polars. Is there a way to do this with aggregations? groupby_dynamic and groupby_rolling seem to only accept times, but I want to merge consecutive rows by offset.

This is Polars 0.13.59.

Answer:

You didn't provide any data, so let's start with the following:

import datetime as dt
import polars as pl

data = (
    pl.DataFrame(
        {
            "time_start": [ "12:00", "12:20", "12:40", "13:10", "13:15", "13:50", "13:55", "14:50", "15:20", ],
            "time_end": [ "12:15", "12:30", "13:00", "13:20", "13:45", "14:00", "14:45", "15:00", "15:30", ],
            "type": ["a", "a", "a", "b", "b", "c", "c", "a", "a"],
        }
    )
).with_columns(
    [
        pl.col("type").cast(pl.Categorical),
        pl.format("2020-01-01T{}:00", "time_start")
        .str.strptime(pl.Datetime())
        .dt.cast_time_unit("ms")
        .alias("time_start"),
        pl.format("2020-01-01T{}:00", "time_end")
        .str.strptime(pl.Datetime())
        .dt.cast_time_unit("ms")
        .alias("time_end"),
    ]
)
data
shape: (9, 3)
┌─────────────────────┬─────────────────────┬──────┐
│ time_start          ┆ time_end            ┆ type │
│ ---                 ┆ ---                 ┆ ---  │
│ datetime[ms]        ┆ datetime[ms]        ┆ cat  │
╞═════════════════════╪═════════════════════╪══════╡
│ 2020-01-01 12:00:00 ┆ 2020-01-01 12:15:00 ┆ a    │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 2020-01-01 12:20:00 ┆ 2020-01-01 12:30:00 ┆ a    │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 2020-01-01 12:40:00 ┆ 2020-01-01 13:00:00 ┆ a    │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 2020-01-01 13:10:00 ┆ 2020-01-01 13:20:00 ┆ b    │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 2020-01-01 13:15:00 ┆ 2020-01-01 13:45:00 ┆ b    │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 2020-01-01 13:50:00 ┆ 2020-01-01 14:00:00 ┆ c    │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 2020-01-01 13:55:00 ┆ 2020-01-01 14:45:00 ┆ c    │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 2020-01-01 14:50:00 ┆ 2020-01-01 15:00:00 ┆ a    │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 2020-01-01 15:20:00 ┆ 2020-01-01 15:30:00 ┆ a    │
└─────────────────────┴─────────────────────┴──────┘

The Algorithm

Assumption: the following should work as long as no interval fully encloses another. The intervals can overlap, and they can be disjoint. (This is why it's helpful to provide sample data with your question: it gives insight into the assumptions underlying the data.) A sketch of what goes wrong when an interval is enclosed follows the results below.

(
    data
    .sort("time_start")
    .with_columns([
        (
            (
                pl.col("time_end").dt.offset_by("15m") <
                pl.col("time_start").shift(-1)
            ) |
            (
                pl.col('type') != pl.col('type').shift(-1)
            )
        )
        .shift_and_fill(1, False)
        .cumsum()
        .alias("run_nbr"),
    ])
    .groupby('run_nbr')
    .agg([
        pl.col('time_start').min().alias('time_start'),
        pl.col('time_end').max().alias('time_end'),
        pl.col('type').first().alias('type'),
    ])
    .sort('time_start')
)
shape: (5, 4)
┌─────────┬─────────────────────┬─────────────────────┬──────┐
│ run_nbr ┆ time_start          ┆ time_end            ┆ type │
│ ---     ┆ ---                 ┆ ---                 ┆ ---  │
│ u32     ┆ datetime[ms]        ┆ datetime[ms]        ┆ cat  │
╞═════════╪═════════════════════╪═════════════════════╪══════╡
│ 0       ┆ 2020-01-01 12:00:00 ┆ 2020-01-01 13:00:00 ┆ a    │
├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 1       ┆ 2020-01-01 13:10:00 ┆ 2020-01-01 13:45:00 ┆ b    │
├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 2       ┆ 2020-01-01 13:50:00 ┆ 2020-01-01 14:45:00 ┆ c    │
├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 3       ┆ 2020-01-01 14:50:00 ┆ 2020-01-01 15:00:00 ┆ a    │
├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┤
│ 4       ┆ 2020-01-01 15:20:00 ┆ 2020-01-01 15:30:00 ┆ a    │
└─────────┴─────────────────────┴─────────────────────┴──────┘

Running your code produces the same result. (I left run_nbr in for the discussion below.) Note: your code converts type from Categorical to str, whereas the algorithm above keeps it Categorical.
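
To see why the "no enclosing intervals" assumption matters, here is a minimal sketch with made-up data in which the first interval fully encloses the third:

import datetime as dt
import polars as pl

# hypothetical data: [12:00, 14:00] fully encloses [13:00, 13:10]
enclosed = pl.DataFrame(
    {
        "time_start": [
            dt.datetime(2020, 1, 1, 12, 0),
            dt.datetime(2020, 1, 1, 12, 10),
            dt.datetime(2020, 1, 1, 13, 0),
        ],
        "time_end": [
            dt.datetime(2020, 1, 1, 14, 0),
            dt.datetime(2020, 1, 1, 12, 20),
            dt.datetime(2020, 1, 1, 13, 10),
        ],
        "type": ["a", "a", "a"],
    }
)
(
    enclosed
    .sort("time_start")
    .with_columns([
        (
            (
                pl.col("time_end").dt.offset_by("15m") <
                pl.col("time_start").shift(-1)
            ) |
            (
                pl.col("type") != pl.col("type").shift(-1)
            )
        )
        .shift_and_fill(1, False)
        .cumsum()
        .alias("run_nbr"),
    ])
)

The second row ends at 12:20, more than 15 minutes before the third row starts at 13:00, so the third row gets a new run number even though it lies entirely inside the first interval. The merged output would then contain a redundant [13:00, 13:10] row.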

In Steps

The algorithm assigns "run numbers" to the sorted intervals. A "run" of intervals is any successive set of rows that can be collapsed into a single row, based on the gap between the intervals and on type.
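
The mechanics are easiest to see on a toy boolean column (hypothetical values), where True marks "a new run starts after this row":

pl.DataFrame({"brk": [False, True, False, False, True]}).with_columns([
    # shifting moves each flag onto the first row of the new run;
    # the cumulative sum then numbers the runs: [0, 0, 1, 1, 1]
    pl.col("brk").shift_and_fill(1, False).cumsum().alias("run_nbr")
])

Applied to the actual data: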

(
    data
    .sort("time_start")
    .with_columns([
        (
            (
                pl.col("time_end").dt.offset_by("15m") <
                pl.col("time_start").shift(-1)
            ) |
            (
                pl.col('type') != pl.col('type').shift(-1)
            )
        )
        .shift_and_fill(1, False)
        .cumsum()
        .alias("run_nbr"),
    ])
)


shape: (9, 4)
┌─────────────────────┬─────────────────────┬──────┬─────────┐
│ time_start          ┆ time_end            ┆ type ┆ run_nbr │
│ ---                 ┆ ---                 ┆ ---  ┆ ---     │
│ datetime[ms]        ┆ datetime[ms]        ┆ cat  ┆ u32     │
╞═════════════════════╪═════════════════════╪══════╪═════════╡
│ 2020-01-01 12:00:00 ┆ 2020-01-01 12:15:00 ┆ a    ┆ 0       │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 2020-01-01 12:20:00 ┆ 2020-01-01 12:30:00 ┆ a    ┆ 0       │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 2020-01-01 12:40:00 ┆ 2020-01-01 13:00:00 ┆ a    ┆ 0       │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 2020-01-01 13:10:00 ┆ 2020-01-01 13:20:00 ┆ b    ┆ 1       │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 2020-01-01 13:15:00 ┆ 2020-01-01 13:45:00 ┆ b    ┆ 1       │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 2020-01-01 13:50:00 ┆ 2020-01-01 14:00:00 ┆ c    ┆ 2       │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 2020-01-01 13:55:00 ┆ 2020-01-01 14:45:00 ┆ c    ┆ 2       │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 2020-01-01 14:50:00 ┆ 2020-01-01 15:00:00 ┆ a    ┆ 3       │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┤
│ 2020-01-01 15:20:00 ┆ 2020-01-01 15:30:00 ┆ a    ┆ 4       │
└─────────────────────┴─────────────────────┴──────┴─────────┘

From there, the groupby does the rest.
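
To make that explicit, here is the aggregation step on its own; runs is a hypothetical variable bound to the run-numbered frame shown above:

(
    runs
    .groupby("run_nbr")
    .agg([
        pl.col("time_start").min().alias("time_start"),  # earliest start in the run
        pl.col("time_end").max().alias("time_end"),      # latest end in the run
        pl.col("type").first().alias("type"),            # type is constant within a run
    ])
    .sort("time_start")  # groupby output order is not guaranteed, so re-sort by time
)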
