HTTP Polling with RxJs with initial call different from the following ones

Time:10-12

I'm developing an Angular 14 application and I need to poll a REST endpoint at regular intervals until I receive a certain result.

The catch, however, is that the first request I make is slightly different from the following ones (it needs an extra parameter), even though the returned type is the same. To make things clearer, here are the signatures of the two methods I'm calling (they, in turn, call the Angular Http Client and both return an Observable<HttpResponse<Foo>>):

export class HttpService{
    public initialCall(): Observable<HttpResponse<Foo>>{
        //...
    }
    
    public pollingCall(token: string): Observable<HttpResponse<Foo>>{
        //...
    }
}

To call the service, I'm using RxJs (I'm a total beginner with this, it should be noted) and at the moment my implementation looks like this:

httpService.initialCall().subscribe(
    initialResult => {
        handleIncomingFoo(initialResult.body);
        timer(3000).pipe(
            switchMap(_ => httpService.pollingCall(initialResult.body.token)),
            // repeat() must come before takeWhile(): placed after it, repeat()
            // resubscribes on every completion and the polling never stops
            repeat(),
            takeWhile(p => p.body.isRunning, true)
        ).subscribe(
            pollResult => {
                handleIncomingFoo(pollResult.body);
            },
            error => {
                //Handle polling call error
            }
        );
    },
    error => {
        //Handle initial call error
    }
);

Basically, I make the initial call, subscribe to it, and once I get the first result I set up polling with timer until a certain condition is met.

However, this implementation seems too verbose to me and has a couple of drawbacks. For example, I must put the code that handles the incoming object in a separate method, handleIncomingFoo(), because I need to call it on the initial result and on every later one. Error handling also happens in two different places.

What I want to know is whether there is a cleverer, less clunky way to write this code, maybe by chaining or combining RxJs operators that I don't know about. Ideally there would be only one place in the code where the HttpResponse<Foo> objects are handled, regardless of whether they come from the initial call or the following ones.

PS: note that pollingCall() depends on a parameter that comes from the result of initialCall(), which is why I can't just put an if(...) inside switchMap and call one method or the other based on whether we're on the first tick or not.

CodePudding user response:

I think a more concise way would be to split the two calls into separate observables and then merge their output streams, as follows:

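// shareReplay(1) caches the initial response, so both subscribers below share one request
const init$ = httpService.initialCall().pipe(shareReplay(1));
// withLatestFrom drops any tick that fires before init$ has emitted
const poll$ = timer(0, 3000).pipe(
  withLatestFrom(init$),
  switchMap(([_, res]) => httpService.pollingCall(res.body.token)),
  takeWhile(val => val.body.isRunning, true)
);

// handleIncomingFoo expects the body, as in the question, so unwrap the response here
merge(init$, poll$).pipe(catchError(/*...*/)).subscribe(res => handleIncomingFoo(res.body));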

Notes:

  • You can pass a second argument to timer to make it keep emitting at that interval. That way you don't need repeat.
  • You should use the catchError operator instead of passing multiple callbacks to subscribe.
  • If you merge the streams, you only need to call handleIncomingFoo in one place.

CodePudding user response:

You can make the initial call, then return an observable inside your switchMap that emits the initial result, followed by the emissions from the polling observable. To achieve this, we can use concat to create a single observable from two sources, and the of function to create an observable from a plain value:

httpService.initialCall().pipe(
    catchError(/* handle initial error */),
    switchMap(initialResult => concat(
        of(initialResult),
        timer(3000).pipe(
            switchMap(() => httpService.pollingCall(initialResult.body.token)),
            catchError(/* handle polling error */),
            // repeat() goes before takeWhile(): placed after it, repeat() would
            // resubscribe whenever takeWhile completes and polling would never end
            repeat(),
            takeWhile(p => p.body.isRunning, true)
        )
    ))
).subscribe(
    result => handleIncomingFoo(result.body)
);

The benefits here are:

  1. you can handle all emissions in the same place with no duplicated logic
  2. you can handle errors closer to the calling code
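
To try the single-subscription pattern without a backend, here is a minimal sketch. Everything in it is a stand-in rather than the asker's real code: fakeService, FooResponse, and the shape of Foo are assumptions for illustration, and the timer(3000) delay is left out so that the whole chain runs synchronously. The sketch places repeat() before takeWhile(), so that the completion triggered by takeWhile ends the stream instead of restarting it.

```typescript
import { Observable, concat, defer, of } from 'rxjs';
import { repeat, switchMap, takeWhile } from 'rxjs/operators';

// Assumed shape: only the two fields the snippets on this page read.
interface Foo { token: string; isRunning: boolean; }
// Simplified stand-in for HttpResponse<Foo> (hypothetical).
interface FooResponse { body: Foo; }

// Fake service: the job reports "running" for the first two polls, then stops.
let pollCount = 0;
const fakeService = {
  initialCall: (): Observable<FooResponse> =>
    of({ body: { token: 'abc', isRunning: true } }),
  // defer() builds a fresh response on every (re)subscription, like a new HTTP request.
  pollingCall: (token: string): Observable<FooResponse> =>
    defer(() => of({ body: { token, isRunning: ++pollCount < 3 } })),
};

const received: Foo[] = [];

fakeService.initialCall().pipe(
  switchMap(initial => concat(
    of(initial), // re-emit the initial result first
    fakeService.pollingCall(initial.body.token).pipe(
      repeat(),                                   // poll again after each response
      takeWhile(res => res.body.isRunning, true), // stop after the first "not running"
    ),
  )),
).subscribe(res => received.push(res.body)); // the single place results are handled

console.log(received.map(foo => foo.isRunning));
```

With this fake service, received should end up holding four entries: the initial result, two polls that are still running, and the final poll with isRunning set to false.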

If you want to handle the errors in one place, you can do so as shown below. However, the stream then ends as soon as any error is received, whereas handling them separately as above may give you a chance to keep the stream alive after a failure.

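httpService.initialCall().pipe(
    switchMap(initialResult => concat(
        of(initialResult),
        timer(3000).pipe(
            switchMap(() => httpService.pollingCall(initialResult.body.token)),
            // repeat() goes before takeWhile() so that takeWhile can end the polling
            repeat(),
            takeWhile(p => p.body.isRunning, true)
        )
    ))
).subscribe({
    // observer object; passing several callbacks to subscribe is deprecated in RxJS 7
    next: result => handleIncomingFoo(result.body),
    error: error => handleError()
});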
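
One detail the snippets on this page gloss over: Angular types HttpResponse<T>.body as T | null, so expressions such as p.body.isRunning do not compile when strict null checks are enabled. A minimal sketch of null-safe helpers follows. The names isStillRunning, tokenOf, and ResponseLike are hypothetical, and the shape of Foo is assumed from the fields used in the question.

```typescript
// Assumed shape: only the two fields the snippets on this page read.
interface Foo { token: string; isRunning: boolean; }

// Mirrors just the `body` field of Angular's HttpResponse<T> (hypothetical stand-in).
interface ResponseLike<T> { body: T | null; }

// Predicate for takeWhile: a missing body is treated as "no longer running".
function isStillRunning(res: ResponseLike<Foo>): boolean {
  return res.body?.isRunning ?? false;
}

// Extracts the token needed by pollingCall, failing loudly if there is no body.
function tokenOf(res: ResponseLike<Foo>): string {
  if (res.body === null) {
    throw new Error('initial response has no body');
  }
  return res.body.token;
}
```

In the pipelines above these would be used as takeWhile(isStillRunning, true) and httpService.pollingCall(tokenOf(initialResult)). Treating a missing body as "stopped" is a design choice; throwing instead may suit an API where an empty body signals a fault.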