Home > Back-end >  Observable's LastAsync() deadlocks in console app
Observable's LastAsync() deadlocks in console app

Time:10-30

When using IObservable.LastAsync() to force my console app to wait on the result of an API call using Flurl, that API call is never made and the main thread deadlocks and never returns from LastAsync(). My goals are:

  1. Since this is a console app, I can't really "subscribe" to the API call since that would allow the main thread to continue, likely causing it to exit prior to the API call completing. So I need to block until the value is obtained.
  2. The API call should be deferred until the first subscriber requests a value.
  3. Second and onward subscribers should not cause another API call, instead the last value from the stream should be returned (this is the goal of using Replay(1))

Here is an example that reproduces the issue:

public static class Program
{
    public static async Task Main(string[] args)
    {
        var obs = Observable.Defer(() =>
                "https://api.publicapis.org"
                    .AppendPathSegment("entries")
                    .GetJsonAsync()
                    .ToObservable())
            .Select(x => x.title)
            .Replay(1);

        var title = await obs.LastAsync();
        Console.WriteLine($"Title 1: {title}");
    }
}

How can I modify my example to ensure that all 3 requirements above are met? Why does my example cause a deadlock?

CodePudding user response:

Replay returns "connectable" observable, and you need to call Connect() method on it to start it going. Without that call, it does not subscribe to the underlying observable and does not emit items to its own subscribers, so that's why you have a "deadlock".

In this case instead of manually connecting, you can use RefCount() extension method which will automatically connect it on first subscriber and disconnect on when last subscriber unsubscribes. So:

public static async Task Main(string[] args) {
    var obs = Observable.Defer(() =>
            "https://api.publicapis.org"
                .AppendPathSegment("entries")
                .GetJsonAsync()
                .ToObservable())
        .Select(x => x.count)
        .Replay(1)
        .RefCount();

    // makes request
    var title = await obs.LastAsync();
    Console.WriteLine($"Title 1: {title}");
    // does not make request, obtains from replay cache
    title = await obs.LastAsync();
    Console.WriteLine($"Title 2: {title}");
}

You can also use AutoConnect method:

.Replay(1)
.AutoConnect(1);

This will automatically connect on first subscriber but will never disconnect (in your case shouldn't matter).

  • Related