Home > Mobile >  How to transform observable based on predicate involving first element
How to transform observable based on predicate involving first element

Time:11-04

I'm trying to create an Rx.NET operator that takes an Observable<string> and:

  • Forwards each element unchanged if the first element is "a"
  • Emits just a completion signal otherwise

For example:

-a-b-c-d-|- --> -a-b-c-d-|-

-b-c-d-|- --> -|-

How can I do this?

CodePudding user response:

Here is one way to do it:

/// <summary>
/// If the first element has the expected value, return the whole sequence.
/// Otherwise, return an empty sequence.
/// </summary>
public static IObservable<T> IfFirstElement<T>(this IObservable<T> source,
    T expectedFirstElement, IEqualityComparer<T> comparer = default)
{
    comparer ??= EqualityComparer<T>.Default;
    return source.Publish(published =>
        published
            .Where(x => !comparer.Equals(x, expectedFirstElement))
            .Take(1)
            .IgnoreElements()
            .Amb(published)
    );
}

This implementation uses the Amb operator (short for “ambiguous”), which takes two sequences and propagates the sequence that reacts first.

  1. If the first element has the desirable value, the first sequence (the published.Where Take IgnoreElements) does not react, so the second sequence is propagated (the published, which is the whole sequence). At this point the first sequence is unsubscribed, so the comparer.Equals method will not be invoked for subsequent elements.
  2. If the first element has not the desirable value, the first sequence emits a completion notification, which is propagated by the Amb operator, and the second sequence (the whole sequence) is ignored.

Usage example:

IObservable<string> original = new string[] { "a", "b", "c", "d" }.ToObservable();
IObservable<string> transformed = original.IfFirstElement("a");

Note: This implementation is based on the assumption that when both sequences react at the same time, the Amb operator selects consistently the first sequence. This is not mentioned in the documentation, which states only that "The Amb operator uses parallel processing to detect which sequence yields the first item". The source code is quite complex, so I can't derive this guarantee by reading it. If you want something more reliable, you could try this implementation instead:

return Observable.Create<T>(observer =>
{
    bool first = true;
    return source.Subscribe(item =>
    {
        if (first)
        {
            first = false;
            if (!comparer.Equals(item, expectedFirstElement))
            {
                observer.OnCompleted(); return;
            }
        }
        observer.OnNext(item);
    }, observer.OnError, observer.OnCompleted);
});
  • Related