switchMap combined with mergeMap

Time:11-23

I have an Observable where each new value should trigger an HTTP request. On the client side I only care about the latest response value; however, I want every request to run to completion for monitoring and similar purposes.

What I currently have is something like:

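import { of } from 'rxjs';
import { delay } from 'rxjs/operators';

function simulate(x) {
  // Simulate an HTTP request that responds after a short delay.
  return of(x).pipe(delay(6));
}

// someMapFunc is a placeholder for switchMap, mergeMap, etc.
source$.pipe(
  someMapFunc(x => simulate(x)),
);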

When I use switchMap for someMapFunc, I get the right set of responses (only the latest). However, if a request is still in flight when the next value arrives, it gets canceled.

When I use mergeMap instead, I get the right set of requests (every request completes), but I get the wrong set of responses (every single one).

[Image: marble diagram of the code above]

Is there a way to get the requests of mergeMap with the responses of switchMap? I know I can write this as a custom operator, but I'm wondering if I can build this out of existing/standard rxjs operators. To summarize what I'm thinking of:

  • a version of switchMap that doesn't unsubscribe when it switches;
  • a version of mergeMap that only emits values from the latest inner Observable.

CodePudding user response:

I believe that you need a combination of concatMap() and last().

concatMap does not subscribe to the next inner Observable until the previous one completes, so the requests execute in order. As that description implies, it does not cancel earlier subscriptions; unlike switchMap, it lets them finish.

last emits only the final value from its source, and it does so when the source completes. Using it ensures that a single (last) result is passed downstream, once source$ itself has completed.

Your code would look like this:

source$.pipe(
  concatMap(x => simulate(x)),
  last()
);
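
To see when this pipeline actually emits, here is a minimal sketch, assuming RxJS 6 or 7. The fakeRequest helper and the Subject-based source are illustrative stand-ins that are not part of the original code; the request is made synchronous so the order of events is easy to follow.

```typescript
import { Observable, Subject, of } from 'rxjs';
import { concatMap, last } from 'rxjs/operators';

// Hypothetical stand-in for simulate(): responds immediately with x * 10.
function fakeRequest(x: number): Observable<number> {
  return of(x * 10);
}

const source$ = new Subject<number>();
const seen: number[] = [];

source$
  .pipe(
    concatMap((x) => fakeRequest(x)), // requests run one at a time, in order
    last(),                           // hold back everything but the final response
  )
  .subscribe((v) => seen.push(v));

source$.next(1);
source$.next(2);
const seenBeforeComplete = [...seen]; // still empty: last() is waiting

source$.complete();
// Only now does the final response (20) reach the subscriber.
```

Because last() waits for the source to complete, nothing reaches the subscriber while source$ is still open. Whether that fits depends on how long-lived the stream of requests is.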

CodePudding user response:

I'm not 100% sure this is what you're after, and I haven't fully tested it, but I created a custom operator that might do something close. Maybe you can tinker with it a bit more.

This is a mergeMap that filters out "old" values. An old value is an emission from an inner source that arrives after a newer inner source has already started to emit.

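import { Observable, OperatorFunction, defer } from 'rxjs';
import { filter, map, mergeMap, tap } from 'rxjs/operators';

function orderedMergeMap<T, R>(project: (v: T) => Observable<R>): OperatorFunction<T, R> {
  // defer gives every subscription its own `recent` counter.
  return s => defer(() => {
    // Highest source index that has produced a value so far.
    let recent = 0;
    return s.pipe(
      // Tag each source value with its arrival index.
      map((v, i) => ({order: i, payload: v})),
      // Run every inner Observable to completion, carrying the tag along.
      mergeMap(({order, payload}) => project(payload).pipe(
        map(v => ({order, payload: v}))
      )),
      tap(({order}) => {
        if (order > recent) recent = order;
      }),
      // Keep only values from the newest source seen so far; drop stale ones.
      filter(({order}) => order >= recent),
      map(({payload}) => payload)
    );
  });
}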
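
To make the "old value" rule described above concrete, here is a small trace in plain TypeScript, without RxJS. It is only a sketch: the Tagged type and the dropStale helper are hypothetical names introduced for illustration, and the array stands for responses listed in the order they arrive.

```typescript
// A response tagged with the index of the request that produced it.
interface Tagged<R> {
  order: number;
  payload: R;
}

// Hypothetical helper: keeps a response only if no newer request
// has already produced a response before it arrived.
function dropStale<R>(arrivals: Tagged<R>[]): R[] {
  let recent = 0; // highest request index seen so far
  const kept: R[] = [];
  for (const { order, payload } of arrivals) {
    if (order > recent) recent = order;
    if (order >= recent) kept.push(payload);
  }
  return kept;
}

const arrivals: Tagged<string>[] = [
  { order: 0, payload: 'a' },
  { order: 2, payload: 'c' },
  { order: 1, payload: 'b' }, // late: request 2 has already responded
  { order: 3, payload: 'd' },
];

const kept = dropStale(arrivals);
// kept is ['a', 'c', 'd']: the late response 'b' is dropped.
```

Every request still finishes here; the late response is simply not passed on, which is the behavior the question asks for.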