I'm trying to create an Rx.NET operator that takes an Observable<string>
and:
- Forwards each element unchanged if the first element is
"a"
- Emits just a completion signal otherwise
For example:
-a-b-c-d-|- --> -a-b-c-d-|-
-b-c-d-|- --> -|-
How can I do this?
CodePudding user response:
Here is one way to do it:
/// <summary>
/// If the first element has the expected value, return the whole sequence.
/// Otherwise, return an empty sequence.
/// </summary>
public static IObservable<T> IfFirstElement<T>(this IObservable<T> source,
T expectedFirstElement, IEqualityComparer<T> comparer = default)
{
comparer ??= EqualityComparer<T>.Default;
return source.Publish(published =>
published
.Where(x => !comparer.Equals(x, expectedFirstElement))
.Take(1)
.IgnoreElements()
.Amb(published)
);
}
This implementation uses the Amb
operator (short for “ambiguous”), which takes two sequences and propagates the sequence that reacts first.
- If the first element has the desirable value, the first sequence (the
published.Where
Take
IgnoreElements
) does not react, so the second sequence is propagated (thepublished
, which is the whole sequence). At this point the first sequence is unsubscribed, so thecomparer.Equals
method will not be invoked for subsequent elements. - If the first element has not the desirable value, the first sequence emits a completion notification, which is propagated by the
Amb
operator, and the second sequence (the whole sequence) is ignored.
Usage example:
IObservable<string> original = new string[] { "a", "b", "c", "d" }.ToObservable();
IObservable<string> transformed = original.IfFirstElement("a");
Note: This implementation is based on the assumption that when both sequences react at the same time, the Amb
operator selects consistently the first sequence. This is not mentioned in the documentation, which states only that "The Amb
operator uses parallel processing to detect which sequence yields the first item". The source code is quite complex, so I can't derive this guarantee by reading it. If you want something more reliable, you could try this implementation instead:
return Observable.Create<T>(observer =>
{
bool first = true;
return source.Subscribe(item =>
{
if (first)
{
first = false;
if (!comparer.Equals(item, expectedFirstElement))
{
observer.OnCompleted(); return;
}
}
observer.OnNext(item);
}, observer.OnError, observer.OnCompleted);
});