Rx.Net Operator that ignores some values until they are identical during a certain amount of time

Time:12-08

I'm trying to create a Rx.Net operator that will have the following behavior:

  • When an event is of a "normal kind", emit that event directly
  • When an event is of a "special kind", wait until events of that kind have been received repeatedly for a certain amount of time

I would like to have something like the following marble diagram.

When the message type is A or B, we pass it through directly. When the message is C, we want to make sure that it's not just a transient state, and only send it if the state has persisted for a certain amount of time. This is measured as the time between the first C and the current C that we receive. Once that amount of time has elapsed, all Cs are "accepted" and we pass them through like normal ones.

Marble diagram with C not ignored

Here's what it looks like when we have received a C that was just transient and we want to ignore it.

Marble diagram with C ignored

I've tried to do something with the Scan operator, playing with returning the previous/current values when I have a specific value, but it feels really hacky.

Here's some code that I wrote to demonstrate what I tried. In this case, the "special kind" is just when the value is 999, but in the operator I'd like to write, it could be another test or even a function passed to my operator.

var oneObservable = Observable.Interval(TimeSpan.FromSeconds(1)).Select(_ => 999);
var intObservable = Observable.Interval(TimeSpan.FromSeconds(1)).Select(value => (int)value);

var myObservable = intObservable.Take(4).Concat(oneObservable.Take(3)).Timestamp().Repeat();

var test = myObservable.Scan(
    (previous: default(Timestamped<int>), current: default(Timestamped<int>)),
    (accumulated, current) =>
    {
        if (current.Value == 999)
        {
            if (accumulated.previous.Value != 999)
            {
                return (accumulated.current, current);
            }
            
            return (accumulated.previous, current);
        }
        else if (accumulated.current.Value == 999)
        {
            return (accumulated.previous, current);
        }

        return (accumulated.current, current);
    })
    .Where(
        value => value.current.Value != 999 
        || (value.previous.Value == 999 && value.current.Timestamp - value.previous.Timestamp > TimeSpan.FromSeconds(1.5)))
    .Select(value => value.current);

CodePudding user response:

Here is a simple implementation of a custom IgnoreNonEstablishedContiguousValue operator, with the desired functionality:

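// Requires: using System.Reactive.Concurrency; using System.Reactive.Linq;
// Declare this inside a static class so that it can be used as an extension method.
/// <summary>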
/// Ignores elements having a specific value, until this value has
/// been repeated contiguously for a specific duration.
/// </summary>
public static IObservable<T> IgnoreNonEstablishedContiguousValue<T>(
    this IObservable<T> source,
    T value,
    TimeSpan dueTimeUntilEstablished,
    IEqualityComparer<T> comparer = default,
    IScheduler scheduler = default)
{
    comparer ??= EqualityComparer<T>.Default;
    scheduler ??= Scheduler.Default;
    return Observable.Defer(() =>
    {
        IStopwatch stopwatch = null;
        return source.Do(item =>
        {
            if (comparer.Equals(item, value))
                stopwatch ??= scheduler.StartStopwatch();
            else
                stopwatch = null;
        })
        .Where(_ => stopwatch == null || stopwatch.Elapsed >= dueTimeUntilEstablished);
    });
}

This implementation is based on the Do and Where operators. I'm not a big fan of using the Scan operator as a building block, because it results in verbose and less readable code IMHO. The purpose of the Observable.Defer wrapper is to isolate the state of each subscription.

Usage example:

var oneObservable = Observable.Interval(TimeSpan.FromSeconds(1)).Select(_ => 999);
var intObservable = Observable.Interval(TimeSpan.FromSeconds(1)).Select(v => (int)v);

IObservable<int> myObservable = intObservable.Take(4).Concat(oneObservable.Take(3))
    .Repeat()
    .IgnoreNonEstablishedContiguousValue(999, TimeSpan.FromSeconds(1.5));
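The timing behavior can also be checked deterministically, without waiting for real seconds to pass. The following is only a sketch, assuming the Microsoft.Reactive.Testing NuGet package is referenced. The class name VirtualTimeCheck and the Sec helper are made up for this illustration, and the operator is repeated so that the snippet compiles on its own. Because the operator takes its stopwatch from the supplied scheduler, passing a TestScheduler makes it measure virtual time.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

public static class VirtualTimeCheck
{
    // Same operator as above, repeated so that this sketch compiles on its own.
    public static IObservable<T> IgnoreNonEstablishedContiguousValue<T>(
        this IObservable<T> source, T value, TimeSpan dueTimeUntilEstablished,
        IEqualityComparer<T> comparer = default, IScheduler scheduler = default)
    {
        comparer ??= EqualityComparer<T>.Default;
        scheduler ??= Scheduler.Default;
        return Observable.Defer(() =>
        {
            IStopwatch stopwatch = null;
            return source.Do(item =>
            {
                if (comparer.Equals(item, value))
                    stopwatch ??= scheduler.StartStopwatch();
                else
                    stopwatch = null;
            })
            .Where(_ => stopwatch == null || stopwatch.Elapsed >= dueTimeUntilEstablished);
        });
    }

    // Hypothetical helper: converts seconds to virtual-time ticks.
    private static long Sec(double seconds) => TimeSpan.FromSeconds(seconds).Ticks;

    public static List<int> Run()
    {
        var scheduler = new TestScheduler();

        // A transient 999 at 2s, then an established run of 999 from 4s to 6s.
        var source = scheduler.CreateHotObservable(
            ReactiveTest.OnNext(Sec(1), 1),
            ReactiveTest.OnNext(Sec(2), 999),
            ReactiveTest.OnNext(Sec(3), 2),
            ReactiveTest.OnNext(Sec(4), 999),
            ReactiveTest.OnNext(Sec(5), 999),
            ReactiveTest.OnNext(Sec(6), 999),
            ReactiveTest.OnNext(Sec(7), 3),
            ReactiveTest.OnCompleted<int>(Sec(8)));

        // Subscribe shortly after time zero and dispose at 10s of virtual time.
        var observer = scheduler.Start(
            () => source.IgnoreNonEstablishedContiguousValue(
                999, TimeSpan.FromSeconds(1.5), scheduler: scheduler),
            Sec(10));

        // Keep only the values of the OnNext notifications.
        return observer.Messages
            .Where(m => m.Value.Kind == NotificationKind.OnNext)
            .Select(m => m.Value.Value)
            .ToList();
    }
}
```

With this input, Run() should return 1, 2, 999, 3. The 999 at 2s is dropped because the run is interrupted by the 2. The 999s at 4s and 5s are dropped because less than 1.5 seconds have elapsed since the run started, and the 999 at 6s passes.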

CodePudding user response:

Here's what I came up with:

public static IObservable<T> FilterSpecials<T>(this IObservable<T> source,
    Func<T, bool> specialDetector,
    TimeSpan timeUntilEstablished,
    IScheduler scheduler = default)
{
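    // Run the timer on the supplied scheduler, so that a test scheduler can control it.
    scheduler ??= Scheduler.Default;
    return source.FilterSpecials(specialDetector, Observable.Timer(timeUntilEstablished, scheduler), scheduler);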
}

public static IObservable<T> FilterSpecials<T, U>(this IObservable<T> source,
    Func<T, bool> specialDetector,
    IObservable<U> observeSpecialsUntilEstablished,
    IScheduler scheduler = default)
{
    scheduler = scheduler ?? Scheduler.Default;

    return source
        .Select(i => (value: i, isSpecial: specialDetector(i)))
        .StartWith((value: default(T), isSpecial: false))
        .Publish(_source => _source
            .Zip(_source.Skip(1))
            .Select((t, index) => (
                newValue: t.Second.value,
                isNewValueSpecial: t.Second.isSpecial,
                isPreviousValueSpecial: t.First.isSpecial,
                isFirstElement: index == 0)
            )
            .SelectMany((t, index) => t.isNewValueSpecial
                ? (t.isFirstElement || !t.isPreviousValueSpecial)       
                    ? _source.SkipUntil(observeSpecialsUntilEstablished).TakeWhile(i => i.isSpecial).Select(i => i.value)   
                    : Observable.Empty<T>()
                : Observable.Return(t.newValue)
        ));
}

At its core, the issue is that for the first special value, you want to temporarily stop listening to your regular source observable, and switch to something that looks like this:

source.SkipUntil(observeSpecialsUntilEstablished).TakeWhile(i => i.isSpecial).Select(i => i.value)

When you have that special observable listening, you can ignore your regular one and emit Observable.Empty<T>(). When you don't have special values, you're effectively doing source.SelectMany(i => Observable.Return(i)), which is a no-op that simply re-emits the elements of source.

Everything else is window-dressing: the Zip, Publish and StartWith are there to make it easy to compare each value with the previous one. That could be abstracted away if you want. Putting everything in that named tuple helps with self-documentation and avoids calling the specialDetector again, in case it's an expensive operation.
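As a rough sketch of how that abstraction might look, the StartWith/Publish/Zip combination can be pulled out into a small helper that pairs each element with its predecessor. The name WithPrevious and the seed parameter are assumptions made for this illustration. They are not part of Rx.NET.

```csharp
using System;
using System.Reactive.Linq;

public static class PairwiseExtensions
{
    // Hypothetical helper: emits (previous, current) pairs.
    // The first pair uses the given seed as its "previous" value.
    public static IObservable<(T Previous, T Current)> WithPrevious<T>(
        this IObservable<T> source, T seed = default)
    {
        return source
            .StartWith(seed)
            // Publish shares a single subscription to the source between both Zip inputs.
            .Publish(shared => shared.Zip(
                shared.Skip(1),
                (previous, current) => (Previous: previous, Current: current)));
    }
}
```

For example, Observable.Range(1, 3).WithPrevious(0) should produce the pairs (0, 1), (1, 2) and (2, 3). With such a helper, the operator above would only need to look at Previous and Current in its SelectMany.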
