Home > Back-end >  Unexpected behavior using "Rx" in .NET
Unexpected behavior using "Rx" in .NET

Time:06-27

I observed a behavior I was not expecting. Here is some code to illustrate:

class ReactiveService
{
    private readonly ISubject<WorkPayload> observableWork
        = new Subject<WorkPayload>();

    void WorkBeingDone(WorkPayload workPayload)
    {
        this.observableWork.OnNext(workPayload);
    }

    IObservable<WorkPayload> WorkBeingDone() => this.observableWork;
}

class WorkService
{
    private readonly ReactiveService reactiveService;

    public WorkService(ReactiveService reactiveService)
        => this.reactiveService = reactiveService;

    Task DoWork()
    {
        this.reactiveService.WorkBeingDone(new WorkPayload());
        // ... lots of logic here ...
        return Task.CompletedTask;
    }
}

class WorkObservingService
{
    private readonly ReactiveService reactiveService;

    public WorkObservingService(ReactiveService reactiveService)
        => this.reactiveService = reactiveService;

    async Task DoSomethingWhenWorkHappens()
    {
        // Now here what I want to do is that on each work being observed,
        // some asynchronous operation is triggered, currently I'm doing it
        // like this, but I'm open to any other suggestion.
        this.reactiveService.WorkBeingDone().Select(
            w => Observable.FromAsync(async () =>
            {
                // Do the asynchronous stuff with w...
            })).Concat().Subscribe();
        // Prevent the method from returning or something keep
        // the subscription alive...
    }
}

What seems to happen is that when I try to do some asynchronous work in the Observable.FromAsync, and it throws, this gets raised all the way back to the WorkService and crashes the call to DoWork. But in truth, this thing should not care about what happens with the observers of observableWork. Is that expected? How do I prevent this from happening?

CodePudding user response:

The observable sequences are awaitable, so you could simply do this:

async Task DoSomethingWhenWorkHappens()
{
    await this.reactiveService.WorkBeingDone().Select(
        w => Observable.FromAsync(async () =>
        {
            // Do the asynchronous stuff with w...
        })).Concat().DefaultIfEmpty();
}

Any error propagated through the sequence will be passed to the resulting Task, completing it in a Faulted state.

The purpose of the DefaultIfEmpty operator at the end of the chain is to prevent an InvalidOperationException, in case the sequence emits zero elements. The await is designed to propagate the last element in the sequence as a result, and so it gets confused when the sequence doesn't contain any elements.

Another option is to elide the async and await, and just convert the sequence to a Task (.ToTask()), and return it directly.

What seems to happen is that when I try to do some asynchronous work in the Observable.FromAsync, and it throws, this gets raised all the way back to the WorkService and crashes the call to DoWork.

I haven't run your code, but my theory is that the error in the Observable.FromAsync is escalated to an unhandled exception that crashes the process. I don't expect that you can handle it in the DoWork, by wrapping it in try/catch. This happens because of the naked .Subscribe(), that is missing all three handlers (onNext, onError and onCompleted). By omitting especially the onError handler, you are saying to the Rx that "Hey! I don't know how to handle a possible error!". Guess what, the Rx doesn't know either how to handle this error, so it does what is arguably the most sensible and responsible thing, and rethrows it on the ThreadPool, causing your application to crash. Which is not very kind, but on the other hand it's probably better than suppressing the error and throwing it silently into the memory hole.

  • Related