Sequencing/queuing HTTP request batches with RxJS

Time:05-12

I'm trying to figure out how to queue outgoing API request batches using RxJS.

In this sample Angular app, my goal is to send out 3 "random-cat-facts" API requests simultaneously and, only once they have resolved, send out 2 additional "random-activity" requests:

import { Component } from "@angular/core";
import { HttpClient } from "@angular/common/http";
import { forkJoin, concat } from "rxjs";
import { combineAll } from "rxjs/operators";

const RANDOM_CAT_FACTS_API = "https://catfact.ninja/fact";
const RANDOM_ACTIVITY_API = "https://www.boredapi.com/api/activity";

@Component({
  selector: "app-root",
  templateUrl: "./app.component.html",
  styleUrls: ["./app.component.css"]
})
export class AppComponent {
  constructor(private http: HttpClient) {}

  fetchSequential() {
    const catFactsObservables = [
      this.http.get(RANDOM_CAT_FACTS_API),
      this.http.get(RANDOM_CAT_FACTS_API),
      this.http.get(RANDOM_CAT_FACTS_API)
    ];

    const activitiesObservables = [
      this.http.get(RANDOM_ACTIVITY_API),
      this.http.get(RANDOM_ACTIVITY_API)
    ];

    const sequencedObservables = [
      forkJoin(catFactsObservables),
      forkJoin(activitiesObservables)
    ];

    concat(sequencedObservables)
      .pipe(combineAll())
      .subscribe((p) => console.log({ p }));
  }
}

I'm clearly doing something wrong, since currently all 5 requests are sent out simultaneously.

How can I use RxJS to send out the activitiesObservables requests only once the catFactsObservables requests have resolved?

Additionally, is there a way to have the subscribe callback fire only once, after both batches have resolved, so that it receives the responses for all 5 requests instead of a separate event for each batch?

Sandbox:

https://codesandbox.io/s/mystifying-gauss-cfcj0z?file=/src/app/app.component.ts:700-721

CodePudding user response:

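Use the concatAll operator instead of the combineAll operator.

combineAll waits for the outer observable to complete and then subscribes to every inner observable at once, which is why all 5 requests go out together. concatAll subscribes to the inner observables one at a time, so the second batch is not requested until the first one has completed.

Your code would look like this: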

// concatAll replaces combineAll in the imports
import { concatAll } from "rxjs/operators";

concat(sequencedObservables)
  .pipe(concatAll())
  .subscribe((p) => console.log({ p }));
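To check the ordering without hitting a real API, here is a minimal sketch that replaces http.get with a hypothetical fakeRequest helper (not part of Angular or RxJS; it exists only for this illustration). Each fake request records its name when it is subscribed to, which is the moment a real HttpClient request would be sent, and it responds only when respond() is called.

```typescript
import { Observable, Subject, concat, defer, forkJoin } from "rxjs";
import { concatAll } from "rxjs/operators";

// Names of the fake requests, in the order they were "sent" (subscribed to).
const started: string[] = [];

// Hypothetical stand-in for http.get(): cold like an HttpClient request,
// but it only responds when respond() is called.
function fakeRequest(name: string): { request$: Observable<string>; respond: () => void } {
  const response$ = new Subject<string>();
  return {
    request$: defer(() => {
      started.push(name);
      return response$;
    }),
    respond: () => {
      response$.next(`${name} response`);
      response$.complete();
    },
  };
}

const cats = [fakeRequest("cat1"), fakeRequest("cat2"), fakeRequest("cat3")];
const activities = [fakeRequest("activity1"), fakeRequest("activity2")];

const sequencedObservables = [
  forkJoin(cats.map((c) => c.request$)),
  forkJoin(activities.map((a) => a.request$)),
];

const emitted: string[][] = [];
concat(sequencedObservables)
  .pipe(concatAll())
  .subscribe((batch) => emitted.push(batch));

// At this point only the first batch has been subscribed to.
const startedBeforeCatsResolved = [...started];

// Resolving all cat requests completes the first forkJoin,
// and only then does concatAll subscribe to the second batch.
cats.forEach((c) => c.respond());
const startedAfterCatsResolved = [...started];

activities.forEach((a) => a.respond());
```

In this sketch, startedBeforeCatsResolved holds only the three cat requests, and the two activity requests appear in startedAfterCatsResolved once the first batch has completed. Spreading the array, as in concat(...sequencedObservables), should give the same ordering without needing concatAll.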

CodePudding user response:

In the end, I solved this based on @gil's answer and this answer, using a combination of concatAll and reduce.

concat(sequencedObservables)
  .pipe(
    concatAll(),
    reduce((acc, res) => {
      acc.push(res);
      return acc;
    }, [])
  )
  .subscribe((p) => console.log(p));
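As a possible variation on the reduce approach, the sketch below uses toArray to collect both batch results into a single emission and then flattens them into one list of 5 responses. The of(...) observables are stand-ins for the http.get calls, and the names catFacts$, activities$, and emissions are made up for this example.

```typescript
import { Observable, concat, forkJoin, of } from "rxjs";
import { map, toArray } from "rxjs/operators";

// Stand-ins for the http.get(...) calls in the question.
const catFactRequests: Observable<string>[] = [of("fact 1"), of("fact 2"), of("fact 3")];
const activityRequests: Observable<string>[] = [of("activity 1"), of("activity 2")];

const catFacts$ = forkJoin(catFactRequests);
const activities$ = forkJoin(activityRequests);

// Every value the subscriber receives; we expect exactly one.
const emissions: string[][] = [];

// concat subscribes to activities$ only after catFacts$ completes.
concat(catFacts$, activities$)
  .pipe(
    // Buffer both batch results and emit them together on completion.
    toArray(),
    // Flatten [[3 facts], [2 activities]] into a single array of 5.
    map((batches) => ([] as string[]).concat(...batches))
  )
  .subscribe((all) => emissions.push(all));
```

Here toArray plays the same role as the reduce call: it emits only when the source completes, so the subscribe callback fires once. Dropping the map step keeps the results grouped per batch, as in the reduce version.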