How to cancel a debounced Rx event A if a different event B has occurred?

Time:10-10

I need to cancel a debounced Rx event A if a different event B has occurred. A contrived example: ignore a debounced keyboard keystroke if a mouse button was clicked in the meantime.

Below I simulate events A and B via timer delays. A is debounced using the Rx.NET Throttle operator:

var subjA = new Subject<int>();
var subjB = new Subject<Unit>();

// desired output: 3 (because B occurs at 150 ms on the timeline)
// actual output: 2, 3

subjA
    .Throttle(TimeSpan.FromMilliseconds(200))
    .Subscribe(s => Console.WriteLine(s));

await Task.WhenAll(EmitA(), EmitB(), Task.Delay(2000));

async Task EmitA()
{
    subjA!.OnNext(1);
    await Task.Delay(100);
    subjA!.OnNext(2);
    await Task.Delay(500);
    subjA!.OnNext(3);
}

async Task EmitB()
{
    await Task.Delay(150);
    subjB!.OnNext(Unit.Default);
}

I can solve this by giving up Throttle and using Select/Delay/TakeUntil/Switch instead:

#nullable enable
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
using System;

var subjA = new Subject<int>();
var subjB = new Subject<Unit>();

subjA.Select(n =>
        Observable.Return(n).Delay(TimeSpan.FromMilliseconds(200))
        .TakeUntil(subjB))
    .Switch()
    .Subscribe(s => Console.WriteLine(s));

await Task.WhenAll(EmitA(), EmitB(), Task.Delay(2000));

async Task EmitA()
{
    subjA!.OnNext(1);
    await Task.Delay(100);
    subjA!.OnNext(2);
    await Task.Delay(500);
    subjA!.OnNext(3);
}

async Task EmitB()
{
    await Task.Delay(150);
    subjB!.OnNext(Unit.Default);
}

This feels like a complicated approach, though, to what must be a common Rx scenario. Is there a more elegant way to solve it?


Marble diagram:

subjA:   ---1---2---------------3---------|
subjB:   ----------U-------------------------------|
Result:  -----------------------------3---|

CodePudding user response:

I feel like this might be what you're looking for:

IObservable<int> query =
    Observable
        .Merge(
            subjA.Select(a => (int?)a),
            subjB.Select(b => (int?)null))
        .Throttle(TimeSpan.FromMilliseconds(200.0))
        .Where(x => x.HasValue)
        .Select(x => x.Value);
    
query
    .Subscribe(s => Console.WriteLine(s));

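This merges both sequences before the Throttle, so an event from subjB restarts the debounce timer just as an event from subjA would and replaces the pending value. The Where/Select pair then emits only the values that came from subjA.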
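To check the merged Throttle against the timeline from the question without waiting on real timers, one option is to drive it with a HistoricalScheduler, the virtual-time scheduler in System.Reactive.Concurrency. The sketch below is an illustration rather than part of the answer: the results list and the scheduled OnNext calls are added scaffolding, and the virtual times mirror the delays in EmitA and EmitB.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var scheduler = new HistoricalScheduler(); // virtual clock: time moves only when advanced
var subjA = new Subject<int>();
var subjB = new Subject<Unit>();
var results = new List<int>();             // illustrative: collects what the query emits

Observable
    .Merge(
        subjA.Select(a => (int?)a),
        subjB.Select(_ => (int?)null))
    .Throttle(TimeSpan.FromMilliseconds(200), scheduler) // debounce on the virtual clock
    .Where(x => x.HasValue)
    .Select(x => x.GetValueOrDefault())
    .Subscribe(x => results.Add(x));

// Same timeline as EmitA/EmitB in the question, expressed in virtual time.
subjA.OnNext(1);                                                                  // t = 0 ms
scheduler.Schedule(TimeSpan.FromMilliseconds(100), () => subjA.OnNext(2));
scheduler.Schedule(TimeSpan.FromMilliseconds(150), () => subjB.OnNext(Unit.Default));
scheduler.Schedule(TimeSpan.FromMilliseconds(600), () => subjA.OnNext(3));

// Run everything scheduled within the first two virtual seconds.
scheduler.AdvanceBy(TimeSpan.FromSeconds(2));

Console.WriteLine(string.Join(", ", results)); // prints 3
```

On this timeline the value 2 is replaced by the null marker from subjB at 150 ms, the marker is filtered out when it is emitted at 350 ms, and 3 comes through at 800 ms.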

CodePudding user response:

Here is a polished version of Enigmativity's idea to throttle the two sequences after merging them:

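// Debounces source, but drops the pending item when ignoreDebouncedItem
// emits before the debounce window has elapsed.
static IObservable<TSource> Debounce<TSource, TIgnoreSignal>(
    this IObservable<TSource> source,
    TimeSpan dueTime,
    IObservable<TIgnoreSignal> ignoreDebouncedItem,
    IScheduler scheduler = default)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(ignoreDebouncedItem);
    if (dueTime < TimeSpan.Zero)
        throw new ArgumentOutOfRangeException(nameof(dueTime));

    scheduler ??= DefaultScheduler.Instance;
    // Publish shares a single subscription to source between both uses below.
    return source.Publish(published => published
        .Select(x => (x, true)) // tag source items with true
        .Merge(ignoreDebouncedItem
            .Select(_ => (default(TSource), false)) // tag ignore signals with false
            // Stop listening for signals once the source has terminated.
            .TakeUntil(published.LastOrDefaultAsync())
        )
        .Throttle(dueTime, scheduler) // an element from either side restarts the timer
        .Where(e => e.Item2)          // keep only source items
        .Select(e => e.Item1));
}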

Usage example:

IObservable<int> query = subjA.Debounce(TimeSpan.FromMilliseconds(200), subjB);

I haven't tested it, so it might have bugs.
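Since the operator above is untested, one way to exercise it is on a virtual clock. The sketch below repeats the Debounce method so the file stands alone, then replays the question's timeline through a HistoricalScheduler. The DebounceExtensions class name and the results list are hypothetical scaffolding, and the check covers only this one timeline, not the completion or error paths.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var scheduler = new HistoricalScheduler(); // virtual clock: time moves only when advanced
var subjA = new Subject<int>();
var subjB = new Subject<Unit>();
var results = new List<int>();             // illustrative: collects what the query emits

subjA.Debounce(TimeSpan.FromMilliseconds(200), subjB, scheduler)
    .Subscribe(x => results.Add(x));

// Same timeline as EmitA/EmitB in the question, expressed in virtual time.
subjA.OnNext(1);                                                                  // t = 0 ms
scheduler.Schedule(TimeSpan.FromMilliseconds(100), () => subjA.OnNext(2));
scheduler.Schedule(TimeSpan.FromMilliseconds(150), () => subjB.OnNext(Unit.Default));
scheduler.Schedule(TimeSpan.FromMilliseconds(600), () => subjA.OnNext(3));

// Run everything scheduled within the first two virtual seconds.
scheduler.AdvanceBy(TimeSpan.FromSeconds(2));

Console.WriteLine(string.Join(", ", results)); // prints 3

// Hypothetical container class; extension methods must live in a static class.
static class DebounceExtensions
{
    // Copy of the Debounce operator from the answer above.
    public static IObservable<TSource> Debounce<TSource, TIgnoreSignal>(
        this IObservable<TSource> source,
        TimeSpan dueTime,
        IObservable<TIgnoreSignal> ignoreDebouncedItem,
        IScheduler scheduler = default)
    {
        ArgumentNullException.ThrowIfNull(source);
        ArgumentNullException.ThrowIfNull(ignoreDebouncedItem);
        if (dueTime < TimeSpan.Zero)
            throw new ArgumentOutOfRangeException(nameof(dueTime));

        scheduler ??= DefaultScheduler.Instance;
        return source.Publish(published => published
            .Select(x => (x, true))
            .Merge(ignoreDebouncedItem
                .Select(_ => (default(TSource), false))
                .TakeUntil(published.LastOrDefaultAsync()))
            .Throttle(dueTime, scheduler)
            .Where(e => e.Item2)
            .Select(e => e.Item1));
    }
}
```

Passing the scheduler explicitly is what makes the run deterministic; with the default scheduler the same query runs against real timers.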
