Home > Net >  Is there a Subject implementation in Rx.NET that functionally resembles BehaviorSubject but emits on
Is there a Subject implementation in Rx.NET that functionally resembles BehaviorSubject but emits on

Time:09-21

Is there a Subject implementation in Rx.NET that functionally resembles BehaviorSubject but emits the next value only if it has changed?

I'm rather new to Reactive Extensions and I can't seem to find anything like that, although this pattern feels like a natural replacement for INotifyPropertyChanged.

My naive implementation is to encapsulate BehaviorSubject<T> like below. Is there any disadvantages in this, compared to creating a composable observable with Observable.DistinctUntilChanged?

    public class DistinctSubject<T> : SubjectBase<T>
    {
        private readonly BehaviorSubject<T> _subject;

        public DistinctSubject(T initialValue) =>
            _subject = new BehaviorSubject<T>(initialValue);

        public T Value 
        { 
            get => _subject.Value;
            set => this.OnNext(value);
        }

        public override bool HasObservers => _subject.HasObservers;

        public override bool IsDisposed => _subject.IsDisposed;

        public override void Dispose() => _subject.Dispose(); 

        public override void OnCompleted() => _subject.OnCompleted();   

        public override void OnError(Exception error) => _subject.OnError(error);

        public override void OnNext(T value)
        {
            if (!EqualityComparer<T>.Default.Equals(value, _subject.Value))
            {
                _subject.OnNext(value);
            }
        }

        public override IDisposable Subscribe(IObserver<T> observer) =>
            _subject.Subscribe(observer);
    }

CodePudding user response:

After glancing a bit at the source code of the BehaviorSubject<T> class, it seems that your DistinctSubject<T> implementation will behave differently in case an OnError is followed by an OnNext:

var subject = new DistinctSubject<int>(2021);
subject.OnError(new ApplicationException());
subject.OnNext(2022); // throws ApplicationException

This will throw, while doing the same with the BehaviorSubject<T> will not throw (the OnNext is just ignored).

My suggestion is to use the DistinctUntilChanged operator in the implementation, like this:

public class DistinctSubject<T> : ISubject<T>, IDisposable
{
    private readonly BehaviorSubject<T> _subject;
    private readonly IObservable<T> _distinctUntilChanged;

    public DistinctSubject(T initialValue, IEqualityComparer<T> comparer = default)
    {
        _subject = new BehaviorSubject<T>(initialValue);
        _distinctUntilChanged = _subject.DistinctUntilChanged(
            comparer ?? EqualityComparer<T>.Default);
    }

    public T Value => _subject.Value;
    public void OnNext(T value) => _subject.OnNext(value);
    public void OnError(Exception error) => _subject.OnError(error);
    public void OnCompleted() => _subject.OnCompleted();

    public IDisposable Subscribe(IObserver<T> observer) =>
        _distinctUntilChanged.Subscribe(observer);

    public void Dispose() => _subject.Dispose();
}

If you are worried about the needless allocation of an object, then you are not already familiar with the spirit of Rx. This library is about features and ease of use, not about performance or efficiency!

  • Related