I have an array of observables. Let's say:
const responses$: Observable<Response>[] = [
  this.service.get(1),
  this.service.get(2),
  this.service.get(3),
  this.service.get(4)
];
I want to resolve them in batches of a given size, say 2, so in this case I would end up with just 2 batches. The batches must also run sequentially: I do not want to send requests 3 and 4 (second batch) until requests 1 and 2 (first batch) have been resolved.
Also, I want to retrieve the results of all the batches together, meaning that even if I have N batches, I want my subscriber to receive one single result containing all of them.
I have tried everything and cannot find an RxJS-only way of doing it. My real problem is that I am dealing with 2000 concurrent requests to an API that returns files and does not allow that many requests at the same time, so I want to use sequential batches so that the API doesn't get overwhelmed.
CodePudding user response:
You could do something like this:
import { forkJoin, from } from 'rxjs';
import { bufferCount, concatMap, reduce } from 'rxjs/operators';

const BATCH_SIZE = 2;
from(responses$).pipe(
  bufferCount(BATCH_SIZE), // group the requests in batches
  concatMap((batch) => forkJoin(batch)), // run each batch only after the previous one completes
  reduce((acc, batchResults) => [...acc, ...batchResults], [] as Response[]) // wait for the source to complete and join all the results in a single array
)
I leave the error handling to you in case one of the requests fails.
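For the error handling, one possible approach (a sketch, not the only option) is to catch errors per request, so that a single failure does not abort the whole forkJoin batch. The helper name `runInBatches` and the choice of `null` as a placeholder for a failed request are assumptions for illustration, and the sample uses `of`/`throwError` stand-ins rather than real HTTP calls:

```typescript
import { Observable, forkJoin, from, of, throwError } from 'rxjs';
import { bufferCount, catchError, concatMap, reduce } from 'rxjs/operators';

// Hypothetical helper: runs cold observables in sequential batches and
// replaces any failed request with null instead of failing the whole stream.
function runInBatches<T>(
  requests: Observable<T>[],
  batchSize: number
): Observable<(T | null)[]> {
  return from(requests).pipe(
    bufferCount(batchSize), // group the requests in batches
    concatMap((batch) =>
      // catchError is applied per request, so forkJoin still emits
      // for the batch even when one of its members errors
      forkJoin(batch.map((req$) => req$.pipe(catchError(() => of(null)))))
    ),
    // join the results of every batch into one flat array
    reduce((acc, batchResults) => [...acc, ...batchResults], [] as (T | null)[])
  );
}

// Stand-in requests: the second one fails.
const requests: Observable<string>[] = [
  of('a'),
  throwError(() => new Error('boom')),
  of('c'),
];

let results: (string | null)[] = [];
runInBatches(requests, 2).subscribe((all) => (results = all));
```

If you would rather stop at the first failure, leave out the `catchError` and handle the error in the subscriber instead.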
Cheers
CodePudding user response:
I would suggest using RxJS's concat function with the toArray operator to achieve that, like this:
import { Observable, concat } from 'rxjs';
import { toArray } from 'rxjs/operators';

const responses$: Observable<Response>[] = []; // Define your batches here
concat(...responses$)
  .pipe(toArray())
  .subscribe((res) => {
    // You will get the list of responses once all the inner observables have completed.
    console.log(res);
  });
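Keep in mind that concat subscribes to its arguments one at a time, so to get batches of two, each element passed to it has to be a batch itself. Here is a rough sketch of that, assuming each batch is built with forkJoin. `fakeGet` is a made-up stand-in for `this.service.get`, and the `log` array is only there to show the order in which requests start:

```typescript
import { Observable, concat, defer, forkJoin, of } from 'rxjs';
import { tap, toArray } from 'rxjs/operators';

const log: string[] = [];

// Hypothetical stand-in for this.service.get(id): a cold observable that
// records when it is subscribed to (that is, when the request would start).
const fakeGet = (id: number): Observable<string> =>
  defer(() => {
    log.push(`start ${id}`);
    return of(`response ${id}`);
  });

// Each element passed to concat is one batch of two requests.
const batches$: Observable<string[]>[] = [
  forkJoin([fakeGet(1), fakeGet(2)]).pipe(tap(() => log.push('batch 1 done'))),
  forkJoin([fakeGet(3), fakeGet(4)]).pipe(tap(() => log.push('batch 2 done'))),
];

let batchResults: string[][] = [];
concat(...batches$)
  .pipe(toArray())
  .subscribe((res) => (batchResults = res));

// log is now:
// ['start 1', 'start 2', 'batch 1 done', 'start 3', 'start 4', 'batch 2 done']
```

With this shape the subscriber receives an array of batches (an array of arrays), so flatten it if you need a single list. It also only works if the observables are cold, meaning the request is not sent until something subscribes.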
CodePudding user response:
You can first group the observables into batches:
const responses: Observable<string>[] = [
  // ...
];
const batchSize = 2;
const groupedResponses: Observable<string>[][] = [];
while (responses.length) {
  groupedResponses.push(responses.splice(0, batchSize));
}
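Note that `splice` empties the original `responses` array as it goes. If that matters in your case, a non-mutating version could look something like this. `chunk` is a hypothetical helper name, not part of RxJS, and the example uses plain numbers instead of observables just to keep it short:

```typescript
// Hypothetical helper: split items into consecutive groups of at most `size`,
// leaving the input array untouched.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer');
  }
  const groups: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    groups.push(items.slice(i, i + size));
  }
  return groups;
}

const ids = [1, 2, 3, 4, 5];
const grouped = chunk(ids, 2); // [[1, 2], [3, 4], [5]]
```

The last group is simply shorter when the length is not a multiple of the batch size.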
Then merge the requests within each batch and concat the batches, like this:
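import { concat, merge } from 'rxjs';
import { tap, toArray } from 'rxjs/operators';

concat(
  ...groupedResponses.map((group) => merge(...group)) // requests within a batch run concurrently
).pipe(
  toArray(), // collect every response into one array once all batches complete
  tap(console.log)
).subscribe();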