Home > Software design >  How to repeat an observable sequence until it's empty?
How to repeat an observable sequence until it's empty?

Time:02-28

I have an IObservable<int> sequence that emits a single item the first 9 times it is subscribed, and on further subscriptions it emits nothing and completes immediately:

int counter = 0;
IObservable<int> source = Observable.Defer(() =>
{
    if (  counter < 10)
        return Observable.Return(counter).Delay(TimeSpan.FromMilliseconds(100));
    else
        return Observable.Empty<int>();
});

Now I want to repeat this sequence until it is completed. So I used the Repeat operator:

source
    .Repeat()
    .Do(x => Console.WriteLine(x), () => Console.WriteLine("Completed"))
    .Wait();

The problem is that this query never completes. The Repeat keeps subscribing to the source sequence again and again for an eternity. Even worse, when the source has stopped producing elements, the query enters in a merciless tight loop of death that hijacks one core of the CPU (my quad-core machine reports continuous CPU utilization 25%). Here is the output of the above code:

1
2
3
4
5
6
7
8
9

What I want is a variant of the Repeat operator that stops repeating the source when the source has stopped producing elements. Searching through the built-in Rx operators I can see a RepeatWhen operator, but apparently this can be used only for starting faster the next repetition, not for stopping the repeating altogether:

// Repeatedly resubscribes to the source observable after a normal completion and
// when the observable returned by a handler produces an arbitrary item.
public static IObservable<TSource> RepeatWhen<TSource, TSignal>(
    this IObservable<TSource> source,
    Func<IObservable<object>, IObservable<TSignal>> handler);

I am not 100% sure though, because the description of the handler parameter is quite obscure, so I might be missing something:

The function that is called for each observer and takes an observable sequence objects. It should return an observable of arbitrary items that should signal that arbitrary item in response to receiving the completion signal from the source observable. If this observable signals a terminal event, the sequence is terminated with that signal instead.

My question is: how can I implement a RepeatUntilEmpty operator that repeats the source sequence until it's empty? Is it possible to implement it based on the aforementioned RepeatWhen operator? If not, should I go low level (Observable.Create) and reimplement the basic Repeat functionality from scratch? Or can I use the Materialize operator to my advantage, combining it somehow with the existing Repeat? I am out of ideas at the moment. I am willing to accept any kind of solution, either high or low lever.

public static IObservable<T> RepeatUntilEmpty<T>(this IObservable<T> source)
{
    // What to do?
}

Replacing the Repeat with the RepeatUntilEmpty in my original code, should have the effect of making the query complete immediately after emitting the 9 element.

CodePudding user response:

You can use indeed Materialize()/Dematerialize() to build your own sequence of notifications based on the received notifications from the Repeat() statement. The notification sequence will look like this:

1C 2C 3C 4C 5C 6C 7C 8C 9C C C C ...

So we look for two consecutive OnCompleted notifications. If we don't found one we still return the received OnNext notification, otherwise we return the OnCompleted notification. The code can look like this:

public static void Main(string[] args)
{
    int counter = 0;
    IObservable<int> source = Observable.Defer(() =>
    {
        Console.WriteLine($"counter is now: {counter}");
        if (counter > 20) {
            System.Environment.Exit(1);
        }
        if (  counter < 10)
            return Observable.Return(counter).Delay(TimeSpan.FromMilliseconds(100));
        else
            return Observable.Empty<int>();
    });

    source
        .RepeatUntilEmpty()
        .Subscribe(x => {

                System.Threading.Thread.Sleep(10);
                Console.WriteLine($"SUBSCRIBE: {x}");
            }, () => Console.WriteLine("SUBSCRIBE:Completed"));

    System.Threading.Thread.Sleep(10000);
    Console.WriteLine("Main thread terminated");
}

With the RepeatUntilEmpty() method as follow:

public static IObservable<T> RepeatUntilEmpty<T>(this IObservable<T> source)
{
    return source
        .Materialize()
        .Repeat()
        .StartWith((Notification<T>)null)
        .Buffer(2, 1)
        .Select(it => {
            Console.WriteLine($"Buffer content: {String.Join(",", it)}");
            if (it[1].Kind != System.Reactive.NotificationKind.OnCompleted) {
                return it[1];
            }
            // it[1] is OnCompleted, check the previous one
            if (it[0] != null && it[0].Kind != System.Reactive.NotificationKind.OnCompleted) {
                // not a consecutive OnCompleted, so we ignore this OnCompleted with a NULL marker
                return null;
            }

            // okay, we have two consecutive OnCompleted, stop this observable.
            return it[1];
        })
        .Where(it => it != null) // remove the NULL marker
        .Dematerialize();
}

This will generate the following output:

counter is now: 0
Buffer content: ,OnNext(1)
SUBSCRIBE: 1
Buffer content: OnNext(1),OnCompleted()
counter is now: 1
Buffer content: OnCompleted(),OnNext(2)
SUBSCRIBE: 2
Buffer content: OnNext(2),OnCompleted()
counter is now: 2
Buffer content: OnCompleted(),OnNext(3)
SUBSCRIBE: 3
Buffer content: OnNext(3),OnCompleted()
counter is now: 3
Buffer content: OnCompleted(),OnNext(4)
SUBSCRIBE: 4
Buffer content: OnNext(4),OnCompleted()
counter is now: 4
Buffer content: OnCompleted(),OnNext(5)
SUBSCRIBE: 5
Buffer content: OnNext(5),OnCompleted()
counter is now: 5
Buffer content: OnCompleted(),OnNext(6)
SUBSCRIBE: 6
Buffer content: OnNext(6),OnCompleted()
counter is now: 6
Buffer content: OnCompleted(),OnNext(7)
SUBSCRIBE: 7
Buffer content: OnNext(7),OnCompleted()
counter is now: 7
Buffer content: OnCompleted(),OnNext(8)
SUBSCRIBE: 8
Buffer content: OnNext(8),OnCompleted()
counter is now: 8
Buffer content: OnCompleted(),OnNext(9)
SUBSCRIBE: 9
Buffer content: OnNext(9),OnCompleted()
counter is now: 9
Buffer content: OnCompleted(),OnCompleted()
SUBSCRIBE:Completed
Main thread terminated

I have not tested how this code handles OnError() notifications, so you might want to check that. Also, I had issues that the source.Materialize().Repeat() part will read some more data from the original source even though it had decided later to stop the observable. Specially with the Do().Wait() statement I sometimes receive additional output like:

counter is now: 9
Buffer content: OnCompleted(),OnCompleted()
SUBSCRIBE: Completed
counter is now: 10
counter is now: 11
counter is now: 12
counter is now: 13
counter is now: 14

This might be an issue for you as well that the Repeat() part is still trying to read/concat empty observables.

  • Related