RxJs expand and delay operator not working as expected

Time:10-27

I'm trying to use the RxJS expand operator together with delay to poll the server until the HTTP call reports success.

I use the delay operator as well because the HTTP call returns the following object:

{
    retryAfterInSeconds: 30,
    status: 'Running'
}

and I want to skip unnecessary calls if I know when I should check the status again.

So, I implemented the behavior as follows:

private pollExportStatus(entity: any): void {
    let waitingTime = 0;

    this.service
        .getStatus(entity)
        .pipe(
            untilDestroyed(this),
            expand((x: Status) => {
                waitingTime = x.retryAfterInSeconds;

                const shouldContinuePolling =
                    waitingTime < 300 &&
                    x.status !== StatusType.Succeeded &&
                    x.status !== StatusType.Failed;

                if (!shouldContinuePolling) {
                    return EMPTY;
                }

                return this.service
                    .getStatus(entity)
                    .pipe(delay(x.retryAfterInSeconds * 1000));
            }),
            last()
        )
        .subscribe((x: Status) => {
            if (waitingTime >= 300 || x.status !== StatusType.Succeeded) {
                console.log('Failed');

                return;
            }

            console.log('Succeeded');
        });
}

Everything works as expected except one thing: even if I return the next call (inside expand) with a delay:

return this.service
  .getStatus(entity)
  .pipe(delay(x.retryAfterInSeconds * 1000));

the first call is not delayed and I don't understand why.

Finally, I found a solution that works as expected.

I replaced the expand return with the following one:

return timer(x.retryAfterInSeconds * 1000).pipe(
     concatMap(() => this.service.getStatus(entity))
);

but I want to understand why the first one does not work and the second one does.

Note: the first approach works if I use a simple observable (created with the of operator), but not with an HTTP call.

CodePudding user response:

It seems you are confused about why the HTTP request inside expand fires immediately instead of waiting for the retry interval. The delay operator only delays the emissions passing through the pipe. As soon as something subscribes to an Observable, its execution starts from the top of the chain.

In this case, the subscription happens at the .subscribe() call. The initial this.service.getStatus() call takes place and its result is passed to expand. Execution then reaches the next this.service.getStatus(entity), which fires the HTTP request immediately. Only afterwards does delay act, holding back the result of getStatus before it is passed back to expand. The delay sits too far down the chain and takes effect too late.

The timer example works because the timer emits only after the waiting time has elapsed. Only then does the getStatus call happen.
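The difference in subscription timing can be illustrated without RxJS at all. The following is a minimal, dependency-free sketch and not RxJS itself: VirtualClock, Cold, delayed, afterTimer and the stand-in getStatus are all hypothetical names made up for this illustration, and the 100 ms response latency is an assumption. delayed models what delay does (subscribe now, hold the value back), while afterTimer models timer(...).pipe(concatMap(...)) (wait first, subscribe afterwards).

```typescript
// A tiny virtual clock so the example runs instantly and deterministically.
type Task = { at: number; run: () => void };

class VirtualClock {
  now = 0;
  private queue: Task[] = [];

  schedule(delayMs: number, run: () => void): void {
    this.queue.push({ at: this.now + delayMs, run });
  }

  // Run scheduled tasks in time order until none are left.
  flush(): void {
    while (this.queue.length > 0) {
      this.queue.sort((a, b) => a.at - b.at);
      const task = this.queue.shift()!;
      this.now = task.at;
      task.run();
    }
  }
}

// A "cold" source: nothing happens until subscribe() is called.
type Cold<T> = { subscribe: (next: (value: T) => void) => void };

const clock = new VirtualClock();
const log: string[] = [];

// Hypothetical stand-in for this.service.getStatus(entity): the request is
// sent at subscription time and the response arrives 100 ms later (assumed).
function getStatus(label: string): Cold<string> {
  return {
    subscribe(next) {
      log.push(`${label}: request sent at ${clock.now} ms`);
      clock.schedule(100, () => next('Running'));
    },
  };
}

// Model of delay(ms): subscribes to the source right away and holds each
// value back for `ms` before passing it on.
function delayed<T>(source: Cold<T>, ms: number): Cold<T> {
  return {
    subscribe(next) {
      source.subscribe((value) => clock.schedule(ms, () => next(value)));
    },
  };
}

// Model of timer(ms).pipe(concatMap(() => source)): waits `ms` first and
// only then creates and subscribes to the source.
function afterTimer<T>(ms: number, makeSource: () => Cold<T>): Cold<T> {
  return {
    subscribe(next) {
      clock.schedule(ms, () => makeSource().subscribe(next));
    },
  };
}

delayed(getStatus('delay'), 30000).subscribe(() =>
  log.push(`delay: value delivered at ${clock.now} ms`));
afterTimer(30000, () => getStatus('timer')).subscribe(() =>
  log.push(`timer: value delivered at ${clock.now} ms`));
clock.flush();
```

In this model both variants deliver their value at about the same time, but only the timer variant postpones the request itself, which is the behavior the question was after.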

CodePudding user response:

Tobias provided a way of thinking from source emissions passing down through operators. 90% of the time it’s how I think too but it’s important to know what actually happens in terms of subscriptions and the creation of observables which go bottom up.

Let’s follow the stream bottom up.

You subscribe. But what are you subscribing to? The last piped operator last(). Internally this subscribes to a source observable and when that completes emits the last value from this source.

So what does last() subscribe to? I.e. what’s its source observable? The observable created by expand(…).

expand does a complicated dance, but it is essentially a recursive mergeMap: it emits each source emission plus every emission of the observable returned by applying the provided function to each emission. The key point is that its source observable*, whose first emission triggers the recursion, is this.service.getStatus(entity), and this has no delay.

The second emission comes from this.service.getStatus(entity).pipe(delay(30 * 1000)). All delay does is delay emissions: the delay operator subscribes internally to this.service.getStatus(entity) immediately. Thus you have just hit the server twice in a row.

* unaffected by untilDestroyed(this), which just subscribes to the source and completes when this is destroyed.

One more point: untilDestroyed(this) should be the last operator in the pipe. If it sits at the top and there are delays further down, the stream may keep doing work after this is destroyed.
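To make the "twice in a row" effect concrete, here is a small timing model of the recursion, written as a plain function rather than with RxJS. It is only a sketch under stated assumptions: requestTimes, Strategy and the fixed latencyMs round-trip time are hypothetical, the Status shape mirrors the object from the question, and the 300-second cap from the original code is left out for brevity.

```typescript
// Response shape mirroring the object shown in the question.
interface Status {
  retryAfterInSeconds: number;
  status: 'Running' | 'Succeeded' | 'Failed';
}

// 'delay-after-request' models getStatus(...).pipe(delay(...)) inside expand;
// 'timer-before-request' models timer(...).pipe(concatMap(() => getStatus(...))).
type Strategy = 'delay-after-request' | 'timer-before-request';

// Returns the times (in ms) at which each request would be sent.
// `responses` is a scripted sequence of server replies and `latencyMs` is an
// assumed fixed round-trip time.
function requestTimes(
  strategy: Strategy,
  responses: Status[],
  latencyMs: number
): number[] {
  const sentAt: number[] = [];
  let sendAt = 0;   // when the next request goes out
  let holdMs = 0;   // how long delay() holds that request's response back

  for (const response of responses) {
    sentAt.push(sendAt);
    // Time at which this response reaches the expand callback.
    const emittedAt = sendAt + latencyMs + holdMs;

    if (response.status !== 'Running') {
      break; // the callback would return EMPTY, ending the recursion
    }

    const waitMs = response.retryAfterInSeconds * 1000;
    if (strategy === 'delay-after-request') {
      sendAt = emittedAt;          // inner request fires immediately
      holdMs = waitMs;             // only its response is held back
    } else {
      sendAt = emittedAt + waitMs; // the request itself waits for the timer
      holdMs = 0;
    }
  }
  return sentAt;
}

const script: Status[] = [
  { retryAfterInSeconds: 30, status: 'Running' },
  { retryAfterInSeconds: 30, status: 'Running' },
  { retryAfterInSeconds: 0, status: 'Succeeded' },
];

const withDelay = requestTimes('delay-after-request', script, 100); // [0, 100, 30200]
const withTimer = requestTimes('timer-before-request', script, 100); // [0, 30100, 60200]
```

In the delay variant the second request goes out as soon as the first response arrives, and the status it returns is already about 30 seconds old by the time expand sees it. In the timer variant each request waits for the interval the server asked for.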

If you replace the first http with

of({
    retryAfterInSeconds: 30,
    status: 'Running'
})

then the first HTTP request will be the one inside expand, whose response is held back by the delay.
