Home > Blockchain >  Rxjs: resolve an array of observables using batches
Rxjs: resolve an array of observables using batches

Time:10-05

I have an array of observables. Lets say:

const responses$: Observable<Response>[] = [
    this.service.get(1), 
    this.service.get(2), 
    this.service.get(3), 
    this.service.get(4)
];

I want to solve them in batches of size, let's say, 2, meaning that I will just have 2 batches in the end. Also, this batch management has to be sequential: I do not want to send the requests 3 and 4 (second batch) until the requests 1 and 2 (first batch) have been resolved.

Also, I want to retrieve the result of all the batches together, meaning that even if I have N batches, I just want my subscribe to receive one single result of all of them.

I have tried everything and I do not find an Rxjs-exclusive way of doing it. My real problem is that I am dealing with 2000 concurrent requests to an API that returns files that does not allow for that many requests at the same time, so what I want to do is to use sequential batches so that the API doesn't get overwhelmed.

CodePudding user response:

You could do something like this:

const BATCH_SIZE = 2;

from(request).pipe(
  bufferCount(BATCH_SIZE), // group the request in batches
  concatMap((batch) => forkJoin(batch)), // execute each batch sequentially
  reduce((acc, batchResults) => [...acc, ...batchResults], []) // wait for the source to complete and join all the results in a single array
)

I leave up to you the error management if one of the requests fails.

Cheers

CodePudding user response:

I would suggest using the RxJS's concat function with toArray operator to achieve that, like the following:

const responses$: Observable<Response>[] = []; // Define your batches here

concat(...responses$)
  .pipe(toArray())
  .subscribe((res) => {
    // You will get the list responses once all the inner observables are completed.
    console.log(res);
  });

CodePudding user response:

You can first group the observables into batches:

const responses: Observable<string>[] = [
  // ...
];

const batchSize = 2;
const groupedResponses: Observable<string>[][] = [];
while(responses.length) {
  groupedResponses.push(responses.splice(0, batchSize));
}

And then merge the batches and concat them like this:

concat(
  ...groupedResponses.map((group) => merge(...group))
).pipe(
  toArray(),
  tap(console.log)
).subscribe();
  • Related