RxJS operator that waits for completion and then emits one value

Time:12-09

Is there an RxJS operator that waits until the source completes and then emits a given value? If there is none, how could I provide it on my own?

This would work similarly to toArray(), which also waits for the source to complete. Instead of collecting all the emitted values, I want to ignore them and emit a different value instead.

Here is an equivalent implementation:

observable.pipe(
  ignoreElements(),
  endWith(myValue),
);

Alternatively:

observable.pipe(
  toArray(),
  map(ignore => myValue)
)

There are often situations where I need this. I came to the conclusion that it's dangerous to convert promise then-chains to observables by switchMap() or mergeMap() because inner observables could complete without emitting a value at all. Recently we had this problem:

return getEntitiesFromBackend().pipe(
  switchMap(entities => {
    return putEntitiesToObjectStore(entities);
  }),
  switchMap(() => {
    return storeSyncDate();
  })
);

In some situations the sync date wasn't stored and it was hard to find out why. In the end, the reason was that the putEntities... method emits a value for its "put"-operation. But in those cases the entities array was empty, so no value was emitted at all.
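
The failure mode described above can be reproduced in a few lines. This is only a sketch, not the real code: `putEntities` and the counting `storeSyncDate` are hypothetical stand-ins, assumed to emit one value per stored entity and to record each invocation respectively.

```typescript
import { Observable, from, of } from 'rxjs';
import { switchMap } from 'rxjs/operators';

// Hypothetical stand-in: emits one value per "put" operation.
function putEntities(entities: string[]): Observable<string> {
  return from(entities);
}

// Runs the two-step chain and reports how often the sync date would be stored.
function countSyncDateCalls(entities: string[]): number {
  let calls = 0;
  // Hypothetical stand-in for storeSyncDate(): it only counts invocations.
  const storeSyncDate = (): Observable<null> => {
    calls++;
    return of(null);
  };

  of(entities)
    .pipe(
      switchMap((list) => putEntities(list)),
      switchMap(() => storeSyncDate()),
    )
    .subscribe();

  return calls;
}

// 0: the inner observable completes without emitting, so the second step never runs.
const callsForEmpty = countSyncDateCalls([]);
// 2: the second step runs once per emitted "put" result.
const callsForTwo = countSyncDateCalls(['a', 'b']);
```

So the promise-style "then" only behaves as expected when the inner observable emits exactly once.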

This is what I actually wanted to do - translated to the promise world:

return getEntitiesFromBackend()
  .then(entities => {
    return putEntitiesToObjectStore(entities);
  })
  .then(() => {
    return storeSyncDate();
  });

Most of the code I have seen that uses switchMap / mergeMap doesn't have this problem, because most of the time it deals with HTTP requests that emit only once and then complete. This got me used to converting typical promise patterns to the RxJS world with switchMap without thinking too much about how it actually works and what it is for. Now that we work with IndexedDB, most of our methods return observables that emit a value for each DB operation. switchMap / mergeMap become a nightmare here.

This is why I'm asking for such an operator and wonder why I couldn't find it yet, as it's such a common case in our application. I could easily solve this by using the alternative implementations above, but don't want to repeat those two operators over and over again:

return getEntitiesFromBackend().pipe(
  switchMap(entities => {
    return putEntitiesToObjectStore(entities);
  }),
  ignoreElements(),
  endWith(null), // endWith() without arguments emits nothing, so pass a placeholder value
  switchMap(() => {
    return storeSyncDate();
  })
);

Of course I could use toArray() and just ignore the argument in the next operator. I don't like that because it would cause unnecessary overhead.

CodePudding user response:

I'd suggest two changes.

  1. Use concatMap instead of switchMap or mergeMap. concatMap ensures that each emission from the getEntitiesFromBackend() observable is processed sequentially, one after the other, rather than in parallel (mergeMap) or with the previous inner observable being cancelled (switchMap).

  2. Instead of a combination of operators like ignoreElements and endWith, or toArray and map, you could use the last operator with a predicate that always returns false, together with a default value. Since no value ever matches the predicate, the default value is emitted when the source observable completes.

return getEntitiesFromBackend().pipe(
  concatMap(entities => {
    return putEntitiesToObjectStore(entities).pipe(
      last(() => false, myValue),
    );
  }),
  concatMap((myValue) => {
    return storeSyncDate();
  })
);
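
To check the claim about `last` with an always-false predicate, here is a small sketch. The `collect` helper and the `'done'` value are made up for illustration; the sources are synchronous so the results can be inspected right away.

```typescript
import { EMPTY, Observable, of } from 'rxjs';
import { last } from 'rxjs/operators';

// Illustrative helper: collects everything a synchronous observable emits.
function collect<T>(source: Observable<T>): T[] {
  const values: T[] = [];
  source.subscribe((v) => {
    values.push(v);
  });
  return values;
}

// No value ever satisfies the predicate, so the default value is emitted on completion.
const fromEmpty = collect(EMPTY.pipe(last(() => false, 'done')));
const fromThree = collect(of(1, 2, 3).pipe(last(() => false, 'done')));
// Both arrays contain exactly one element: 'done'.
```

This covers the empty-source case as well, which is the case that caused the original problem.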

CodePudding user response:

It sounds to me like you want to achieve this:

source.pipe(
   last(), // emits only last value, when the source completes
   map(() => myValue), // or mapTo(myValue)
)
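
One caveat worth checking before relying on this: `last()` without a default value errors with `EmptyError` when the source completes without emitting, which is exactly the situation from the question. A minimal sketch (the `describe` helper is hypothetical):

```typescript
import { EMPTY, EmptyError, Observable, of } from 'rxjs';
import { last, map } from 'rxjs/operators';

// Illustrative helper: reports what a synchronous source produces after last() + map().
function describe(source: Observable<unknown>): string {
  let outcome = 'nothing';
  source.pipe(last(), map(() => 'myValue')).subscribe({
    next: (v) => {
      outcome = `next ${v}`;
    },
    error: (err) => {
      outcome = err instanceof EmptyError ? 'EmptyError' : 'other error';
    },
  });
  return outcome;
}

const nonEmptyOutcome = describe(of(1, 2, 3)); // 'next myValue'
const emptyOutcome = describe(EMPTY);          // 'EmptyError'
```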

CodePudding user response:

How about using the finalize operator?

https://rxjs.dev/api/operators/finalize
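
Note that `finalize` only runs a callback when the stream terminates; it does not add an emission, so on its own it cannot feed a value into a following switchMap. A quick sketch to illustrate (the log strings are arbitrary):

```typescript
import { of } from 'rxjs';
import { finalize } from 'rxjs/operators';

const log: string[] = [];

of(1, 2)
  .pipe(
    // Side effect only: nothing is passed on to the subscriber from here.
    finalize(() => {
      log.push('finalized');
    }),
  )
  .subscribe({
    next: (v) => {
      log.push(`next ${v}`);
    },
    complete: () => {
      log.push('complete');
    },
  });

// The subscriber sees only the two source values; no extra value is emitted.
const nextEntries = log.filter((entry) => entry.startsWith('next'));
```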

CodePudding user response:

This is my try, but I don't know if it works correctly (i.e. treats errors correctly etc.)

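function emitOnComplete<R>(value: R) {
  return function <T>(source: Observable<T>): Observable<R> {
    return new Observable<R>(subscriber => {
      return source.subscribe({
        // Swallow every value from the source.
        next() {},
        // Forward errors; without this handler a source error would not reach the subscriber.
        error(err) {
          subscriber.error(err);
        },
        // Emit the replacement value once the source has completed.
        complete() {
          subscriber.next(value);
          subscriber.complete();
        }
      });
    });
  };
}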

Alternatively I came up with this, but it's untested:

function emitOnComplete<R>(value: R) {
  return function <T>(source: Observable<T>): Observable<R> {
    return source.pipe(
      ignoreElements(),
      // concat() is used to make sure we emit the value AFTER the source has completed.
      // The trick is: the piped source never emits any value, because of ignoreElements() above.
      // So the subscriber only gets `of(value)`, but only after the source has completed.
      // Note: concat must receive the silenced stream, not the original `source`.
      (silenced) => concat(silenced, of(value)),
    );
  };
}
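
Regarding the question of whether errors are treated correctly: with the plain `ignoreElements()` plus `endWith(...)` combination from the question, an error from the source is passed through and the final value is not emitted. A small sketch to verify that (the `observe` helper, the `failing` source and the `'done'` value are made up for this example):

```typescript
import { Observable, of } from 'rxjs';
import { endWith, ignoreElements } from 'rxjs/operators';

// Illustrative helper: records every notification a synchronous source produces.
function observe(source: Observable<unknown>): string[] {
  const events: string[] = [];
  source.pipe(ignoreElements(), endWith('done')).subscribe({
    next: (v) => {
      events.push(`next ${v}`);
    },
    error: (err) => {
      events.push(`error ${err.message}`);
    },
    complete: () => {
      events.push('complete');
    },
  });
  return events;
}

// Hypothetical source that emits one value and then fails.
const failing = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.error(new Error('boom'));
});

const onSuccess = observe(of(1, 2)); // ['next done', 'complete']
const onFailure = observe(failing);  // ['error boom']
```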

CodePudding user response:

A few options come to mind.

defaultIfEmpty

This isn't exactly what you're describing, but it may fit your use case.

defaultIfEmpty: Emits a given value if the source Observable completes without emitting any next value, otherwise mirrors the source Observable.

return getEntitiesFromBackend().pipe(
  switchMap(entities => putEntitiesToObjectStore(entities)),
  defaultIfEmpty(null),
  switchMap(_ => storeSyncDate())
);
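
Keep in mind that `defaultIfEmpty` only covers the empty case. If the put step emits several values, the following step still runs once per value. The sketch below counts those runs; `countFollowUps` and its counter are hypothetical stand-ins for the real put and store calls.

```typescript
import { from, of } from 'rxjs';
import { defaultIfEmpty, switchMap } from 'rxjs/operators';

// Illustrative helper: `results` plays the role of the values emitted by the put step.
function countFollowUps(results: string[]): number {
  let calls = 0;
  from(results)
    .pipe(
      defaultIfEmpty(null),
      // Stand-in for storeSyncDate(): counts how often the follow-up step is started.
      switchMap(() => {
        calls++;
        return of(null);
      }),
    )
    .subscribe();
  return calls;
}

const followUpsForEmpty = countFollowUps([]);              // 1: the default value triggers the step
const followUpsForThree = countFollowUps(['a', 'b', 'c']); // 3: one run per emitted value
```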

Create your own operator

There is a static version of pipe that performs the composition without applying the result to any given stream.

So, for example:

const a = source.pipe(
  ignoreElements(),
  endWith(myValue)
);

const b = pipe(
  ignoreElements(),
  endWith(myValue)
);

Here, a is an observable: source.pipe composes the two operators and then returns the result of applying the composed operator to the source.

b, on the other hand, is just the first step. b is an operator itself; it hasn't yet been applied to an observable.

So you can do something like this:

source1.pipe(b);
source2.pipe(b);

I've re-used my b operator twice. We're most of the way there!

RxJS operators are so useful because they're returned by functions that customize the operator to your needs. In the above case, every time you use b, you'll have the same value for endWith.

We can wrap b in a function to get that customized on a per-use basis.

const myCustomOperator = myValue => pipe(
  ignoreElements(),
  endWith(myValue)
);

source1.pipe(
  myCustomOperator(22)
);

source2.pipe(
  myCustomOperator(23)
);

This works just like any other operator, so it can be composed (be "piped") with all the other standard operators too.

Quoting the question: "I could easily solve this by using the alternative implementations above, but don't want to repeat those two operators over and over again."

Now you have a re-usable piece of code!


How I would implement this:

JavaScript:

function ignoreConcatWith(genObs) {
  return pipe(
    ignoreElements(),
    concatWith(from(genObs()))
  );
}

[...]
  return getEntitiesFromBackend().pipe(
    switchMap(entities => putEntitiesToObjectStore(entities)),
    ignoreConcatWith(() => storeSyncDate())
  );
[...]

I find that seeing code statically typed often really helps my understanding. So here's the same thing in TypeScript:

function ignoreConcatWith<T,R>(genObs: () => ObservableInput<R>): OperatorFunction<T,R> {
  return pipe(
    ignoreElements(),
    concatWith(from(genObs()))
  );
}

[...]
  return getEntitiesFromBackend().pipe(
    switchMap(entities => putEntitiesToObjectStore(entities)),
    ignoreConcatWith(() => storeSyncDate())
  );
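
One detail to be aware of: in the version above, `genObs()` is invoked as soon as `ignoreConcatWith(...)` is called, that is, while the pipeline is being assembled. That is harmless for cold observables, but it matters if the factory returns a promise or has side effects. A possible variant, sketched here under the hypothetical name `ignoreThenDefer`, wraps the factory in `defer` so it is only called after the source has completed:

```typescript
import { Observable, ObservableInput, OperatorFunction, defer, of } from 'rxjs';
import { concatWith, ignoreElements, tap } from 'rxjs/operators';

// Hypothetical variant: defer() postpones the factory call until subscription,
// and concatWith() only subscribes after the source has completed.
function ignoreThenDefer<T, R>(factory: () => ObservableInput<R>): OperatorFunction<T, R> {
  return (source: Observable<T>) =>
    source.pipe(ignoreElements(), concatWith(defer(factory)));
}

const order: string[] = [];
const received: string[] = [];

of('a', 'b')
  .pipe(
    tap(() => {
      order.push('source value');
    }),
    ignoreThenDefer(() => {
      order.push('factory called');
      return of('sync date stored'); // stand-in for storeSyncDate()
    }),
  )
  .subscribe((v) => {
    received.push(v);
  });

// order:    ['source value', 'source value', 'factory called']
// received: ['sync date stored']
```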
[...]