I need to cancel a debounced Rx event A if a different event B has occurred. A contrived example: ignore a debounced keyboard keystroke if a mouse button was clicked meanwhile.
Below I simulate events A and B via timer delays. A is debounced using the Rx.NET Throttle operator:
var subjA = new Subject<int>();
var subjB = new Subject<Unit>();

// desired output: 3 (because B occurs at 150 ms on the timeline)
// actual output: 2, 3
subjA
    .Throttle(TimeSpan.FromMilliseconds(200))
    .Subscribe(s => Console.WriteLine(s));

await Task.WhenAll(EmitA(), EmitB(), Task.Delay(2000));

async Task EmitA()
{
    subjA!.OnNext(1);
    await Task.Delay(100);
    subjA!.OnNext(2);
    await Task.Delay(500);
    subjA!.OnNext(3);
}

async Task EmitB()
{
    await Task.Delay(150);
    subjB!.OnNext(Unit.Default);
}
I can solve this by giving up Throttle and using Select/Delay/TakeUntil/Switch, as in this fiddle:
#nullable enable
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
using System;

var subjA = new Subject<int>();
var subjB = new Subject<Unit>();

subjA.Select(n =>
        Observable.Return(n).Delay(TimeSpan.FromMilliseconds(200))
            .TakeUntil(subjB))
    .Switch()
    .Subscribe(s => Console.WriteLine(s));

await Task.WhenAll(EmitA(), EmitB(), Task.Delay(2000));

async Task EmitA()
{
    subjA!.OnNext(1);
    await Task.Delay(100);
    subjA!.OnNext(2);
    await Task.Delay(500);
    subjA!.OnNext(3);
}

async Task EmitB()
{
    await Task.Delay(150);
    subjB!.OnNext(Unit.Default);
}
Though, this feels like a complicated approach to what must be a common Rx scenario. Is there an elegant way of solving this?
Marble diagram:
subjA: ---1---2---------------3---------|
subjB: ----------U-------------------------------|
Result: -----------------------------3---|
CodePudding user response:
I feel like this might be what you're looking for:
IObservable<int> query =
    Observable
        .Merge(
            subjA.Select(a => (int?)a),
            subjB.Select(b => (int?)null))
        .Throttle(TimeSpan.FromMilliseconds(200.0))
        .Where(x => x.HasValue)
        .Select(x => x.Value);

query
    .Subscribe(s => Console.WriteLine(s));
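That's combining both sequences before the Throttle and then only emitting values that come from subjA. An event from subjB restarts the throttle window and replaces any pending subjA value with null, so that pending value is dropped by the Where filter.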
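To check the merge-then-Throttle idea without waiting on real timers, here is a minimal sketch that replays the question's timeline on a virtual clock. Using HistoricalScheduler (from System.Reactive.Concurrency) and collecting into a results list are my additions, not part of the answer above; the query itself is the same apart from passing the scheduler to Throttle.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Assumption: a virtual-time scheduler stands in for the Task.Delay-based
// emitters in the question, so the run is deterministic.
var scheduler = new HistoricalScheduler();
var subjA = new Subject<int>();
var subjB = new Subject<Unit>();
var results = new List<int>();

Observable
    .Merge(
        subjA.Select(a => (int?)a),
        subjB.Select(_ => (int?)null))   // B is mapped to a null marker
    .Throttle(TimeSpan.FromMilliseconds(200), scheduler)
    .Where(x => x.HasValue)              // drop windows that ended on a B event
    .Select(x => x.GetValueOrDefault())
    .Subscribe(results.Add);

// Same timeline as the question: A at 0, 100 and 600 ms; B at 150 ms.
scheduler.Schedule(TimeSpan.Zero, () => subjA.OnNext(1));
scheduler.Schedule(TimeSpan.FromMilliseconds(100), () => subjA.OnNext(2));
scheduler.Schedule(TimeSpan.FromMilliseconds(150), () => subjB.OnNext(Unit.Default));
scheduler.Schedule(TimeSpan.FromMilliseconds(600), () => subjA.OnNext(3));

// Run two seconds of virtual time, including the timers Throttle schedules.
scheduler.AdvanceBy(TimeSpan.FromSeconds(2));

Console.WriteLine(string.Join(", ", results)); // prints: 3
```

Value 2 is pending when B arrives at 150 ms, so the null marker takes its place and is filtered out at 350 ms; only 3 survives its 200 ms quiet period.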
CodePudding user response:
Here is a polished version of Enigmativity's idea to throttle the two sequences after merging them:
// Must be declared inside a static class, since it is an extension method.
public static IObservable<TSource> Debounce<TSource, TIgnoreSignal>(
    this IObservable<TSource> source,
    TimeSpan dueTime,
    IObservable<TIgnoreSignal> ignoreDebouncedItem,
    IScheduler? scheduler = null)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(ignoreDebouncedItem);
    if (dueTime < TimeSpan.Zero)
        throw new ArgumentOutOfRangeException(nameof(dueTime));
    scheduler ??= DefaultScheduler.Instance;

    // Publish shares a single subscription to the source between the two uses below.
    return source.Publish(published => published
        .Select(x => (x, true))                          // true: a real source item
        .Merge(ignoreDebouncedItem
            .Select(_ => (default(TSource)!, false))     // false: an ignore marker
            .TakeUntil(published.LastOrDefaultAsync())   // stop listening once the source completes
        )
        .Throttle(dueTime, scheduler)
        .Where(e => e.Item2)                             // drop windows that ended on a marker
        .Select(e => e.Item1));
}
Usage example:
IObservable<int> query = subjA.Debounce(TimeSpan.FromMilliseconds(200), subjB);
I haven't tested it, so it might have bugs.
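One way to exercise the Debounce operator above is to replay the question's timeline on a virtual clock. This is only a sketch: HistoricalScheduler, the DebounceExtensions class name, the results list, and the OnCompleted call at 1000 ms are my assumptions for the sake of the check, and the operator body is a copy of the one above so that the snippet compiles on its own (.NET 6 or later, because of ArgumentNullException.ThrowIfNull).

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var scheduler = new HistoricalScheduler();   // virtual clock, no real waiting
var subjA = new Subject<int>();
var subjB = new Subject<Unit>();
var results = new List<int>();
var completed = false;

subjA
    .Debounce(TimeSpan.FromMilliseconds(200), subjB, scheduler)
    .Subscribe(results.Add, () => completed = true);

// Timeline from the question, plus a completion of A at 1000 ms (assumption).
scheduler.Schedule(TimeSpan.Zero, () => subjA.OnNext(1));
scheduler.Schedule(TimeSpan.FromMilliseconds(100), () => subjA.OnNext(2));
scheduler.Schedule(TimeSpan.FromMilliseconds(150), () => subjB.OnNext(Unit.Default));
scheduler.Schedule(TimeSpan.FromMilliseconds(600), () => subjA.OnNext(3));
scheduler.Schedule(TimeSpan.FromMilliseconds(1000), () => subjA.OnCompleted());

scheduler.AdvanceBy(TimeSpan.FromSeconds(2));

Console.WriteLine($"{string.Join(", ", results)} (completed: {completed})");
// prints: 3 (completed: True)

// Hypothetical container class; the method body is copied from the answer.
static class DebounceExtensions
{
    public static IObservable<TSource> Debounce<TSource, TIgnoreSignal>(
        this IObservable<TSource> source,
        TimeSpan dueTime,
        IObservable<TIgnoreSignal> ignoreDebouncedItem,
        IScheduler? scheduler = null)
    {
        ArgumentNullException.ThrowIfNull(source);
        ArgumentNullException.ThrowIfNull(ignoreDebouncedItem);
        if (dueTime < TimeSpan.Zero)
            throw new ArgumentOutOfRangeException(nameof(dueTime));
        scheduler ??= DefaultScheduler.Instance;

        return source.Publish(published => published
            .Select(x => (x, true))
            .Merge(ignoreDebouncedItem
                .Select(_ => (default(TSource)!, false))
                .TakeUntil(published.LastOrDefaultAsync()))
            .Throttle(dueTime, scheduler)
            .Where(e => e.Item2)
            .Select(e => e.Item1));
    }
}
```

The completion flag is there to confirm that the TakeUntil on the ignore signal lets the whole sequence finish once the source completes, even though subjB itself never completes.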