Home > OS >  Is there an rxjs operator which emits the latest value on subscribe and continues to emit new values
Is there an rxjs operator which emits the latest value on subscribe and continues to emit new values

Time:08-04

I need to build an rxjs operator which I'll call latest. Here's an example of a Karma test demonstrating usage:

const root = new Subject<string>();
const inner = new Subject<number>();
const obj = { 'foo': inner };
const sut = root.pipe(latest(value => obj[value])); // latest operator applied
root.next('foo');
inner.next(42);
const spy = jasmine.createSpy();
sut.subscribe(spy);
expect(spy).toHaveBeenCalledWith(42);

Here are the actual requirements:

  1. It is like BehaviorSubject: any future subscribers to sut will immediately be nexted with the most recently received valid value. Note that it must subscribe to the root observable when the operator is called and immediately begin caching values for a future catch-up subscriber, even before anyone has subscribed to sut yet.
  2. It is like switchMap: when the root observable emits a new value, that value is converted (via the input factory function) to a new observable (which I'll call the "inner observable") whose values are monitored for the output.
  3. It is like share: it needs to multicast and to continue to process new subscriptions even after an inner observable has errored or completed. In fact, when a new subscription occurs after the inner observable errors, this new subscriber needs to get the most recent valid value immediately as though no error had happened.
  4. It needs to emit no values to any subscriber until an inner observable has emitted a value.
  5. It needs to error and release all current subscribers when the inner observable errors.
  6. It needs to error/complete and release all current and future subscribers when the root observable errors/completes respectively.

The below is my code for this.

function latest<TIn, TOut>(factory: (input: TIn) => Observable<TOut>): OperatorFunction<TIn, TOut> {
  return (src: Observable<TIn>) => {
    const subscribers: Subscriber<TOut>[] = [];
    let hasValue = false;
    let latestValue: TOut = undefined;
    let outerSubscription: Subscription = null, innerSubscription: Subscription = null;
    let rootError: any, rootCompleted = false, rootErrorReceived = false;

    outerSubscription = src.subscribe((value: TIn) => {
      if (innerSubscription) {
        innerSubscription.unsubscribe();
      }
      innerSubscription = factory(value).subscribe((innerValue: TOut) => {
        hasValue = true;
        latestValue = innerValue;
        subscribers.forEach(s => s.next(latestValue));
      }, error => {
        innerSubscription = null;
        subscribers.splice(0, subscribers.length).forEach(s => s.error(error));
      }, () => {
        innerSubscription = null;
      });
    }, error => {
      rootErrorReceived = true, rootError = error;
      subscribers.splice(0, subscribers.length).forEach(s => s.error(error));
      if (innerSubscription) {
        innerSubscription.unsubscribe();
        innerSubscription = null;
      }
    }, () => {
      rootCompleted = true, latestValue = undefined;
      if (innerSubscription) {
        innerSubscription.unsubscribe();
        innerSubscription = null;
      }
      subscribers.splice(0, subscribers.length).forEach(s => s.complete());
    });

    return new Observable<TOut>((subscriber: Subscriber<TOut>) => {
      if (rootCompleted) {
        subscriber.complete();
        return undefined;
      }
      if (rootErrorReceived) {
        subscriber.error(rootError);
        return undefined;
      }
      subscribers.push(subscriber);
      if (hasValue) {
        subscriber.next(latestValue);
      }
      return () => {
        const i = subscribers.indexOf(subscriber);
        if (i !== -1) {
          subscribers.splice(i, 1);
        }
      };
    });
  };
}

It works correctly according to all my requirements, but there simply must be a more rxjs way of doing this. I've tried several combinations of share, switchMap, catchError, onErrorResumeNext, Subject, ConnectableObservable, and BehaviorSubject, but I can't figure out the right one, especially with the need to subscribe immediately and handle future subscribers after error.

I'm on rxjs 6.6.2, but I can (probably) update if need be.

CodePudding user response:

You can use a combination of switchMap and shareReplay like this

root.pipe(switchMap(value => obj[value]), shareReplay(1))

CodePudding user response:

What you are looking for is probably a combination or skipUntil() & share()

const heartbeat = timer(0, 1000);

const waiter = of('go').pipe(delay(5000));
    
const myHotObservable = heartbeat.pipe(skipUntil(waiter), share());

myHotObservable.subscribe(console.log);

setTimeout(() => {
  myHotObservable.subscribe((v) => console.log('2', v));
}, 5000);

Stackblitz

  • Related