I have a service layer responsible for handling the data and sending it to components that use the same value.
I need to make requests to the api, as long as the value is isProcessed == false
it will do 10 attempts every half a second.
If it fails during the attempts, it will only make 3 more requests.
If isProcessed == true
, stopped the requisitions.
All the logic is built in, but I can't output the last observable value. All responses are sent.
All requests are sent by observables, both false and true, but I only need the last one.
Here all requests responses are arriving in the component, not just the last
Service responsible for accessing API:
public getPositionConsolidate(): Observable<PosicaoConsolidada> {
return this.http.get<PosicaoConsolidada>(`${environment.api.basePosicaoConsolidada}/consolidado`)
.pipe(
map(res => res),
retryWhen(genericRetryStrategy()),
shareReplay(1),
catchError(err => {
console.log('Error in Position Consolidate', err);
return throwError(err);
})
)
}
Service responsible for handling the data and sending it to the component:
public positionConsolidate() {
let subject = new BehaviorSubject<any>([]);
this.api.getPositionConsolidate().subscribe(response => {
if(response.hasProcessado == false) {
for (let numberRequest = 0; numberRequest < 10; numberRequest ) {
setTimeout(() => {
//subject.next(this.api.getPosicaoConsolidada().subscribe());
this.api.getPositionConsolidate().subscribe(res => {
subject.next(res)
})
}, numberRequest * 500, numberRequest);
}
} else {
retryWhen(genericRetryStrategy()),
finalize(() => this.loadingService.loadingOff())
}
})
return subject.asObservable()
}
In component:
public ngOnInit() {
this.coreState.positionConsolidate().subscribe(res => console.log(res))
}
CodePudding user response:
The easiest part to answer of your question, is that if you just want the last emission from an observable, then just use the last operator. However, the way you've written things, makes it difficult to incorperate. The following refactors your code as a single stream without any non-rxjs control structures.
public positionConsolidate() {
return this.api.getPositionConsolidate().pipe(
concatMap(res => iif(() => res.hasProcessado,
of(res),
interval(500).pipe(
take(10),
concatMap(() => this.api.getPositionConsolidate())
)
)),
retryWhen(genericRetryStrategy()),
finalize(() => this.loadingService.loadingOff()),
last()
);
}
What's happening
- First this executes the initial api call.
- Then based on the result, either...
- Returns the initial result
- Calls the api 10 times every 500ms.
- Implements what you have for retryWhen and finalize.
- Returns the last emitted result.
Also don't subscribe to observables inside of observables - that's what higher order observables such as concatMap are for.