I have a list of server URLs and I am making sequential HTTP requests to them in a loop. When a successful response arrives from the current request, I want to break out of the loop and not call the remaining servers. Could someone advise me how this could be handled in Angular/RxJS? Something like:
getClientData() {
  for (let server of this.httpsServersList) {
    var myObservable = this.queryData(server)
      .pipe(
        map((response: any) => {
          const data = (response || '').trim();
          if (data && this.dataIsCorrect(data)) {
            return data; // **here I want to break from the loop!**
          }
        })
      );
    return myObservable;
  }
}
private queryData(url: string) {
  return this.http.get(url, { responseType: 'text' });
}
CodePudding user response:
IMO it's better to avoid using a for loop for subscribing to multiple observables, since it can leave multiple subscriptions open. The function commonly used to combine several requests is RxJS forkJoin. But given your specific condition, I'd suggest using the RxJS from function with the concatMap operator to iterate over each element in order, and the takeWhile operator with its inclusive argument set to true (thanks @Chris) to stop based on a condition and still emit the last value.
import { from, Observable } from 'rxjs';
import { concatMap, filter, map, takeWhile } from 'rxjs/operators';
getClientData(): Observable<any> {
  return from(this.httpsServersList).pipe(
    concatMap((server: string) => this.queryData(server)),
    map((response: any) => (response || '').trim()),
    filter((data: string) => !!data), // <-- ignore empty or undefined data (remove this statement if you need them)
    takeWhile((data: string) => // <-- keep going while data is invalid; close the stream once it is valid
      !data || !this.dataIsCorrect(data)
    , true) // <-- inclusive: also emit the value that closed the stream
  );
}
Note: Try to tweak the condition inside the takeWhile predicate to match your requirement.
Edit 1: added the inclusive argument to the takeWhile operator.
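To see the early stop in action, here is a minimal sketch with stubbed requests. The URLs, the fakeResponses table and the dataIsCorrect rule are made up for illustration; queryData stands in for the real this.http.get call. It only records which servers were actually requested.

```typescript
import { defer, from, Observable, of } from 'rxjs';
import { concatMap, filter, map, takeWhile } from 'rxjs/operators';

// Hypothetical servers and their canned responses (not from the question).
const fakeResponses: Record<string, string> = {
  'https://a.example': '',
  'https://b.example': '  ok-data  ',
  'https://c.example': 'ok-never-requested',
};
const requested: string[] = [];

// Stand-in for this.http.get(url, { responseType: 'text' }).
function queryData(url: string): Observable<string> {
  return defer(() => {
    requested.push(url); // runs only when concatMap subscribes to this request
    return of(fakeResponses[url]);
  });
}

// Stand-in for this.dataIsCorrect(data); the rule itself is an assumption.
const dataIsCorrect = (data: string): boolean => data.startsWith('ok');

const emitted: string[] = [];
from(Object.keys(fakeResponses))
  .pipe(
    concatMap((url) => queryData(url)),
    map((response) => (response || '').trim()),
    filter((data) => !!data),
    // inclusive = true: the first valid value is emitted, then the stream closes
    takeWhile((data) => !dataIsCorrect(data), true),
  )
  .subscribe((data) => emitted.push(data));
```

With these stubs the third server should never be queried, because takeWhile completes the stream as soon as the second one returns valid data. Keep in mind that non-empty but invalid responses are also emitted before the valid one, so a subscriber that only wants the final value would need to ignore them.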
CodePudding user response:
In Angular we rely on RxJS operators for such complex calls. If you want to call all of them in parallel and, once one of them is fulfilled or rejected, cancel the other calls, you should use RxJS race: learnrxjs.io/learn-rxjs/operators/combination/race. Without RxJS you could use Promise.race.
However, if you want to call them in parallel and wait until the first one is fulfilled (not rejected), or until all of them are rejected, that is the case for Promise.any. Unfortunately there is no RxJS operator for it, but the following article shows how to implement a custom operator that behaves like Promise.any, with an example: https://tmair.dev/blog/2020/08/promise-any-for-observables/
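For a rough idea of the "first fulfilled, not rejected" behaviour in RxJS terms, here is a small sketch. It is not the operator from the linked article, just one possible combination of merge, catchError and first; the helper name firstSuccessful and the stub sources are made up for illustration.

```typescript
import { defer, EMPTY, merge, Observable, of, throwError } from 'rxjs';
import { catchError, first } from 'rxjs/operators';

// Hypothetical helper: emits the first value from any source and
// silently drops sources that fail.
function firstSuccessful<T>(sources: Observable<T>[]): Observable<T> {
  const guarded = sources.map((source) =>
    source.pipe(catchError(() => EMPTY)), // a failed source just completes
  );
  // first() takes one value, completes, and unsubscribes from the rest
  return merge(...guarded).pipe(first());
}

const subscribed: string[] = [];
// Stub that records when a source is actually subscribed to.
const track = (name: string, source: Observable<string>): Observable<string> =>
  defer(() => {
    subscribed.push(name);
    return source;
  });

let winner = '';
firstSuccessful([
  track('a', throwError(new Error('server down'))),
  track('b', of('data from b')),
  track('c', of('data from c')),
]).subscribe((data) => (winner = data));
```

These stubs respond synchronously, so the third source is never even subscribed to. With real HttpClient calls all requests would be in flight at once, and the unsubscribe triggered by first() is what cancels the slower ones. If every source fails, first() errors with an EmptyError, which is the closest thing here to Promise.any rejecting.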
CodePudding user response:
Create a subject like this:
responseArrived = new Subject();
Then add takeUntil inside the pipe, like this:
var myObservable = this.queryData(server).pipe(takeUntil(this.responseArrived), map...
And on the line that returns the data, just call:
this.responseArrived.next();
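A minimal sketch of how that could behave, assuming three requests that are all subscribed up front. The pending subjects are stand-ins for in-flight HTTP calls and are driven by hand here; nothing below comes from the question's code except the responseArrived idea.

```typescript
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// Notifier: fires once the first usable response has arrived.
const responseArrived = new Subject<void>();

// Hypothetical in-flight requests, modelled as subjects we can push into.
const pending = [
  new Subject<string>(),
  new Subject<string>(),
  new Subject<string>(),
];

const received: string[] = [];
const subscriptions = pending.map((request) =>
  request.pipe(takeUntil(responseArrived)).subscribe((data) => {
    received.push(data);
    responseArrived.next(); // completes every stream guarded by takeUntil
  }),
);

pending[1].next('data from server 2'); // first response wins
pending[0].next('late data from server 1'); // ignored, stream already completed
```

After the first response every subscription is closed, so later responses are dropped. Note that this only helps when the requests are already subscribed; it does not by itself make the calls sequential.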
CodePudding user response:
You can't use race because it will call all URLs in parallel, but you can use switchMap with a recursive implementation:
import { Observable, of, throwError } from 'rxjs';
import { catchError, switchMap } from 'rxjs/operators';
// both are methods of the class that owns `http` and `dataIsCorrect`
getClientData(urls: string[]): Observable<string> {
  // no URLs left: every server failed or returned invalid data
  if (!urls.length) return throwError(new Error('all URLs returned an error'));
  return this.queryData(urls[0]).pipe(
    // treat a failed request as an empty response so the next URL is tried
    // (placed before switchMap so it does not re-run the recursion on a later error)
    catchError(() => of('')),
    switchMap((response: string) => {
      const data = (response || '').trim();
      if (data && this.dataIsCorrect(data)) {
        // if the response is correct, return an observable with the data
        // for that we use the of() observable
        return of(data);
      }
      // if the response is not correct, call the function again with the next URL
      return this.getClientData(urls.slice(1));
    })
  );
}
private queryData(url: string): Observable<string> {
  return this.http.get(url, { responseType: 'text' });
}
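The recursive fallback can be tried without Angular by stubbing the requests. In this sketch the URLs, the responses table and the "non-empty means valid" rule are assumptions for illustration, and firstValid is a hypothetical standalone version of the recursive function.

```typescript
import { defer, Observable, of, throwError } from 'rxjs';
import { catchError, switchMap } from 'rxjs/operators';

const attempted: string[] = [];

// Hypothetical servers: one fails, one returns blank text, one returns data.
const responses: Record<string, Observable<string>> = {
  'https://a.example': throwError(new Error('unreachable')),
  'https://b.example': of('   '),
  'https://c.example': of(' payload '),
  'https://d.example': of('unused'),
};

// Stand-in for the HTTP call; records each attempt.
function queryData(url: string): Observable<string> {
  return defer(() => {
    attempted.push(url);
    return responses[url];
  });
}

function firstValid(urls: string[]): Observable<string> {
  if (!urls.length) {
    return throwError(new Error('no server returned valid data'));
  }
  return queryData(urls[0]).pipe(
    catchError(() => of('')), // a failed request counts as "no data"
    switchMap((response) => {
      const data = response.trim();
      // valid data ends the recursion, otherwise move on to the next URL
      return data ? of(data) : firstValid(urls.slice(1));
    }),
  );
}

let result = '';
firstValid(Object.keys(responses)).subscribe((data) => (result = data));
```

Each URL is requested only after the previous one has failed or returned nothing usable, and the fourth server is never contacted in this run.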
CodePudding user response:
If your only condition is that you cancel the remaining requests once at least one response is received, can't you simply unsubscribe from the observables returned by the HttpClient calls?
getData() {
  const subscriptions = [];
  [
    'https://reqres.in/api/products/1',
    'https://reqres.in/api/products/2',
    'https://reqres.in/api/products/3',
  ].forEach((url, i) => {
    subscriptions[i] = this.getClientData(url).subscribe(() => {
      // Unsubscribe
      subscriptions.forEach((v, j) => {
        if (j !== i) {
          console.log('Unsubscribe from ', j);
          v.unsubscribe();
        }
      });
    });
  });
}
private getClientData(url: string) {
  return this.httpClient.get(url, { responseType: 'text' }).pipe(
    map((response: any) => {
      const data = (response || '').trim();
      if (data && true) return data;
      return null;
    })
  );
}