I have an Observable where each new value should cause an HTTP request. On the client-side I only care about the latest response value; however, I want every request to complete for monitoring/etc. purposes.
What I currently have is something like:
import { of } from 'rxjs';
import { delay } from 'rxjs/operators';

function simulate(x) {
  // Simulate an HTTP request.
  return of(x).pipe(delay(6));
}

source$.pipe(
  someMapFunc(x => simulate(x)),
);
When I use switchMap for someMapFunc, I get the right set of responses (only the latest). However, if a request is still in flight when the next value arrives, it gets canceled.
When I use mergeMap instead, I get the right set of requests (every request completes), but the wrong set of responses (every single one).
Is there a way to get the requests of mergeMap with the responses of switchMap? I know I can write this as a custom operator, but I'm wondering if I can build it out of existing/standard rxjs operators. To summarize what I'm thinking of:
- a version of switchMap that doesn't unsubscribe when it switches;
- a version of mergeMap that only emits values from the latest inner Observable.
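To make the difference concrete, here is a small plain-TypeScript model of the three behaviours. It does not use RxJS at all; the names (Req, Outcome, wantedModel and so on) and the timings are made up for illustration, and it assumes requests are listed in start order with no timing ties.

```typescript
// Plain-TypeScript model (no RxJS): which requests finish and which
// responses are emitted. The timings below are made up for illustration.
interface Req { id: string; start: number; duration: number; }
interface Outcome { completed: string[]; emitted: string[]; }

const doneAt = (r: Req): number => r.start + r.duration;

// Ids of all requests, ordered by the time their response arrives.
function byCompletion(reqs: Req[]): string[] {
  return reqs.slice().sort((a, b) => doneAt(a) - doneAt(b)).map(r => r.id);
}

// Ids of requests that finish before any later request starts.
// Assumes reqs is sorted by start time and has no timing ties.
function notSuperseded(reqs: Req[]): string[] {
  return reqs
    .filter((r, i) => reqs.slice(i + 1).every(later => later.start > doneAt(r)))
    .map(r => r.id);
}

// mergeMap: nothing is cancelled, every response is emitted.
function mergeMapModel(reqs: Req[]): Outcome {
  return { completed: byCompletion(reqs), emitted: byCompletion(reqs) };
}

// switchMap: a superseded request is cancelled, so it neither completes nor emits.
function switchMapModel(reqs: Req[]): Outcome {
  return { completed: notSuperseded(reqs), emitted: notSuperseded(reqs) };
}

// Wanted: requests as in mergeMap, responses as in switchMap.
function wantedModel(reqs: Req[]): Outcome {
  return { completed: byCompletion(reqs), emitted: notSuperseded(reqs) };
}

const reqs: Req[] = [
  { id: 'a', start: 0, duration: 50 },  // slow: still running when b starts
  { id: 'b', start: 10, duration: 5 },
  { id: 'c', start: 20, duration: 10 },
];

console.log(switchMapModel(reqs)); // completed: b, c      emitted: b, c
console.log(mergeMapModel(reqs));  // completed: b, c, a   emitted: b, c, a
console.log(wantedModel(reqs));    // completed: b, c, a   emitted: b, c
```

In this model the slow request a is the interesting case: it should still run to completion, but its response should be discarded.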
CodePudding user response:
I believe you need a combination of concatMap() and last().
concatMap does not subscribe to the next observable until the previous one completes. Using it ensures that the requests execute in order. And, as its description implies, it doesn't cancel previous subscriptions but lets them finish, unlike switchMap.
last emits the last value emitted by the source once the source completes. Using it ensures that only one result (the last) is passed on.
Your code will look like this:
import { concatMap, last } from 'rxjs/operators';

source$.pipe(
  concatMap(x => simulate(x)),
  last()
);
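One thing worth checking before relying on this: because last() waits for completion, the pipeline emits nothing while source$ is still open. The following is a minimal sketch of that behaviour, assuming a Subject as the source and a synchronous fakeRequest (a hypothetical stand-in for simulate, so no timers are needed).

```typescript
import { Subject, of } from 'rxjs';
import { concatMap, last } from 'rxjs/operators';

// Hypothetical stand-in for simulate(): responds synchronously so the
// behaviour can be observed without timers.
const fakeRequest = (x: number) => of(x * 10);

const source$ = new Subject<number>();
const received: number[] = [];

source$.pipe(
  concatMap(x => fakeRequest(x)),
  last(),
).subscribe(v => received.push(v));

source$.next(1);
source$.next(2);
const beforeComplete = received.slice(); // still empty: last() is waiting

source$.complete();
const afterComplete = received.slice();  // now holds the final response, 20

console.log(beforeComplete, afterComplete);
```

So this combination fits a source that completes; for a long-lived source, the subscriber would see no responses until the very end.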
CodePudding user response:
I'm not 100% sure this is what you're after, and I haven't fully tested it, but I created a custom operator that might come close. Maybe you can tinker with it a bit more.
This is a mergeMap that filters out "old" values. Old values are emissions from older sources that arrive after a newer source has started to emit.
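import { Observable, OperatorFunction, defer } from 'rxjs';
import { map, mergeMap, tap, filter } from 'rxjs/operators';

function orderedMergeMap<T, R>(project: (v: T) => Observable<R>): OperatorFunction<T, R> {
  return s => defer(() => {
    // Highest source index that has emitted so far (per subscription).
    let recent = 0;
    return s.pipe(
      // Tag each source value with its arrival index.
      map((v, i) => ({order: i, payload: v})),
      mergeMap(({order, payload}) => project(payload).pipe(
        map(v => ({order, payload: v}))
      )),
      tap(({order}) => {
        if (order > recent) recent = order;
      }),
      // Drop emissions from sources older than the newest one that has emitted.
      filter(({order}) => order >= recent),
      map(({payload}) => payload)
    );
  });
}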
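The filtering rule described above (drop an emission once a newer source has already emitted) can be checked on its own, without RxJS. This is only a sketch of that rule as I read it: Tagged and keepNewest are hypothetical names, and the arrival order is invented.

```typescript
// Each response is tagged with the index of the source value that caused it.
interface Tagged<R> { order: number; payload: R; }

// Walk the responses in arrival order and keep only those that are not
// older than the newest source seen so far.
function keepNewest<R>(arrivals: Tagged<R>[]): R[] {
  let recent = 0;
  const kept: R[] = [];
  for (const { order, payload } of arrivals) {
    if (order > recent) recent = order;      // same bookkeeping as the tap()
    if (order >= recent) kept.push(payload); // same test as the filter()
  }
  return kept;
}

// Request 0 is slow, so its response arrives after those of requests 1 and 2.
const arrivals: Tagged<string>[] = [
  { order: 1, payload: 'b' },
  { order: 2, payload: 'c' },
  { order: 0, payload: 'a' },
];

console.log(keepNewest(arrivals)); // 'a' is dropped, 'b' and 'c' are kept
```

Note that under this rule a response counts as "old" only after a newer source has emitted, not as soon as a newer request has started, so a slow newer request does not suppress an older response that arrives first.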